#coroutines

Gilles Barbier

02/06/2021, 10:39 PM
Hi, I do not understand the following behavior:
fun main() {
    val flow = MutableSharedFlow<Int>(replay = 0)

    runBlocking {
        launch {
            println( flow.first { it >= 90 } )
        }

        launch(Dispatchers.IO) {
            repeat(1000) {
                flow.emit(Random.nextInt(0, 100))
            }
        }
    }
}
This code does not print anything. It prints when I remove Dispatchers.IO from the second launch or (sometimes) if I add Dispatchers.IO to the first launch... any explanation?

Zach Klippenstein (he/him) [MOD]

02/06/2021, 11:58 PM
I think this is what’s happening: The first launch doesn’t start immediately – it schedules the coroutine to start on the event loop created by runBlocking. The second launch schedules its coroutine to start on the IO dispatcher, which is backed by a thread pool. It’s possible for one of the threads in that pool to pick up the new coroutine, and finish executing it (all 1000 iterations of your loop) before the first launched coroutine has a chance to start collecting from your flow.
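The race matters because a MutableSharedFlow with replay = 0 keeps nothing for late subscribers: an emit with no collector attached returns immediately and the value is gone. A minimal sketch of that effect in isolation, with no threads involved (the helper name collectAfterEmitting, the constant value 95 and the 200 ms timeout are made up for illustration):

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: emits before anyone subscribes, then tries to collect.
fun collectAfterEmitting(): Int? = runBlocking {
    val flow = MutableSharedFlow<Int>(replay = 0)

    // No subscriber yet: with replay = 0 every emit returns at once
    // and the value is dropped.
    repeat(1000) { flow.emit(95) }

    // Subscribing now is too late. Nothing is replayed, so first { } would
    // wait forever; the timeout only lets the example terminate.
    withTimeoutOrNull(200L) { flow.first { it >= 90 } }
}

fun main() {
    println(collectAfterEmitting()) // prints "null"
}
```

This is the same situation as the original snippet whenever the IO thread finishes its loop before the collecting coroutine has subscribed.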
Likely the println("sending") takes enough time (it’s actual IO) that it gives your original thread a chance to process the event loop and start the collecting coroutine.
Another way to make this deterministic is to tell your first launch to run immediately until its first suspension point:
launch(start = CoroutineStart.UNDISPATCHED) {
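For reference, a sketch of how the original snippet might look with that change applied. To make the result easy to return and check, it uses async instead of launch and wraps everything in a function; both the wrapper name firstAtLeast90 and the switch to async are adaptations for this example, not part of the original code.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.random.Random

// Hypothetical wrapper around the snippet from the question.
fun firstAtLeast90(): Int = runBlocking {
    val flow = MutableSharedFlow<Int>(replay = 0)

    // UNDISPATCHED runs the body on the current thread right away, up to its
    // first suspension point, so the subscription exists before any emit.
    val result = async(start = CoroutineStart.UNDISPATCHED) {
        flow.first { it >= 90 }
    }

    // The emitter still runs on the IO thread pool, as in the question.
    launch(Dispatchers.IO) {
        repeat(1000) { flow.emit(Random.nextInt(0, 100)) }
    }

    result.await()
}

fun main() {
    println(firstAtLeast90())
}
```

Because the collector is already subscribed when the emitter starts, each emit suspends until the collector has received the value, so the first value of 90 or above is no longer missed. The values are random, so this assumes at least one of the 1000 emissions is 90 or above, which is overwhelmingly likely.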

Gilles Barbier

02/07/2021, 12:56 AM
Very interesting - you are right, it's consistent with the observation that the behavior is sensitive to the replay value. I'm going to look into start = CoroutineStart.UNDISPATCHED, which I did not know about. Thanks a lot