Sam
10/02/2024, 11:05 AM
zip doesn't fuse with an existing buffer. Is there an important reason for it, or is it just an oversight that could be fixed? I'll put an example in the thread.
Sam
10/02/2024, 11:05 AM
val start = TimeSource.Monotonic.markNow()
val numbers = flowOf(1, 2, 3, 4, 5)
    .onEach { println("Emit $it at t=${start.elapsedNow().inWholeSeconds}s") }
    .buffer(0) // existing buffer
val ticker = flow { while (true) emit(Unit) }.onEach { delay(1.seconds) }
ticker.zip(numbers) { _, n -> n }
    .collect { println("Collect $it at t=${start.elapsedNow().inWholeSeconds}s") }
Sam
10/02/2024, 11:05 AM
zip collects its other flow in a new coroutine, equivalent to buffer(0)
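A minimal sketch of what fused vs. unfused means here, for anyone following along. This is not the actual zip implementation: relayUnfused and emittedDuringFirstCollect are hypothetical helpers made up for illustration, and the relay only assumes that an extra rendezvous channel sits behind the existing buffer(0). It counts how many elements upstream has already emitted while the collector is still busy with the first one.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: relays a flow through its own rendezvous channel,
// bypassing the fusible buffer() operator (assumed stand-in for an unfused hop).
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.relayUnfused(): Flow<T> {
    val upstream = this
    return flow {
        coroutineScope {
            val channel = produce<T>(capacity = Channel.RENDEZVOUS) {
                upstream.collect { send(it) }
            }
            for (value in channel) emit(value)
        }
    }
}

// Hypothetical helper: how many elements upstream has emitted while the
// collector is still processing the first one.
suspend fun emittedDuringFirstCollect(transform: (Flow<Int>) -> Flow<Int>): Int {
    var emitted = 0
    val upstream = flowOf(1, 2, 3, 4, 5).onEach { emitted++ }
    var seen = -1
    transform(upstream).collect {
        if (seen == -1) {
            delay(200) // give upstream time to run as far ahead as it can
            seen = emitted
        }
    }
    return seen
}

fun main() = runBlocking {
    println(emittedDuringFirstCollect { it.buffer(0) })                // 2: one element ahead
    println(emittedDuringFirstCollect { it.buffer(0).buffer(0) })      // 2: adjacent buffers fuse
    println(emittedDuringFirstCollect { it.buffer(0).relayUnfused() }) // 3: two elements ahead
}
```

Two adjacent buffer(0) calls collapse into a single rendezvous channel, so upstream stays one element ahead. The extra unfused hop adds a second suspension point, which lets upstream get one more element ahead.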
Sam
10/02/2024, 11:06 AM
Expected output:
Emit 1 at t=0s
Collect 1 at t=1s
Emit 2 at t=1s
Collect 2 at t=2s
Emit 3 at t=2s
Collect 3 at t=3s
Emit 4 at t=3s
Collect 4 at t=4s
Emit 5 at t=4s
Collect 5 at t=5s
Where each value is prepared one step ahead of time.
Sam
10/02/2024, 11:07 AM
Actual output:
Emit 1 at t=0s
Emit 2 at t=0s
Collect 1 at t=1s
Emit 3 at t=1s
Collect 2 at t=2s
Emit 4 at t=2s
Collect 3 at t=3s
Emit 5 at t=3s
Collect 4 at t=4s
Collect 5 at t=5s
i.e. it's running two steps ahead, because there are two buffers in there.
Sam
10/02/2024, 11:09 AM
I looked at the zip source code and realised it wasn't fusing.
Anselmo Alexandre
10/03/2024, 4:11 AM
Udith
10/03/2024, 1:26 PM
Udith
10/03/2024, 1:26 PM
val start = TimeSource.Monotonic.markNow()
val numbers = flow {
    repeat(10) {
        println("Emit $it at t=${start.elapsedNow().inWholeSeconds}s")
        emit(it)
        delay(1.seconds)
    }
}
    .buffer(0) // existing buffer
val ticker = flow { while (true) emit(Unit) }.onEach { delay(1.seconds) }
ticker.zip(numbers) { _, n -> n }
    .collect { println("Collect $it at t=${start.elapsedNow().inWholeSeconds}s") }