# coroutines
d
Hey there. Not really an issue but I'm trying to understand the behavior of the following flow.
fun getContent(dropN: Long) = flow {
    val counter = AtomicLong(dropN)
    val resource = javaClass.getResource("/products.csv").toURI()

    File(resource).useLines { lines ->
        lines.filter { it.isNotEmpty() }
            .drop(dropN.toInt())
            .map { it.toLong() }
            .chunked(100)
            .onEach { counter.addAndGet(it.size.toLong()) }
            .map { getContentByProductIds(it) }
            .onEach { println("Processed ${counter.get()} lines from input file") }
            .forEach { emit(it) }
    }
}
The flow is collected by multiple coroutines and I see exactly the same lines printed multiple times.
Processed 100 lines from input file
Processed 100 lines from input file
Given the flow definition, I would assume the second onEach is only invoked after the first onEach has run, and the first one always increments the counter. Why do I still see the same counter value multiple times?
t
Flows are cold: every time a coroutine collects a flow, its block is executed again from the start. If you want multiple coroutines to share the same counter, consider using a SharedFlow (via the shareIn operator).
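A minimal sketch of the shareIn suggestion, not the original code: the file is replaced by three hard-coded values, and countedChunks, collectTwiceShared, and the starts counter are hypothetical names made up for this illustration. It assumes kotlinx.coroutines is on the classpath. The point it tries to show is that the upstream block runs once even though two coroutines collect.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the file-backed flow: emits three values and
// records in `starts` how many times the flow block was started.
fun countedChunks(starts: AtomicLong): Flow<Int> = flow {
    starts.incrementAndGet()
    for (i in 1..3) {
        delay(50)
        emit(i * 100)
    }
}

// Shares one upstream between two collectors and returns what each collector
// saw together with the number of upstream starts.
suspend fun collectTwiceShared(): Pair<List<List<Int>>, Long> = coroutineScope {
    val starts = AtomicLong(0)
    // The sharing coroutine never completes on its own, so it gets its own
    // scope that is cancelled once both collectors are done.
    val sharingScope = CoroutineScope(Dispatchers.Default)
    try {
        // replay = 3 keeps every emitted value, so a collector that
        // subscribes slightly later still receives all of them.
        val shared = countedChunks(starts)
            .shareIn(sharingScope, SharingStarted.Lazily, replay = 3)
        // A SharedFlow never completes, so each collector stops after 3 values.
        val a = async { shared.take(3).toList() }
        val b = async { shared.take(3).toList() }
        listOf(a.await(), b.await()) to starts.get()
    } finally {
        sharingScope.cancel()
    }
}

fun main(): Unit = runBlocking {
    val (seen, starts) = collectTwiceShared()
    println(seen)   // [[100, 200, 300], [100, 200, 300]]
    println(starts) // 1
}
```

Note that a shared flow broadcasts: every collector sees every value. That gives a shared counter, but it does not split the work between collectors.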
d
counter is being reset for each flow
a
A flow is like a function. If you have a function that prints something, it will print that thing once for each time you call it because it runs the whole function once for each time it is called. Collecting a flow runs the whole flow each time it is collected.
Each collect on a flow is isolated from every other collect, and the flow only executes when it is collected. This is what people mean when they say flows are "cold" (by default, until you start adding other fancy things like SharedFlow or StateFlow).
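To make the "a flow is like a function" point concrete, here is a small sketch under simplifying assumptions: the CSV file is replaced by an in-memory range, and processedCounts is a hypothetical name used only for this example. Each collect creates its own counter, so both collections report the same numbers.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the flow in the question. The counter lives
// inside the flow block, so it is created anew on every collect.
fun processedCounts(total: Int, chunkSize: Int): Flow<Long> = flow {
    val counter = AtomicLong(0)
    (1..total).asSequence()
        .chunked(chunkSize)
        .forEach { chunk -> emit(counter.addAndGet(chunk.size.toLong())) }
}

fun main(): Unit = runBlocking {
    val counts = processedCounts(total = 200, chunkSize = 100)
    println(counts.toList()) // [100, 200]
    println(counts.toList()) // [100, 200] again: the block ran from the start
}
```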
d
Okay, now I guess it clicked. What would be the correct approach if I want to implement single-producer, multiple-consumer (fan-out) processing with a producer that does not produce more than the consumers can handle?
thanks for the explanation already
The individual collectors should not each process the same emitted value; the emitted values should be distributed among them.
d
produceIn(...).receiveAsFlow(). I recommend looking up the docs for these functions.
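A hedged sketch of how produceIn(...).receiveAsFlow() might be wired up for fan-out. The fanOut function, the worker count, and the delay standing in for real work are all hypothetical. The buffer(Channel.RENDEZVOUS) call is an added assumption about what "does not produce more than the consumers can handle" should mean here: the producer suspends until some worker is ready to receive.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Distributes `items` over `workers` collectors and returns what each worker
// handled. Which worker gets which item is not deterministic.
// The opt-in is included because produceIn has been marked as a preview API
// in some kotlinx.coroutines versions.
@OptIn(FlowPreview::class)
suspend fun fanOut(items: List<Int>, workers: Int): List<List<Int>> = coroutineScope {
    // The flow is collected once, into a channel owned by this scope.
    // A rendezvous channel has no buffer, so the producer waits for a receiver.
    val channel = items.asFlow()
        .buffer(Channel.RENDEZVOUS)
        .produceIn(this)
    // receiveAsFlow() may be collected by several coroutines;
    // each element is delivered to exactly one of them.
    val distributed = channel.receiveAsFlow()
    (1..workers).map {
        async {
            val seen = mutableListOf<Int>()
            distributed.collect { item ->
                seen += item
                delay(10) // stand-in for real work on the item
            }
            seen
        }
    }.awaitAll()
}

fun main(): Unit = runBlocking {
    val perWorker = fanOut((1..10).toList(), workers = 3)
    perWorker.forEachIndexed { i, seen -> println("worker $i handled $seen") }
    // Every item was handled exactly once across all workers.
    check(perWorker.flatten().sorted() == (1..10).toList())
}
```

When the upstream flow completes, the channel is closed and every worker's collect returns, so the function finishes without explicit cancellation.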
d
will do, thanks