Eli
01/09/2022, 10:49 PMlaunch
or async
on the calling function. I'm trying to implement this as close to the example as I can, but no matter what I do, it doesn't seem to be executing the items in the flow concurrently, but only sequentially. Here's a simplified code snippet of what I'm attempting:
enum class Ordinal(val delay: Long) {
FIRST(100),
SECOND(200),
THIRD(300),
FOURTH(400),
FIFTH(500),
SIXTH(600),
SEVENTH(700)
}
private fun ordinalFlow() = flow {
for (ordinal in Ordinal.values()) {
delay(ordinal.delay)
emit(ordinal)
}
}
@Test
fun asyncBufferFlowTest() {
val blockTime = measureTimeMillis {
runBlocking {
val timer = measureTimeMillis {
ordinalFlow()
.buffer()
.collect { println("Collected: $it") }
}
println("Done in $timer")
}
}
println("Block time in $blockTime ms")
}
Eli
01/09/2022, 10:52 PMCollected: FIRST
Collected: SECOND
Collected: THIRD
Collected: FOURTH
Collected: FIFTH
Collected: SIXTH
Collected: SEVENTH
Done in 2898
Block time in 2949 ms
If that was operating in parallel I would have expected that to complete on the order of 700-800 ms, not 2800-2900 msephemient
01/09/2022, 10:53 PMJoffrey
01/09/2022, 10:53 PMbuffer()
only helps if downstream operations to process the elements take time. The downstream processing of each element can be concurrent with the source flowEli
01/09/2022, 10:54 PMephemient
01/09/2022, 10:55 PMOrdinal.values().asFlow()
.flatMapMerge { ordinal ->
flowOf(ordinal).onStart { delay(ordinal.delay) }
}
will be concurrent up to whatever the default concurrency isEli
01/09/2022, 10:56 PMephemient
01/09/2022, 10:57 PMchannelFlow {
for (ordinal in Ordinal.values()) {
launch {
delay(ordinal.delay)
send(ordinal)
}
}
}
which is a bit more obvious, but has no limitation on how many jobs it launches at onceEli
01/09/2022, 10:58 PMephemient
01/09/2022, 10:58 PMEli
01/09/2022, 10:58 PMephemient
01/09/2022, 10:59 PMephemient
01/09/2022, 11:00 PMEli
01/09/2022, 11:01 PMephemient
01/09/2022, 11:01 PMEli
01/09/2022, 11:06 PMflatMapMerge
say
Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows. Most likely, suspending operation in map operator will be sufficient and linear transformations are much easier to reason about.So that implies there is another way of accomplishing this. I see what the flatMapMerge is doing, but it still seems like a strange way of accomplishing this goal. Is this an unreasonable expectation to want to do?
Eli
01/09/2022, 11:08 PMephemient
01/09/2022, 11:10 PMEli
01/09/2022, 11:11 PMephemient
01/09/2022, 11:13 PMEli
01/09/2022, 11:28 PMephemient
01/09/2022, 11:35 PMflatMapMerge
one take longer than launch
because it doesn't launch more than that many jobs at once (by default)ephemient
01/09/2022, 11:35 PMEli
01/09/2022, 11:36 PM