# coroutines
s
I'm curious why `zip` doesn't fuse with an existing buffer. Is there an important reason for it, or is it just an oversight that could be fixed? I'll put an example in the thread.
Here's the code:
val start = TimeSource.Monotonic.markNow()
val numbers = flowOf(1, 2, 3, 4, 5)
  .onEach { println("Emit $it at t=${start.elapsedNow().inWholeSeconds}s") }
  .buffer(0) // existing buffer

val ticker = flow { while (true) emit(Unit) }.onEach { delay(1.seconds) }

ticker.zip(numbers) { _, n -> n }
  .collect { println("Collect $it at t=${start.elapsedNow().inWholeSeconds}s") }
We can see from the docs that `zip` collects its `other` flow in a new coroutine, equivalent to `buffer(0)`.
If the buffers were fused, I'd expect something like this:
Emit 1 at t=0s
Collect 1 at t=1s
Emit 2 at t=1s
Collect 2 at t=2s
Emit 3 at t=2s
Collect 3 at t=3s
Emit 4 at t=3s
Collect 4 at t=4s
Emit 5 at t=4s
Collect 5 at t=5s
Where each value is prepared one step ahead of time
But instead, the output looks more like this:
Emit 1 at t=0s
Emit 2 at t=0s
Collect 1 at t=1s
Emit 3 at t=1s
Collect 2 at t=2s
Emit 4 at t=2s
Collect 3 at t=3s
Emit 5 at t=3s
Collect 4 at t=4s
Collect 5 at t=5s
i.e. it's running two steps ahead, because there are two buffers in there
Seems inconsistent with the other flow operators, and it caused me some head-scratching before I looked at the `zip` source code and realised it wasn't fusing.
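For comparison, here is a minimal sketch of what fusion looks like with two adjacent `buffer(0)` calls, which the `buffer` docs describe as being fused into a single channel. The function name `fusedBufferEvents` and the slow collector (standing in for the ticker) are assumptions made for illustration; they are not from the thread, and the timings it prints are not meant to reproduce the `zip` output above.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeSource

// Hypothetical helper: records events instead of printing them directly.
// Everything runs on runBlocking's single thread, so the plain list is safe here.
suspend fun fusedBufferEvents(): List<String> {
    val start = TimeSource.Monotonic.markNow()
    val events = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { events += "Emit $it at t=${start.elapsedNow().inWholeSeconds}s" }
        .buffer(0) // existing buffer
        .buffer(0) // adjacent buffer, documented to fuse with the one above
        .collect {
            delay(1.seconds) // slow collector, standing in for the ticker
            events += "Collect $it at t=${start.elapsedNow().inWholeSeconds}s"
        }
    return events
}

fun main() = runBlocking<Unit> {
    fusedBufferEvents().forEach { println(it) }
}
```

Swapping the second `buffer(0)` for a `zip` against a ticker, as in the original example, is one way to compare how far ahead the upstream runs in each case.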
a
TIL @Sam
u
`onEach` runs its action before the value is actually emitted downstream. Basically, it will print the "Emit 2" message, but the actual emit is still suspended.
try this
val start = TimeSource.Monotonic.markNow()
val numbers = flow {
  repeat(10) {
    println("Emit $it at t=${start.elapsedNow().inWholeSeconds}s")
    emit(it)
    delay(1.seconds)
  }
}
  .buffer(0) // existing buffer

val ticker = flow { while (true) emit(Unit) }.onEach { delay(1.seconds) }

ticker.zip(numbers) { _, n -> n }
  .collect { println("Collect $it at t=${start.elapsedNow().inWholeSeconds}s") }
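One way to check the point about `onEach` is to log both when its action runs and when the upstream `emit` call actually returns. The sketch below is only an illustration under that assumption: `onEachOrderingEvents` and the event labels are made-up names, and the delay is shortened to keep the run quick.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper: records the order of events around a rendezvous buffer.
// Everything runs on runBlocking's single thread, so the plain list is safe here.
suspend fun onEachOrderingEvents(): List<String> {
    val events = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            emit(i) // suspends until the value has been handed over downstream
            events += "emit($i) returned"
        }
    }
        .onEach { events += "onEach $it" } // runs before the value is passed on
        .buffer(0)
        .collect {
            delay(100.milliseconds) // slow collector
            events += "collect $it"
        }
    return events
}

fun main() = runBlocking<Unit> {
    onEachOrderingEvents().forEach { println(it) }
}
```

If the "onEach" line for a value shows up well before the matching "emit returned" line, the message was printed while the emit itself was still suspended.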