How can I somehow suspend until a flow starts to b...
# coroutines
j
How can I suspend until a flow starts being collected, to make sure that the collector receives the first values emitted by the flow? Example of code to fix (https://pl.kotl.in/-sNdWODnA):
package com.example.script

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    val count = MutableStateFlow(0)
    
    runBlocking {
        // Collect the values in a new coroutine.
        val job = launch {
            // Prints "3 4 5".
            count.take(3).collect {
                println("$it ")
            }
        }
        
        // TODO: Somehow synchronize and suspend here until the coroutine starts collecting
        // so that we print "0 1 2" instead.
        
        repeat(3) {
            count.value++
        }
        
        repeat(3) {
            delay(500)
            count.value++
        }
    }
}
This example is bad because I'm using a StateFlow, which is conflated, but my main question remains: "how do we wait for a collector to start collecting?"
s
Assigning to the MutableStateFlow's `value` is non-suspending, so the first loop races through all three increments before the collector gets a chance to run. It works in the second loop because you call `delay()` there. Does it work if you add a `yield()` call just after `count.value++` in the first loop?
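A minimal sketch of the `yield()` idea, assuming everything runs on the single-threaded `runBlocking` event loop as in the original example. The helper name `collectWithYield` and the `seen` list are hypothetical, added only to make the result inspectable. Note one deviation from the suggestion: there is an extra `yield()` before the loop so the collector can observe the initial `0`. This depends on scheduling order rather than on any synchronization guarantee.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the values the collector saw when the
// producer yields before the loop and after every update.
fun collectWithYield(): List<Int> = runBlocking {
    val count = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    val job = launch {
        count.take(3).collect { seen += it }
    }

    // Let the queued collector start and read the initial value.
    yield()

    repeat(3) {
        count.value++
        // Give the collector a turn before the next update overwrites this one.
        yield()
    }

    job.join()
    seen
}

fun main() {
    println(collectWithYield())
}
```

With a multi-threaded dispatcher the interleaving is no longer predictable, so this should be read as an illustration of why the first loop loses values, not as a robust fix.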
m
Isn't this what a normal `Flow` is for? A normal (non-buffered) `Flow` suspends on `emit` until a collector consumes it.
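For illustration, a small sketch of that point using a cold flow. The names `counter` and `firstThree` are hypothetical. The `flow { }` body only starts running when something collects it, so there are no values the collector could miss.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical cold flow: the block does not run until collection starts,
// and each emit() returns only after the collector has handled the value.
fun counter(): Flow<Int> = flow {
    var n = 0
    while (true) {
        emit(n++)
    }
}

// Collects the first three values; take(3) cancels the upstream afterwards.
fun firstThree(): List<Int> = runBlocking {
    counter().take(3).toList()
}

fun main() {
    println(firstThree())
}
```

The trade-off is that a cold flow has no shared state: every collector gets its own run of the block starting from `0`.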
a
Launching the collector with `CoroutineStart.UNDISPATCHED` will generally do the trick if you're not afraid of experimental API.
But that relies on collection only suspending once the full upstream collection has started.
So it's far from perfect.
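A hedged sketch of the `UNDISPATCHED` approach applied to the original example. The helper name `collectUndispatched` and the `seen` list are hypothetical, and the `@OptIn` is only there in case the kotlinx.coroutines version in use still marks this start mode as experimental. The collector body runs immediately in the caller until its first suspension, so it has already observed `0` by the time `launch` returns. It does not undo conflation: the burst of three non-suspending updates still collapses into one value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the values seen by a collector that was
// started with CoroutineStart.UNDISPATCHED.
@OptIn(ExperimentalCoroutinesApi::class)
fun collectUndispatched(): List<Int> = runBlocking {
    val count = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    // Runs the collector in the current thread until it first suspends,
    // which here is after it has read the initial value.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        count.take(3).collect { seen += it }
    }

    // No suspension point here, so the collector cannot run in between:
    // the intermediate values 1 and 2 are conflated away.
    repeat(3) {
        count.value++
    }

    repeat(3) {
        delay(50)
        count.value++
    }

    job.join()
    seen
}

fun main() {
    println(collectUndispatched())
}
```

So the first value is no longer missed, but getting every value still needs a non-conflating primitive.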
j
It sounds like maybe `MutableStateFlow` isn't the best fit. You're having to work too hard to get around its built-in conflation.
e
Just use a `Channel`.
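A minimal sketch of the `Channel` suggestion, assuming a default (rendezvous) channel. The helper name `collectFromChannel` is hypothetical. Each `send` suspends until the receiver takes the value, so nothing is dropped even though the collector starts after the producer code is reached.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the values received through a rendezvous channel.
fun collectFromChannel(): List<Int> = runBlocking {
    // Default capacity is rendezvous: no buffer, no conflation.
    val channel = Channel<Int>()
    val seen = mutableListOf<Int>()

    val job = launch {
        // Expose the channel as a flow so the collecting side looks like
        // the original example.
        channel.receiveAsFlow().take(3).collect { seen += it }
    }

    // Each send() suspends until the collector has received the value.
    repeat(3) {
        channel.send(it)
    }

    job.join()
    channel.close()
    seen
}

fun main() {
    println(collectFromChannel())
}
```

Unlike a `StateFlow`, a channel hands each value to exactly one receiver, so this fits a single collector rather than several observers of shared state.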
👍 1