I have a question about `Flow` with and without a ...
# coroutines
s
I have a question about
Flow
with and without a buffer, in this example:
val flow = flow {
    for (i in 1..100) {
        emit(1)
        delay(200)
    }
}.flowOn(Dispatchers.Main) // try replacing it with Dispatchers.IO... 

MainScope().launch {
    flow.collect { 
        println(it)
        delay(1000)
    }
}
This prints an item every 1200 milliseconds or so, which makes sense. When using
buffer()
, I can speed up the collecting a bit and print an item every 1000 milliseconds or so. This also makes sense to me. However, just replacing
.flowOn(Dispatchers.Main)
with
.flowOn(Dispatchers.IO)
, the print-outs now happen every 1000 milliseconds as well. I expected the interval to still be 1200 milliseconds. Is it expected that, even without a buffer (with a size > 0), the
flow
now acts as if it has a buffer when different dispatchers are used like this?
z
I think
flowOn
is effectively a no-op when you pass the same dispatcher that you’re collecting on. If that’s true, this flow would be entirely synchronous which is why you’re seeing 1200ms. If you use a different dispatcher, then the
flow { … }
body is being executed in a different coroutine from the collection, so the delays will happen concurrently. The same thing happens when you add a buffer – the original
flow
body is executed in a different coroutine.
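The difference described above can be measured with a small sketch. It assumes `runBlocking` in place of `MainScope()` so it runs on a plain JVM, shortens the delays to 100 ms, and uses a hypothetical helper name `timeCollect`; none of this comes from the original example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.system.measureTimeMillis

// Hypothetical helper: collects 5 items and returns the elapsed time in milliseconds.
fun timeCollect(useSameDispatcher: Boolean): Long = runBlocking {
    // The dispatcher the collector runs on (runBlocking's event loop).
    val collectorDispatcher: CoroutineContext = coroutineContext[ContinuationInterceptor]!!
    val upstreamContext = if (useSameDispatcher) collectorDispatcher else Dispatchers.Default

    val numbers = flow {
        for (i in 1..5) {
            emit(i)
            delay(100) // producer-side delay
        }
    }.flowOn(upstreamContext)

    measureTimeMillis {
        numbers.collect { delay(100) } // collector-side delay
    }
}
```

With the same dispatcher the two delays should add up per item; with `Dispatchers.Default` they should overlap, so `timeCollect(true)` is expected to take noticeably longer than `timeCollect(false)`.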
s
I see. The core of your answer is “the delays happen concurrently”. Still, it is a difference in behavior based solely on the dispatchers used. When using the same dispatcher, why can’t the delays happen simultaneously too? That would keep the behavior identical. It seems a shortcut is taken when the dispatchers are the same and the
emit
winds up as a ‘simple’ call to the collector’s lambda.
z
because when the dispatchers are the same,
emit
is a synchronous call to your
collect
body, so the
delay(200)
can’t happen until it returns (after
delay(1000)
)
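A minimal sketch of that synchronous call, assuming `runBlocking` as the collecting scope and a hypothetical function name `traceSequentialFlow`. It records the order of events when nothing (no `flowOn`, no `buffer`) separates the emitter from the collector.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the order in which emitter and collector steps ran.
fun traceSequentialFlow(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        events += "before emit"
        emit(1) // invokes the collect body and suspends until it returns
        events += "after emit"
    }
    numbers.collect {
        events += "collect start"
        delay(50) // the emitter cannot continue during this delay
        events += "collect end"
    }
    events
}
```

The returned list is `[before emit, collect start, collect end, after emit]`: the code after `emit` only runs once the collect body has finished.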
if you always want your
flow
body to run concurrently, just explicitly launch a new coroutine inside it (use
channelFlow
)
val flow = channelFlow {
    launch {
        for (i in 1..100) {
            send(1)
            delay(200)
        }
    }
}
you could also add a buffer by using the
buffer()
operator, but i think this more directly expresses the behavior you want
s
I know all about using channels or a buffer to ‘fix’ this. And I understand how it may happen. But why is it implemented this way? Why are the behaviors not the same, regardless of the dispatchers used? It is possible to make the behaviors the same. If there’s no buffer, have
emit
suspend until the collector’s lambda finishes.
z
Oh, you want the 1200 delay even if buffers/different dispatchers are used?
s
I expected the print-outs to come every 1200 milliseconds all the time, regardless of the dispatchers used, if there is no buffer (or Channel with a buffer). The Flow’s
emit
suspends and resumes only when the collector’s lambda finishes. If buffers are used, the delay should be 1000. But the difference in dispatchers shouldn’t matter, and the delays should be 1200…
z
The whole point of the
flowOn
operator is to allow you to run the upstream flow concurrently with the downstream flow though. Like if you replace your
delay
with
Thread.sleep
(or some long-running IO operation), you don’t want that blocking the collecting coroutine.
What is your use case for the behavior you’re expecting?
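A small sketch of the blocking case mentioned above, assuming `Thread.sleep` as a stand-in for a long-running IO call and a hypothetical function name `observeThreads`. It reports which thread ran the flow body and which ran the collector.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (thread of the flow body, thread of the collector).
fun observeThreads(): Pair<String, String> = runBlocking {
    var upstreamThread = ""
    var collectorThread = ""
    flow {
        Thread.sleep(100) // stand-in for blocking IO
        upstreamThread = Thread.currentThread().name
        emit(Unit)
    }.flowOn(Dispatchers.IO) // runs the flow body above on an IO thread
        .collect {
            collectorThread = Thread.currentThread().name
        }
    upstreamThread to collectorThread
}
```

The two names differ, so the blocking call does not hold up the thread the collector runs on.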
s
Not a real use case 🙂 Just curiosity. From Roman’s post about this (using Flow with and without a buffer), I understood that without a buffer, the
emit
suspends until the collector’s lambda/action returns/finishes. https://medium.com/@elizarov/kotlin-flows-and-coroutines-256260fb3bdb With a buffer, the
emit
suspends only if the buffer is full. I didn’t expect the difference in dispatchers to make a difference in the suspending behavior of the
emit
. Much like a regular
suspend fun
, the function returns when the final result is ready, no matter what context it was called from and no matter what context its implementation uses. Calling a
suspend fun
is like doing an
emit
, the implementation of that
suspend fun
is like doing a collect-action (but for a Flow, you can do it multiple times instead of just once 🙂).
However, if the dispatchers are the same, there may be a short-cut in the implementation of the FlowCollector to just directly call the collect-action when
emit
is called. Then I understand why it behaves that way… but should it? 🙂
z
I think it should. Synchronous execution is the default for Flow, but almost more as an optimization than anything, and only for the simplest case when there is no concurrency. Coroutine code likes being asynchronous, so as soon as anything in the flow is asynchronous, it uses multiple coroutines to do stuff. In general, more concurrency is better - I think that's one of the main premises behind the coroutines feature. If you have logic that depends on downstream flows being executed synchronously, you probably need explicit synchronization, or maybe a different pattern entirely. Hard to discuss without an actual use case. But in general, if you accept the premise that "more concurrency is better", this behavior makes sense.
s
My confusion is about the
FlowCollector
, the behavior defined for its
emit
function. I understood it to be defined that the
emit
suspends until the collector’s action returns. This is based on this quote from Roman from his article on Medium:
… because both emitter and collector are parts of a sequential execution here and it alternates between them.
What dispatchers are used should not matter, in my opinion. When introducing a buffer, the intermediate buffer’s collector(’s action) now just puts an item onto its queue and it returns almost immediately. But this still would mean that the original
emit
suspends until the buffer’s collector returns (quickly now, without any additional delay; the collector just put something on a queue without delay). The total delay between print-outs goes down to 1000 (unless the buffer gets full).
z
I understood it to be defined that the
emit
suspends until the collector’s action returns.
This is true, but “the collector” refers to the immediate/direct collector of the operator, not the final collector of the entire chain.
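To make the "immediate collector" point concrete, here is a sketch of a channel-backed operator, modeled on the conceptual `buffer` shown in the `Flow` documentation. The name `viaChannel` is hypothetical and the real `buffer()` and `flowOn()` implementations are more involved; this only illustrates the shape.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical operator, for illustration only.
fun <T> Flow<T>.viaChannel(capacity: Int): Flow<T> = flow {
    coroutineScope {
        val channel = Channel<T>(capacity)
        launch {
            try {
                // Immediate collector of the upstream flow:
                // it returns as soon as the value is accepted by the channel.
                this@viaChannel.collect { channel.send(it) }
            } finally {
                channel.close()
            }
        }
        // The downstream collector is called from here, in the original coroutine.
        for (value in channel) emit(value)
    }
}

// Hypothetical usage helper.
fun collectViaChannel(): List<Int> = runBlocking {
    flowOf(1, 2, 3).viaChannel(capacity = 2).toList()
}
```

The upstream `emit` suspends only until `channel.send` returns, not until the final collector at the end of the chain has processed the value.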
s
That could be the case, but I'm not sure (yet). I may put an issue/question in the bug tracker about this and see what the JetBrains folks have to say 😃
This operator retains a sequential nature of flow if changing the context does not call for changing the dispatcher. Otherwise, if changing dispatcher is required, it collects flow emissions in one coroutine that is run using a specified context and emits them from another coroutine with the original collector’s context using a channel with a default buffer size between two coroutines similarly to buffer operator, unless buffer operator is explicitly called before or after flowOn, which requests buffering behavior and specifies channel size.
(This is done for the sake of efficiency)
s
Thank you, @elizarov! If I understand it all correctly: Calling
flowOn
basically adds a channel with a buffer of default size. The frequency goes down to 1000 msec. The fact that using
Dispatchers.Main
when calling
flowOn
, in my example, still gives a 1200 msec interval is due to an optimization (a fast path taken when the dispatcher is the same).
e
Yes