# coroutines
d
Hello guys! Could somebody explain to me how a buffer applied after flatMapMerge is fused, and how it affects any internal buffers? If possible, with examples.
Operator fusion

Applications of flowOn, buffer, and produceIn after this operator are fused with its concurrent merging so that only one properly configured channel is used for execution of merging logic.
I saw this explanation, but didn’t get it. Thank you!
s
Internally, `flatMapMerge` works by launching a coroutine and sending items from each of the input flows to a single channel where they are combined. When you use `buffer`, this would normally create a new channel with the appropriate capacity for your buffer. Operator fusion means that instead of creating a new channel, `buffer` will replace the existing channel that was already created by `flatMapMerge`, so that both steps use a single channel. Does that help to answer your question? It's hard to give an example since these are internal implementation details that mostly just affect performance.
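A minimal sketch of the point that fusion is not visible in the results: the same merged flow is collected with and without a trailing `buffer`, and the set of emitted items is compared. The helper name `collectMerged` and the simplified inner flow are assumptions made for illustration. The sketch only shows that the output is unchanged; it does not expose the internal channel.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: merges three inner flows and optionally applies buffer()
// after flatMapMerge. The result is sorted because merge order is not guaranteed.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun collectMerged(bufferCapacity: Int?): List<String> = runBlocking {
    val merged = flowOf("A", "B", "C")
        .flatMapMerge { s -> flowOf(1, 2, 3).map { "${it}_$s" } }
    val flow = if (bufferCapacity == null) merged else merged.buffer(bufferCapacity)
    flow.toList().sorted()
}

fun main() {
    val plain = collectMerged(null)
    val buffered = collectMerged(1)
    // Same nine items either way; only the internal channel setup differs.
    println(plain == buffered) // prints "true"
}
```

Since the items are identical in both cases, any difference made by `buffer` here would show up in timing and ordering rather than in what is emitted.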
d
@Sam, thank you for your reply! I still don't understand it, as this code produces unpredictable results:
```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.*

// Inner flow: emits "1_X ", "2_X ", "3_X " with a 50 ms delay after each item.
fun flowFrom(elem: String): Flow<String> {
    println("Called flowFrom")
    return flowOf(1, 2, 3)
        .map { "${it}_${elem} " }
        .onEach { println("Numbers ${Thread.currentThread()} $it") }
        .onEach { delay(50) }
}

fun main() = runBlocking<Unit> {
    println("Main ${Thread.currentThread()}")
    flowOf("A", "B", "C")
        .onEach { println("Strings ${Thread.currentThread()} $it") }
        .onEach { delay(50) }
        .flatMapMerge { flowFrom(it) }
        .buffer(1) // applied directly after flatMapMerge
        .collect { delay(500); println("Result ${Thread.currentThread()} $it") }
}
```
So buffer(1) seems to have no effect at all.
@Sam, sorry to bother you, but could you please help me understand this? Thank you.
s
Perhaps it would help if you try to picture the state of the application at each point in time? Let's assume a buffer of size 1. The `flatMapMerge` function creates coroutines which will be responsible for selecting an item to put in the buffer. These will be activated each time the buffer becomes empty. The first time that happens will be when the flow starts collection, at which point it waits 100 ms (50 + 50) and collects `1_A`. The item is immediately collected, so the buffer is empty again. There are no items eligible for collection, so we wait another 50 ms. Now the `B` producer has started, so both `2_A` and `1_B` are eligible for collection. The buffer can choose which of these items to select, so they can appear in either order. Now the buffer is full, and it won't become empty until the collector is done with its 500 ms wait. After this, we've spent enough time waiting that we have also expired all of the remaining 50 ms delays in the producer flows. From now on, the buffer can select arbitrarily from either the `A`, `B` or `C` producers until all three are empty.
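One way to check this timeline is to log elapsed time on both sides of the buffer. The sketch below is a variation on the example from the question, not the original code: the `runTimeline` and `elapsed` names are assumptions, and the thread printouts are replaced by timestamps. The interleaving of `A`, `B` and `C` items can differ between runs, so no particular output is shown.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: runs the pipeline and returns items in collection order.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun runTimeline(): List<String> = runBlocking {
    val start = System.currentTimeMillis()
    fun elapsed() = System.currentTimeMillis() - start
    val collected = mutableListOf<String>()

    flowOf("A", "B", "C")
        .onEach { delay(50) } // each outer item arrives 50 ms after the previous one
        .flatMapMerge { s ->
            flowOf(1, 2, 3)
                .map { "${it}_$s" }
                .onEach { delay(50) } // each inner item takes 50 ms to become ready
                .onEach { println("${elapsed()} ms: ready $it") }
        }
        .buffer(1)
        .collect {
            println("${elapsed()} ms: collecting $it")
            delay(500) // slow collector, as in the question
            collected += it
        }
    collected
}

fun main() {
    runTimeline()
}
```

Comparing the "ready" and "collecting" timestamps should show the producers running ahead of the slow collector, with the collection order varying from run to run.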
d
Thank you very much for the explanation