Dmytro Serdiuk
01/09/2024, 11:24 PM
Operator fusion
Applications of flowOn, buffer, and produceIn after this operator are fused with its concurrent merging so that only one properly configured channel is used for execution of merging logic.
I saw this explanation, but didn’t get it.
Thank you!
Sam
01/10/2024, 8:05 AM
flatMapMerge works by launching a coroutine and sending items from each of the input flows to a single channel where they are combined. When you use buffer, this would normally create a new channel with the appropriate capacity for your buffer. Operator fusion means that instead of creating a new channel, buffer will replace the existing channel that was already created by flatMapMerge, so that both steps use a single channel. Does that help to answer your question? It's hard to give an example since these are internal implementation details that mostly just affect performance.
Dmytro Serdiuk
01/10/2024, 10:08 AM
import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.*

// Inner flow: emits "1_<elem> ", "2_<elem> ", "3_<elem> ", logging each item and then pausing 50 ms.
fun flowFrom(elem: String): Flow<String> {
    println("Called flowFrom")
    return flowOf(1, 2, 3)
        .map { "${it}_${elem} " }
        .onEach { println("Numbers ${Thread.currentThread()} $it") }
        .onEach { delay(50) }
}

fun main() = runBlocking<Unit> {
    println("Main ${Thread.currentThread()}")
    flowOf("A", "B", "C")
        .onEach { println("Strings ${Thread.currentThread()} $it") }
        .onEach { delay(50) }
        .flatMapMerge { flowFrom(it) }
        .buffer(1) // fused with flatMapMerge's channel
        .collect { delay(500); println("Result ${Thread.currentThread()} $it") } // slow collector
}
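To make the "single channel" description above more concrete, here is a rough sketch. It is not the library's implementation of flatMapMerge: mergeMapSketch and collectMerged are hypothetical names, and unlike flatMapMerge the sketch does not limit concurrency. It assumes only the documented behaviour that a buffer applied directly after channelFlow is fused with it, so the buffer configures the channel that channelFlow already owns instead of adding a second one.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper, not the library's flatMapMerge: each inner flow gets its own
// coroutine, and all of them send into the one channel that channelFlow owns.
fun <T, R> Flow<T>.mergeMapSketch(transform: suspend (T) -> Flow<R>): Flow<R> =
    channelFlow {
        this@mergeMapSketch.collect { value ->
            launch { transform(value).collect { send(it) } }
        }
    }

// Same shape as the snippet above, minus the logging and the slow collector.
suspend fun collectMerged(): List<String> =
    flowOf("A", "B", "C")
        .mergeMapSketch { elem -> flowOf(1, 2, 3).map { "${it}_$elem" }.onEach { delay(50) } }
        .buffer(1) // fused with channelFlow: sets that channel's capacity, adds no second channel
        .toList()

fun main() = runBlocking<Unit> {
    // Arrival order depends on scheduling, so sort before printing.
    println(collectMerged().sorted())
}
```

The order in which the nine items arrive is not fixed, which is why the sketch sorts them before printing. Only the set of items is deterministic.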
Dmytro Serdiuk
01/10/2024, 10:09 AM
Dmytro Serdiuk
01/10/2024, 5:09 PM
Sam
01/11/2024, 9:23 AM
The flatMapMerge function creates coroutines which will be responsible for selecting an item to put in the buffer. These will be activated each time the buffer becomes empty. The first time that happens will be when the flow starts collection, at which point it waits 100ms (50 + 50) and collects 1_A. The item is immediately collected, so the buffer is empty again. There are no items eligible for collection, so we wait another 50ms. Now the B producer has started, so both 2_A and 1_B are eligible for collection. The buffer can choose which of these items to select, so they can appear in either order. Now the buffer is full, and it won't become empty until the collector is done with its 500ms wait. After this, we've spent enough time waiting that we have also expired all of the remaining 50ms delays in the producer flows. From now on, the buffer can select arbitrarily from either the A, B or C producers until all three are empty.
Dmytro Serdiuk
01/11/2024, 11:13 AM