I’m playing around with flows (code in thread) and...
# flow
d
I’m playing around with flows (code in thread) and I’m trying to understand their behavior in this simple program.
1. Why does running that code print nothing but “launch publishers” and “launched subscriber”?
2. Why does uncommenting the delay fix (kind of) the issue, but the result doesn’t start from “0”?
3. Why does changing the dispatcher of pubScope to Default make the behavior similar to the one mentioned in 2?
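import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

fun main() {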

    val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 50, onBufferOverflow = BufferOverflow.SUSPEND)

    val times = 100
    val consumerJob = Job()
    val consumeScope = CoroutineScope(Dispatchers.Default) + consumerJob
    val pubJob = Job()
    val pubScope = CoroutineScope(Dispatchers.Unconfined) + pubJob

    consumeScope.launch(Dispatchers.Default) {
        println("launched subscriber")
        sharedFlow
            .takeWhile { it < times }
            .collect {
                delay(100)
                println("Collect: $it")
            }
    }
    println("launch publishers")
    repeat(times + 1) {
        pubScope.launch {
//            delay(10)
//            println("launched: $it")
            sharedFlow.emit(it)
        }
    }

    runBlocking {
        consumerJob.join()
        pubJob.join()
    }
}
b
The consumer scope uses the Default dispatcher, while the producer scope is Unconfined. That means all the items were emitted before the consumer subscribed to the flow. With no subscribers, emit returns immediately, and since you configured your flow without replay, those events are lost. One of the fixes is to change the start mode of the consumer job, e.g.
consumeScope.launch(Dispatchers.Default, start = CoroutineStart.UNDISPATCHED) {
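To illustrate the point, here is a minimal sketch, not your exact program: the function name demo, the small value range, and the extra emit(-1) are made up for the example. It assumes a MutableSharedFlow with no replay, emits one value before anyone subscribes, then starts the subscriber with CoroutineStart.UNDISPATCHED so that it is subscribed before launch returns.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper for illustration: returns what the subscriber received.
suspend fun demo(): List<Int> = coroutineScope {
    val flow = MutableSharedFlow<Int>(extraBufferCapacity = 50)
    val received = mutableListOf<Int>()

    // No subscribers yet and replay = 0: emit returns at once, the value is dropped.
    flow.emit(-1)

    // UNDISPATCHED runs the coroutine in the current thread up to its first
    // suspension point, so collect has already subscribed when launch returns.
    val subscriber = launch(Dispatchers.Default, start = CoroutineStart.UNDISPATCHED) {
        flow.takeWhile { it < 3 }.collect { received += it }
    }

    repeat(4) { flow.emit(it) } // emits 0, 1, 2, 3; all fit in the buffer
    subscriber.join()           // takeWhile ends the collection when it sees 3
    received
}

fun main() = runBlocking {
    println(demo()) // [0, 1, 2]
}
```

The value -1 never shows up because nothing was subscribed when it was emitted. The other option hinted at above is to create the flow with a replay value, which keeps the most recent values for late subscribers.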
d
I see, thank you. One more question, if it’s OK: why doesn’t the flow get cancelled even though I have takeWhile? The program gets stuck at the end without exiting. It seems like the join() calls never resume.
It’s not only the consumerJob but the publisher one as well.
b
Because you never complete or cancel them. What you want is to use the Job returned by the launch function:
val pubJob = pubScope.launch(..) { ... }
val consumerJob = consumeScope.launch(..) { .. }
What you have now are two standalone Jobs created with Job(). They are not cancelled or completed when their current children complete; they stay active until you complete or cancel them yourself.
The launch function always creates a new Job for the coroutine it starts and attaches it as a child of the Job from the scope (or of the Job you pass to launch in its context argument, if you pass one).
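A small sketch of the difference, assuming a scope built on a standalone Job() as in the original program. The function name jobStates and the short delay are made up for the example.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper for illustration: returns
// [child completed, parent completed after the child finished, parent completed after complete()].
fun jobStates(): List<Boolean> = runBlocking {
    val parent = Job() // standalone job, nothing completes it automatically
    val scope = CoroutineScope(Dispatchers.Default + parent)

    // launch returns a new Job that is a child of `parent`
    val child = scope.launch { delay(10) }
    child.join() // resumes: the launched coroutine finishes on its own

    // The parent is still active even though its only child is done,
    // so parent.join() at this point would suspend forever.
    val parentDoneAfterChild = parent.isCompleted

    parent.complete() // explicitly complete it (cancel() would also end it)
    parent.join()

    listOf(child.isCompleted, parentDoneAfterChild, parent.isCompleted)
}

fun main() {
    println(jobStates()) // [true, false, true]
}
```

Joining the Job returned by launch is enough to wait for that coroutine. Joining a Job created with Job() only returns after someone calls complete() or cancel() on it.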
d
right, you just filled some important gaps. Thank you very much