Change streams with Kotlin coroutines #4987

Open
huberchrigu opened this issue May 29, 2025 · 0 comments
Labels
status: waiting-for-triage An issue we've not yet triaged

Comments

@huberchrigu

Currently there seems to be no way to consume MongoDB change streams in a Kotlin coroutines setup. See the following test:

@DataMongoTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
@Import(TestcontainersConfiguration::class)
class MongoAggregateChangesTest(private val mongoTemplate: ReactiveMongoTemplate) {
    @Test
    fun testFlux() { // success
        runTest { onHit ->
            mongoTemplate.changeStream<TestDocument>().listen()
                .subscribe { onHit.apply(it.body!!) }
        }
    }

    @Test
    fun testLaunch() { // fails
        runTest { onHit ->
            launch {
                mongoTemplate.changeStream<TestDocument>().listen()
                    .subscribe { onHit.apply(it.body!!) }
            }
        }
    }

    @Test
    fun testCoroutine() { // fails
        runTest { onHit ->
            launch {
                mongoTemplate.changeStream<TestDocument>().listen().asFlow()
                    .collect { onHit.apply(it.body!!) }
            }
        }
    }

    private fun runTest(listener: suspend CoroutineScope.(EventHandler) -> Unit) {
        runBlocking {
            val check = mutableSetOf<Int>()
            listener(DefaultEventHandler(check))
            Flux.range(0, 100).flatMap { mongoTemplate.save(TestDocument(it)) }.subscribe { println("Saved: ${it.id}") }

            await().atMost(Duration.ofSeconds(3)).until { check.size == 100 }
        }
    }

    class TestDocument(val id: Int)

    class DefaultEventHandler(private val check: MutableSet<Int>) : EventHandler {
        override fun apply(document: TestDocument) {
            val id = document.id
            println("Received: $id")
            check.add(id)
        }
    }

    fun interface EventHandler {
        fun apply(document: TestDocument)
    }
}

In my project, I cannot consume the change stream as a Flux, because the events need to be mapped by suspend functions. Is there any way to use change streams with coroutines? asFlow() seems to cause issues not only in this test, but also in projects where the resulting flow is consumed by controller methods.

I think this would require a dedicated CoroutineMongoTemplate that makes use of the Kotlin MongoClient.
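
For comparison, here is a minimal sketch of collecting a Flux as a Flow with a suspend mapping step, without any MongoDB involved. It rests on an assumption that is not confirmed anywhere in this issue: that the failing tests are affected by `launch` inheriting the single-threaded `runBlocking` dispatcher while Awaitility's `until` blocks that same thread, so the launched coroutine may never get to subscribe. The sketch therefore launches the collector on `Dispatchers.Default` and waits with the suspending `delay` instead of a blocking wait. `enrich` and `collectWithSuspendMapping` are hypothetical names, `Flux.interval` is only a stand-in for `changeStream<TestDocument>().listen()`, and kotlinx-coroutines 1.6+ with the `kotlinx-coroutines-reactive` module is assumed.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap

// Hypothetical suspend mapping step, standing in for the project's real mapper.
suspend fun enrich(id: Int): String {
    delay(1)
    return "event-$id"
}

// Collects `expected` events from the Flux through a suspend mapper.
fun collectWithSuspendMapping(events: Flux<Int>, expected: Int): Set<String> = runBlocking {
    // Thread-safe set, because the collector runs on another thread.
    val received = ConcurrentHashMap.newKeySet<String>()

    // Launch on Dispatchers.Default so that collection does not depend on
    // the runBlocking thread being free to run the coroutine.
    val job = launch(Dispatchers.Default) {
        events.asFlow()
            .map { enrich(it) }
            .collect { received.add(it) }
    }

    // Suspend while polling instead of blocking the thread.
    withTimeout(3_000) {
        while (received.size < expected) delay(10)
    }
    job.cancelAndJoin()
    received.toSet()
}

fun main() {
    // Stand-in for a change stream: 100 events, one every 5 ms.
    val events = Flux.interval(Duration.ofMillis(5)).map { it.toInt() }.take(100)
    println("Received: ${collectWithSuspendMapping(events, 100).size}")
}
```

With a real change stream there is an additional caveat: events saved before the collector has subscribed are not delivered, so a test would still need to make sure the subscription is active before writing documents. The sketch does not address that.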

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label May 29, 2025