doesn't fuse with an existing buffer. Is there an important reason for it, or is it just an oversight that could be fixed? I'll put an example in the thread.
Sam
10/02/2024, 11:05 AM
Here's the code:
Copy code
val start = TimeSource.Monotonic.markNow()
val numbers = flowOf(1, 2, 3, 4, 5)
.onEach { println("Emit $it at t=${start.elapsedNow().inWholeSeconds}s") }
.buffer(0) // existing buffer
val ticker = flow { while (true) emit(Unit) }.onEach { delay(1.seconds) }
ticker.zip(numbers) { _, n -> n }
.collect { println("Collect $it at t=${start.elapsedNow().inWholeSeconds}s") }
Sam
10/02/2024, 11:05 AM
We can see from the docs that
zip
collects its
other
flow in a new coroutine, equivalent to
buffer(0)
Sam
10/02/2024, 11:06 AM
If the buffers were fused, I'd expect something like this:
Copy code
Emit 1 at t=0s
Collect 1 at t=1s
Emit 2 at t=1s
Collect 2 at t=2s
Emit 3 at t=2s
Collect 3 at t=3s
Emit 4 at t=3s
Collect 4 at t=4s
Emit 5 at t=4s
Collect 5 at t=5s
Where each value is prepared one step ahead of time
Sam
10/02/2024, 11:07 AM
But instead, the output looks more like this:
Copy code
Emit 1 at t=0s
Emit 2 at t=0s
Collect 1 at t=1s
Emit 3 at t=1s
Collect 2 at t=2s
Emit 4 at t=2s
Collect 3 at t=3s
Emit 5 at t=3s
Collect 4 at t=4s
Collect 5 at t=5s
i.e. it's running two steps ahead, because there are two buffers in there
Sam
10/02/2024, 11:09 AM
Seems inconsistent with the other flow operators, and it caused me some head-scratching before I looked at the
zip
source code and realised it wasn't fusing
a
Anselmo Alexandre
10/03/2024, 4:11 AM
TIL @Sam
u
Udith
10/03/2024, 1:26 PM
on each will run before the upstream emits. Basically it will print emit 2 message but the actual emit from upstream is suspended
Udith
10/03/2024, 1:26 PM
try this
Copy code
val start = TimeSource.Monotonic.markNow()
val numbers = flow{
repeat(10){
println("Emit $it at t=${start.elapsedNow().inWholeSeconds}s")
emit(it)
delay(1.seconds)
}
}
.buffer(0) // existing buffer
val ticker = flow { while (true) emit(Unit) }.onEach { delay(1.seconds) }
ticker.zip(numbers) { _, n -> n }
.collect { println("Collect $it at t=${start.elapsedNow().inWholeSeconds}s") }