# coroutines
e
Kinda similar question as above, but maybe a little simpler. I'm reading the documentation for Flows, and I'd like to implement a Flow that executes each of the items concurrently. And when I'm reading the docs, I see https://kotlinlang.org/docs/flow.html#buffering which explains quite well that this should be possible without explicitly calling `launch` or `async` on the calling function. I'm trying to implement this as close to the example as I can, but no matter what I do, it doesn't seem to be executing the items in the flow concurrently, but only sequentially. Here's a simplified code snippet of what I'm attempting:
enum class Ordinal(val delay: Long) {
		FIRST(100),
		SECOND(200),
		THIRD(300),
		FOURTH(400),
		FIFTH(500),
		SIXTH(600),
		SEVENTH(700)
	}

	private fun ordinalFlow() = flow {
		for (ordinal in Ordinal.values()) {
			delay(ordinal.delay)
			emit(ordinal)
		}
	}

	@Test
	fun asyncBufferFlowTest() {
		val blockTime = measureTimeMillis {
			runBlocking {
				val timer = measureTimeMillis {
					ordinalFlow()
						.buffer()
						.collect { println("Collected: $it") }
				}
				println("Done in $timer")
			}
		}
		println("Block time in $blockTime ms")
	}
Any help on this would be greatly appreciated. I realize I could just fall back to a regular sequence or collection using launch... but I'd really like to understand why this isn't working the way I think it should. Here are the results of the above code:
Collected: FIRST
Collected: SECOND
Collected: THIRD
Collected: FOURTH
Collected: FIFTH
Collected: SIXTH
Collected: SEVENTH
Done in 2898
Block time in 2949 ms
If that was operating in parallel I would have expected that to complete on the order of 700-800 ms, not 2800-2900 ms
e
a flow is an ordered sequence. that documentation on buffering is talking about decoupling the producer of a flow from the consumer of a flow, but in your case here, the producer is sequential
j
Flows are sequential by default. When you define your flow with delays between each item, this delay will happen.
`buffer()` only helps if the downstream operations that process the elements take time. The downstream processing of each element can then run concurrently with the source flow
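To make that distinction concrete, here is a minimal sketch (not taken from the docs) with a hypothetical helper `traceEvents` that records the order in which three items are produced and consumed. The 100 ms producer delay and 300 ms collector delay are arbitrary values chosen for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs a 3-item flow against a slow collector and
// returns the order in which items were produced and consumed.
fun traceEvents(useBuffer: Boolean): List<String> = runBlocking {
    // A plain list is enough here: runBlocking confines everything to one thread.
    val events = mutableListOf<String>()
    val source: Flow<Int> = flow {
        for (i in 1..3) {
            delay(100)              // the producer needs 100 ms per item
            events += "produce $i"
            emit(i)
        }
    }
    val upstream = if (useBuffer) source.buffer() else source
    upstream.collect { i ->
        events += "consume $i"
        delay(300)                  // the collector needs 300 ms per item
    }
    events
}
```

Without `buffer()` the trace should strictly alternate (produce 1, consume 1, produce 2, consume 2, ...). With `buffer()` the producer runs ahead, so items 2 and 3 are produced while item 1 is still being consumed. In both cases the producer itself still emits one item at a time.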
e
I was just going based on the example given in the docs. What you are saying makes sense, but it still begs the question is there a way to make them concurrent?
e
Ordinal.values().asFlow()
    .flatMapMerge { ordinal ->
        flowOf(ordinal).onStart { delay(ordinal.delay) }
    }
will be concurrent up to whatever the default concurrency is
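A self-contained version of that idea might look like the sketch below. The `Task` enum and the `completionOrder` function are hypothetical names used only for this illustration; the delays are deliberately listed longest-first so that the arrival order reveals whether the inner flows really ran concurrently. The opt-in annotation is there because `flatMapMerge` has been marked as preview/experimental API, depending on the kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Ordinal enum, with the longest delay first.
enum class Task(val delayMs: Long) { SLOW(300), MEDIUM(200), FAST(100) }

// Returns the tasks in the order in which they arrive at the collector.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun completionOrder(): List<Task> = runBlocking {
    Task.values().asFlow()
        .flatMapMerge { task ->
            // Each inner flow waits for its own delay, then emits one item.
            flowOf(task).onStart { delay(task.delayMs) }
        }
        .toList()
}
```

If the inner flows were collected one after another, the result would keep the declaration order. Because they are merged concurrently, the fastest task should arrive first.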
e
Hmm... ok I'll give that a try, thanks
e
you could also consider
channelFlow {
    for (ordinal in Ordinal.values()) {
        launch {
            delay(ordinal.delay)
            send(ordinal)
        }
    }
}
which is a bit more obvious, but has no limitation on how many jobs it launches at once
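For reference, here is the same `channelFlow` approach as a complete sketch with timing, reusing the `Ordinal` enum from the question. The function names `concurrentOrdinals` and `collectTimed` are made up for this example.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

enum class Ordinal(val delay: Long) {
    FIRST(100), SECOND(200), THIRD(300), FOURTH(400),
    FIFTH(500), SIXTH(600), SEVENTH(700)
}

// Launches one coroutine per item; every delay runs at the same time.
fun concurrentOrdinals(): Flow<Ordinal> = channelFlow {
    for (ordinal in Ordinal.values()) {
        launch {
            delay(ordinal.delay)
            send(ordinal)
        }
    }
}

// Hypothetical helper: collects the flow and reports the elapsed time in ms.
fun collectTimed(): Pair<List<Ordinal>, Long> = runBlocking {
    var items = emptyList<Ordinal>()
    val elapsed = measureTimeMillis { items = concurrentOrdinals().toList() }
    items to elapsed
}
```

Since all seven delays overlap, the elapsed time should be close to the longest single delay (700 ms) rather than the 2800 ms sum seen in the sequential version.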
e
What is that flat map merge... I don't see it as an option on a collection
e
The launch option works... I've tried that and it makes sense to me, I'm just trying to understand the docs for this example
e
in the case of the documentation, the producer is faster than the consumer
buffer allows the producer to "run ahead" even though the consumer is "behind"
e
Hmm, so putting the delay effectively is slowing down the producer
e
just for the sake of the example, yes
e
Interesting, the docs for `flatMapMerge` say:
Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows. Most likely, suspending operation in map operator will be sufficient and linear transformations are much easier to reason about.
So that implies there is another way of accomplishing this. I see what the flatMapMerge is doing, but it still seems like a strange way of accomplishing this goal. Is this an unreasonable expectation to want to do?
I have also tried separating out the delay using the map as the docs describe, and that was similarly unsuccessful
e
it is not what Flow is typically used for - it is inherently a sequential stream, with potentially asynchronous producer and consumer
e
I guess my issue is that it seems like the asynchronous producer isn't executed asynchronously
e
in coroutines, code that is written sequentially, runs as if it were sequential. your producer is sequential. your consumer is sequential. what Flow provides is allowing the two to be concurrent with respect to each other
e
The flatMapMerge technique worked... though that doesn't seem any simpler than just using launch. Interesting. Thank you all for your explanations and suggestions. I'll see if I can study this further to wrap my brain around it
e
if you had a list with more than 16 elements, you'd see the `flatMapMerge` one take longer than `launch` because it doesn't launch more than that many jobs at once (by default)
whether that's desirable or not depends on what you're doing
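The limit can also be set explicitly through the `concurrency` parameter of `flatMapMerge`. The sketch below uses a hypothetical helper, `peakInFlight`, that counts how many inner flows are active at once; the 50 ms delay is arbitrary.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the highest number of inner flows that were
// running at the same time for the given concurrency limit.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun peakInFlight(items: Int, concurrency: Int): Int = runBlocking {
    // Plain counters are fine: runBlocking keeps all coroutines on one thread.
    var inFlight = 0
    var peak = 0
    (1..items).asFlow()
        .flatMapMerge(concurrency) { i ->
            flow {
                inFlight++
                peak = maxOf(peak, inFlight)
                delay(50)           // simulated work
                inFlight--
                emit(i)
            }
        }
        .collect()
    peak
}
```

With six items, `peakInFlight(6, 2)` should never see more than two inner flows active, while `peakInFlight(6, 16)` lets all six run together. The `channelFlow` plus `launch` variant has no such cap unless one is added by hand.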
e
My specific use-case has a relatively small number of elements, but it's good to understand how it's expected to scale