bloder
06/12/2019, 9:43 PM
class ConflatedTests : CoroutineScope by CoroutineScope(Dispatchers.Unconfined) {
    private val channel = ConflatedBroadcastChannel<Int>()

    @ExperimentalCoroutinesApi
    fun main() = launch {
        launch {
            channel.consumeEach {
                println(it)
            }
        }
        dispatch(channel, 1)
        dispatch(channel, 2)
        dispatch(channel, 3)
        dispatch(channel, 4)
    }

    @ExperimentalCoroutinesApi
    private fun <T> CoroutineScope.dispatch(channel: ConflatedBroadcastChannel<T>, action: T) = launch {
        channel.send(action)
    }
}
The problem here is that it only consumes the first and last emissions, printing 1 and 4. The other emissions are probably being suspended for some reason... Could anyone help me with that? What am I missing here?
bdawg.io
06/12/2019, 10:40 PM
`main` launches a coroutine and then returns right away, because there's no other work outside of that coroutine.
You now have this root coroutine, launched by your `main`, that executes after `main` has returned (Unconfined uses any available thread; in this case, likely the single thread you're running the test on).
That root coroutine then queues up 5 more coroutines in this order:
1. launch (channel consumeEach), job queued
2. launch (channel send 1), job queued
3. launch (channel send 2), job queued
4. launch (channel send 3), job queued
5. launch (channel send 4), job queued
Now that your root coroutine has reached the end of its execution, it waits for its child jobs to complete, which frees up your thread. The thread now starts going through the queue.
- channel consumeEach starts up and suspends, with priority, because there is no item available in the channel yet.
- dispatch 1 starts up and sends item 1 onto the channel.
- channel consumeEach starts up again because it has priority and the channel now has an item. It prints, and then suspends again, this time without priority. Exactly how this priority/racing mechanism works is an implementation detail that I personally do not understand (yet, hopefully).
Your queue now looks something like this:
3. launch (channel send 2), job queued
4. launch (channel send 3), job queued
5. launch (channel send 4), job queued
6. launch (channel consumeEach), job suspended
As you can see from your queue, sends 2 and 3 are next, but they will get squashed by your conflated channel. That leaves 4 in the channel to then be consumed by the `consumeEach` coroutine.
Zach Klippenstein (he/him) [MOD]
06/12/2019, 11:32 PM
`consumeEach` is queued before any of the `send`s, like you said, so it runs first. Since the channel is empty, it suspends and adds its continuation to the channel’s queue of receivers (this is how “sending item 1 onto the channel” works).
Then the first `send` occurs, which sees there’s a receiver queued up, so it dispatches the receiver immediately. The `Unconfined` dispatcher adds the receiver’s continuation to the end of its event queue, like you’ve got in the second list.
The second `send` will see there are no queued-up receivers, so it will just cache the value and continue immediately (no suspends or resumes).
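A minimal sketch of the sequence in this message, including the final receive described just below. It is a toy model in plain Kotlin and not the kotlinx.coroutines implementation: `ToyConflatedChannel`, `consumeEach`, and `simulate` are hypothetical names, the `ArrayDeque` stands in for the dispatcher's event queue, and a single cached slot plus a single waiting receiver is an assumed simplification.

```kotlin
// Toy model only: NOT how kotlinx.coroutines is implemented.
// One cached slot (conflation) and at most one waiting receiver.
class ToyConflatedChannel<T : Any>(private val eventQueue: ArrayDeque<() -> Unit>) {
    private var latest: T? = null
    private var waitingReceiver: ((T) -> Unit)? = null

    fun send(value: T) {
        val receiver = waitingReceiver
        if (receiver != null) {
            // A receiver is waiting: hand it the value, but resume it
            // at the END of the event queue rather than right now.
            waitingReceiver = null
            eventQueue.addLast { receiver(value) }
        } else {
            // Nobody is waiting: overwrite the cached value and return.
            latest = value
        }
    }

    fun receive(onValue: (T) -> Unit) {
        val cached = latest
        if (cached != null) {
            latest = null
            onValue(cached)           // value available: no suspension
        } else {
            waitingReceiver = onValue // empty: "suspend" until a send arrives
        }
    }
}

// Receive one value, handle it, then go back for the next one.
fun <T : Any> consumeEach(channel: ToyConflatedChannel<T>, action: (T) -> Unit) {
    channel.receive { value ->
        action(value)
        consumeEach(channel, action)
    }
}

fun simulate(): List<Int> {
    val printed = mutableListOf<Int>()
    val queue = ArrayDeque<() -> Unit>()
    val channel = ToyConflatedChannel<Int>(queue)

    // Same order as the five queued jobs: the consumer, then sends 1..4.
    queue.addLast { consumeEach(channel) { printed.add(it) } }
    for (i in 1..4) queue.addLast { channel.send(i) }

    // Drain the queue one task at a time, like a single-threaded event loop.
    while (queue.isNotEmpty()) queue.removeFirst().invoke()
    return printed
}

fun main() {
    println(simulate()) // [1, 4]
}
```

In this model, send 1 finds the waiting receiver and queues its resumption behind sends 2 to 4, so 2 and 3 are overwritten in the slot before the receiver runs again.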
Once all the `send`s are complete, the queued-up receive gets executed (task 6 in your list), which prints “1”, and then the `for` loop in `consumeEach` continues, checks the channel again, pulls the latest conflated value (4) out without suspending and prints it.
bloder
06/13/2019, 12:27 AM
Zach Klippenstein (he/him) [MOD]
06/13/2019, 12:31 AM
bloder
06/13/2019, 12:43 AM
@ExperimentalCoroutinesApi
suspend fun main() {
    launch {
        channel.consumeEach {
            println(it)
        }
    }
    dispatch(channel, 1)
    dispatch(channel, 2)
    dispatch(channel, 3)
    dispatch(channel, 4)
}
bdawg.io
06/13/2019, 12:45 AM
`suspend`. It never actually suspends since you're launching jobs.
bdawg.io
06/13/2019, 12:46 AM
bloder
06/13/2019, 12:47 AM
Zach Klippenstein (he/him) [MOD]
06/13/2019, 12:57 AM
`Unconfined` does trampolining. When the coroutine wrapper is there, every time you call `launch`, the dispatcher sees that it's already executing a task (the root coroutine) and enqueues the new task instead of running it directly.
Without the wrapper, each `launch` from your main function gets executed immediately, before `launch` returns, because the `Unconfined` dispatcher isn't currently executing anything and so will just run the task immediately.
bloder
06/13/2019, 1:01 AM
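The trampolining described in the last explanation can be sketched with a toy dispatcher in plain Kotlin. This is an assumed simplification, not the actual `Dispatchers.Unconfined` code: `ToyTrampoline`, `withWrapper`, and `withoutWrapper` are hypothetical names, and plain lambdas stand in for coroutines.

```kotlin
// Toy model only: NOT the kotlinx.coroutines implementation.
class ToyTrampoline {
    private val pending = ArrayDeque<() -> Unit>()
    private var running = false

    fun dispatch(task: () -> Unit) {
        if (running) {
            // Already executing a task: enqueue instead of running nested.
            pending.addLast(task)
            return
        }
        running = true
        try {
            task()
            // Drain whatever the task (and its children) enqueued.
            while (pending.isNotEmpty()) pending.removeFirst().invoke()
        } finally {
            running = false
        }
    }
}

// "With the wrapper": children are dispatched from inside a running root task.
fun withWrapper(): List<String> {
    val log = mutableListOf<String>()
    val trampoline = ToyTrampoline()
    trampoline.dispatch {
        log.add("root start")
        trampoline.dispatch { log.add("child 1") }
        trampoline.dispatch { log.add("child 2") }
        log.add("root end")
    }
    return log
}

// "Without the wrapper": nothing is running, so each task runs immediately.
fun withoutWrapper(): List<String> {
    val log = mutableListOf<String>()
    val trampoline = ToyTrampoline()
    log.add("main start")
    trampoline.dispatch { log.add("child 1") }
    trampoline.dispatch { log.add("child 2") }
    log.add("main end")
    return log
}

fun main() {
    println(withWrapper())    // [root start, root end, child 1, child 2]
    println(withoutWrapper()) // [main start, child 1, child 2, main end]
}
```

In the first case the children only run after the root task finishes, which matches the queued behavior of the original code. In the second case each child runs before `dispatch` returns, which is why every send can be observed by the consumer in turn.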