Skip to content

Publisher hangs on receive #197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
rpreissel opened this issue Jan 3, 2018 · 1 comment
Closed

Publisher hangs on receive #197

rpreissel opened this issue Jan 3, 2018 · 1 comment
Assignees
Labels

Comments

@rpreissel
Copy link

I have an issue with publisher in the following code snippet (The snippet is stripped down to the essentials and does not implement an useful task).
I'm using Kotlin 1.2.10 and coroutines 0.21 with Java 8

fun withPublisher(context: CoroutineContext): Publisher<Unit> = publish(context) {
    val sourceA = publish(context) { while (isActive) send(Unit) }
    val sourceB = publish(context) { while (isActive) send(Unit) }
    sourceA.openSubscription().use { channelA ->
        sourceB.openSubscription().use { channelB ->
            while (isActive) {
                val index: Int = select {
                    channelA.onReceive { 0 }
                    channelB.onReceive { 1 }
                }
                when (index) {
                    0 -> {
                        channelB.receive()
                        send(Unit)
                    }

                    1 -> {
                        channelA.receive()
                        send(Unit)
                    }
                }
            }
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    withPublisher(newSingleThreadContext("flow"))
            .consumeEach { println("consume on thread ${Thread.currentThread().name}") }
}

The code hangs after the first consuming.
It works If I use Unconfined as CoroutineContext
I also tried the same with produce and Channels and it worked as well.
You find my experiments here.

I can see that in the second run the sourceB is suspended at the send()-method and the channelB.receive() is suspended as well.

@elizarov elizarov added the bug label Jan 13, 2018
@elizarov elizarov self-assigned this Jan 13, 2018
@elizarov
Copy link
Contributor

Confirm. This looks like a bug to me. Thanks for report.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants