I'm having some problems on understanding channels...
# coroutines
b
I'm having some problems understanding channels with structured concurrency. I have this case where I want to print all broadcast emissions:
class ConflatedTests: CoroutineScope by CoroutineScope(Dispatchers.Unconfined) {

    private val channel = ConflatedBroadcastChannel<Int>()

    @ExperimentalCoroutinesApi
    fun main() = launch {
        launch {
            channel.consumeEach {
                println(it)
            }
        }
        dispatch(channel, 1)
        dispatch(channel, 2)
        dispatch(channel, 3)
        dispatch(channel, 4)
    }

    @ExperimentalCoroutinesApi
    private fun <T> CoroutineScope.dispatch(channel: ConflatedBroadcastChannel<T>, action: T) = launch {
        channel.send(action)
    }
}
The problem here is that it only consumes the first and last emissions, printing `1` and `4`. Probably the other emissions are being suspended for some reason... Could anyone help me with that? What am I missing here?
b
This is primarily about understanding the eagerness of the unconfined scope and a queue of coroutines waiting for their turn (from an outsider's observation of how it behaves). Let's look at the coroutines you're launching. Your `main` function launches a coroutine and then returns, because there's no other work outside of that coroutine. You now have this `root coroutine`, launched by your `main`, that executes after main has returned (Unconfined uses any available thread; in this case, likely the single thread you're running the test on). That `root coroutine` then queues up 5 more coroutines in this order:
1. launch (channel consumeEach), job queued
2. launch (channel send 1), job queued
3. launch (channel send 2), job queued
4. launch (channel send 3), job queued
5. launch (channel send 4), job queued
Now that your `root coroutine` has reached the end of its execution, it waits for its child jobs to complete, which frees up your thread. It now starts going through the queue.
- `channel consumeEach` starts up and suspends with priority because there is no item available in the channel yet.
- `dispatch 1` starts up and sends item 1 onto the channel.
- `channel consumeEach` starts up again because it has priority and the channel now has an item. It prints, and then suspends without priority.
How this mechanism and priority/racing exactly works is an implementation detail that I personally do not understand yet (hopefully). Your queue now looks something like this:
3. launch (channel send 2), job queued
4. launch (channel send 3), job queued
5. launch (channel send 4), job queued
6. launch (channel consumeEach), job suspended
As you can see from your queue, 2 and 3 are next, but they will get squashed by your conflated channel. That leaves `4` in the channel to then be consumed by the `consumeEach` coroutine.
z
There’s no magic priority, just the order of events, like you described. The first iteration of `consumeEach` is queued before any of the `send`s, like you said, so it runs first. Since the channel is empty, it suspends and adds its continuation to the channel’s queue of receivers (this is how “sending item 1 onto the channel” works). Then the first `send` occurs, which sees there’s a receiver queued up, so it dispatches the receiver immediately. The `Unconfined` dispatcher adds the receiver’s continuation to the end of its event queue, like you’ve got in the second list. The second `send` will see there are no queued-up receivers, so it will just cache the value and continue immediately (no suspends or resumes); the third and fourth do the same, each overwriting the cached value. Once all the `send`s are complete, the queued-up receive gets executed (task 6 in your list), which prints “1”, and then the `for` loop in `consumeEach` continues, checks the channel again, pulls the latest conflated value (4) out without suspending, and prints it.
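The ordering described above can be replayed with a small toy model. This is only a sketch in plain Kotlin, not kotlinx.coroutines code: `ToyConflatedRun`, its task queue, and its one-slot buffer are hypothetical stand-ins for the `Unconfined` event queue and the conflated channel, assumed here purely to make the sequence of events visible.

```kotlin
// Toy model only: a single-threaded task queue plus a one-slot conflated buffer.
// None of these names come from kotlinx.coroutines.
class ToyConflatedRun {
    private val queue = ArrayDeque<() -> Unit>() // stands in for the dispatcher's event queue
    private var slot: Int? = null                // conflated buffer: keeps only the latest value
    private var receiverWaiting = false          // true while the consumer waits on an empty channel
    private val printed = mutableListOf<Int>()   // what the consumer would have printed

    // Consumer loop: take buffered values without waiting, then wait for the next send.
    private fun receive() {
        while (true) {
            val value = slot ?: break
            slot = null
            printed += value
        }
        receiverWaiting = true
    }

    private fun send(value: Int) {
        if (receiverWaiting) {
            // A receiver is waiting: hand it the value and queue its resumption at the end.
            receiverWaiting = false
            queue.addLast {
                printed += value
                receive()
            }
        } else {
            // No receiver waiting: overwrite the cached value (conflation).
            slot = value
        }
    }

    fun run(): List<Int> {
        queue.addLast { receive() }               // task 1: consumeEach
        for (v in 1..4) queue.addLast { send(v) } // tasks 2..5: the four sends
        while (queue.isNotEmpty()) queue.removeFirst().invoke()
        return printed
    }
}

fun main() {
    println(ToyConflatedRun().run()) // prints [1, 4]
}
```

In this model the resumed receiver runs only after all four sends, so it sees `1` (handed over directly) and then `4` (the last value left in the slot); `2` and `3` were overwritten before anyone looked.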
b
@bdawg.io @Zach Klippenstein (he/him) [MOD] thank you for your time and great answers, they helped a lot. I fixed it by removing my root coroutine declaration and turning it into a suspend function... I'm calling this function from a runBlocking and it's working. Do you know why doing that solved my problem?
z
not entirely sure what you mean – can you post the new code?
b
sure, the main function is not wrapping those jobs with a launch anymore:
    @ExperimentalCoroutinesApi
    suspend fun main() {
        launch {
            channel.consumeEach {
                println(it)
            }
        }
        dispatch(channel, 1)
        dispatch(channel, 2)
        dispatch(channel, 3)
        dispatch(channel, 4)
    }
b
You shouldn't need your main function to be `suspend`. It never actually suspends since you're launching jobs.
👍 1
@Zach Klippenstein (he/him) [MOD] That makes sense. That's exactly why I added my disclaimer about not understanding how the "priority" is occurring.
b
Yeah, you're right, I actually don't need it as a suspend function. I tested it now and it works without it.
z
I think it works when you get rid of the enclosing coroutine because `Unconfined` does trampolining. When the coroutine wrapper is there, every time you call `launch`, the dispatcher sees that it's already executing a task (the root coroutine) and enqueues the new task instead of running it directly. Without the wrapper, each `launch` from your main function gets executed immediately, before `launch` returns, because the `Unconfined` dispatcher isn't currently executing anything and so will just run the task immediately.
👍 1
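The trampolining behaviour described above can be sketched with a toy dispatcher. This is an illustration in plain Kotlin under stated assumptions, not the real `Unconfined` implementation: `ToyTrampoline`, `withWrapper`, and `withoutWrapper` are hypothetical names, and the only rule modelled is "run the task now if idle, otherwise enqueue it".

```kotlin
// Toy model only: mimics "run now if idle, otherwise enqueue".
// It is not how kotlinx.coroutines implements Dispatchers.Unconfined.
class ToyTrampoline {
    private val queue = ArrayDeque<() -> Unit>()
    private var running = false

    fun dispatch(task: () -> Unit) {
        if (running) {
            // Already inside a task: defer this one until the current task finishes.
            queue.addLast(task)
            return
        }
        running = true
        try {
            task()
            // Drain everything that was deferred while tasks were running.
            while (queue.isNotEmpty()) queue.removeFirst().invoke()
        } finally {
            running = false
        }
    }
}

// Children are dispatched from inside a root task, so they are enqueued.
fun withWrapper(): List<String> {
    val log = mutableListOf<String>()
    val trampoline = ToyTrampoline()
    trampoline.dispatch {
        trampoline.dispatch { log += "child A" }
        trampoline.dispatch { log += "child B" }
        log += "root end"
    }
    return log
}

// Children are dispatched while the dispatcher is idle, so each runs immediately.
fun withoutWrapper(): List<String> {
    val log = mutableListOf<String>()
    val trampoline = ToyTrampoline()
    trampoline.dispatch { log += "child A" }
    trampoline.dispatch { log += "child B" }
    log += "caller end"
    return log
}

fun main() {
    println(withWrapper())    // prints [root end, child A, child B]
    println(withoutWrapper()) // prints [child A, child B, caller end]
}
```

With the wrapper, both children run only after the root task has finished queuing them; without it, each child runs to completion before the next `dispatch` call is even made, which matches the difference observed between the two versions of `main`.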
b
Nice, thank you guys for your help!