streetsofboston
06/21/2019, 5:36 PMFlow
with and without a buffer, in this example:
val flow = flow {
for (i in 1..100) {
emit(1)
delay(200)
}
}.flowOn(Dispatchers.Main) // try replacing it with Dispatchers.IO...
MainScope().launch {
flow.collect {
println(it)
delay(1000)
}
}
This prints out an item each 1200 milliseconds or so. This makes sense.
When using buffer()
, I can speed up the collecting a bit and print out an item each 1000 milliseconds or so. This also make sense to me.
However, just replacing .flowOn(Dispatchers.Main)
with .flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
, the print-outs now happen each 1000 milliseconds as well…. I expected the frequency to still be 1200 milliseconds.
Is this to be expected, that even without a buffer (with a size > 0), the flow
now acts as if it has a buffer with using different dispatchers like this?Zach Klippenstein (he/him) [MOD]
06/21/2019, 7:38 PMflowOn
is effectively a no-op when you pass the same dispatcher that you’re collecting on. If that’s true, this flow would be entirely synchronous which is why you’re seeing 1200ms.
If you use a different dispatcher, then the flow { … }
body is being executed in a different coroutine from the collection, so the delays will happen concurrently. The same thing happens when you add a buffer – the original flow
body is executed in a different coroutine.streetsofboston
06/21/2019, 7:44 PMemit
winds up as a ‘simple’ call to the collectors lambda.Zach Klippenstein (he/him) [MOD]
06/21/2019, 7:47 PMemit
is a synchronous call to your collect
body, so the delay(200)
can’t happen until it returns (after delay(1000)
)flow
body to run concurrently, just explicitly launch a new coroutine inside it (use channelFlow
)val flow = channelFlow {
launch {
for (i in 1..100) {
send(1)
delay(200)
}
}
}
buffer()
operator, but i think this more directly expresses the behavior you wantstreetsofboston
06/21/2019, 7:52 PMemit
suspend until the collector’s lambda finishes.Zach Klippenstein (he/him) [MOD]
06/21/2019, 7:54 PMstreetsofboston
06/21/2019, 7:55 PMemit
suspends and resumes only when the collector’s lambda finishes.
If buffers are used, the delay should be 1000.
But the difference in dispatchers shouldn’t matter, and the delays should be 1200…Zach Klippenstein (he/him) [MOD]
06/21/2019, 7:58 PMflowOn
operator is to allow you to run the upstream flow concurrently with the downstream flow though. Like if you replace your delay
with Thread.sleep
(or some long-running IO operation), you don’t want that blocking the collecting coroutine.streetsofboston
06/21/2019, 8:06 PMemit
suspends until the collector’s lambda/action returns/finishes.
https://medium.com/@elizarov/kotlin-flows-and-coroutines-256260fb3bdb
With a buffer, the emit
suspends only if the buffer is full.
I didn’t expect the difference in dispatchers to make a difference in the suspending behavior of the emit
.
Much like a regular suspend fun
, the function returns when the final result is returned, no matter with what context that function was called and no matter with what context the function is implemented. Calling the a suspend fun
is like doing an emit
, the implementation of that suspend fun
is like doing a collect-action (but for a Flow, you can do it multiple times instead of just once 🙂).emit
is called. Then I understand why it behaves that way… but should it? 🙂Zach Klippenstein (he/him) [MOD]
06/21/2019, 8:32 PMstreetsofboston
06/21/2019, 8:47 PMFlowCollector
, the behavior defined for its emit
function.
I understood it to be defined that the emit
suspends until the collector’s action returns. This is based on this quote from Roman from his article on Medium:
… because both emitter and collector are parts of a sequential execution here and it alternates between them.What dispatchers are used should not matter, in my opinion. When introducing a buffer, the intermediate buffer’s collector(’s action) now just puts an item onto its queue and it returns almost immediately. But this still would mean that the original
emit
suspends until the buffer’s collector returns (quickly now, without any additional delay; the collector just put something on a queue without delay). The total delay between print-outs goes down to 1000 (unless the buffer gets full).Zach Klippenstein (he/him) [MOD]
06/21/2019, 10:50 PMI understood it to be defined that theThis is true, but “the collector” refers to the immediate/direct collector of the operator, not the final collector of the entire chain.suspends until the collector’s action returns.emit
streetsofboston
06/22/2019, 12:56 AMelizarov
06/22/2019, 5:21 AMThis operators retains a sequential nature of flow if changing the context does not call for changing the dispatcher. Otherwise, if changing dispatcher is required, it collects flow emissions in one coroutine that is run using a specified context and emits them from another coroutines with the original collector’s context using a channel with a default buffer size between two coroutines similarly to buffer operator, unless buffer operator is explicitly called before or after flowOn, which requests buffering behavior and specifies channel size.
streetsofboston
06/22/2019, 1:48 PMflowOn
basically adds a channel with a buffer of default size. The frequency goes down to 1000 msec.
The fact that using Dispatchers.Main
when calling flowOn
, in my example, still has a 1200 msec frequency is due to an optimization (short-path when dispatcher is the same).elizarov
06/22/2019, 5:33 PM