Hi, I've been trying to zip two flows' latest valu...
# coroutines
y
Hi, I've been trying to zip two flows' latest values and wrote below code. But it seems that flow
f
does not emit the latest value. Any idea what's going on? Actual output is in the thread.
Copy code
const val DURATION = 100L

class ExampleUnitTest {
    @Test
    fun test() = runBlocking {
        val job = launch {
            val f = generateSequence(1) { it + 1 }.asFlow()
                .onEach { delay(DURATION) }
                .onStart { emit(0) }
                .onEach { println("${System.currentTimeMillis()}:onNext-f: $it") }

            val f2 = generateSequence(1) { it + 1 }.asFlow()
                .onEach { delay(DURATION * 3) }
                .onStart { emit(0) }
                .onEach { println("${System.currentTimeMillis()}:onNext-f2: $it") }


            f2.zip(f.conflate()) { a, b -> "$a, $b" }
                .collectIndexed { index, value -> println("${System.currentTimeMillis()}:collect-$index: $value") }
        }

        job.join()
    }
}
here's the output
Copy code
1566542489201:onNext-f2: 0
    1566542489214:onNext-f: 0
    1566542489216:collect-0: 0, 0
    1566542489320:onNext-f: 1
    1566542489421:onNext-f: 2
    1566542489508:onNext-f2: 1
    1566542489509:collect-1: 1, 1
    1566542489525:onNext-f: 3
    1566542489627:onNext-f: 4
    1566542489731:onNext-f: 5
    1566542489812:onNext-f2: 2
    1566542489813:collect-2: 2, 2
    1566542489836:onNext-f: 6
    1566542489938:onNext-f: 7
    1566542490043:onNext-f: 8
    1566542490118:onNext-f2: 3
    1566542490118:collect-3: 3, 5
    1566542490148:onNext-f: 9
    1566542490252:onNext-f: 10
    1566542490356:onNext-f: 11
    1566542490422:onNext-f2: 4
    1566542490422:collect-4: 4, 8
    1566542490460:onNext-f: 12
    1566542490565:onNext-f: 13
    1566542490666:onNext-f: 14
    1566542490722:onNext-f2: 5
    1566542490723:collect-5: 5, 11
    1566542490767:onNext-f: 15
    1566542490873:onNext-f: 16
    1566542490973:onNext-f: 17
    1566542491023:onNext-f2: 6
    1566542491023:collect-6: 6, 14
for example at
collect-1
, I expect
collect-1: 1, 2
but the actual output is
collect-1: 1, 1
b
.zip
uses rendezvous channels underneath eagerly collecting the first item from each flow. So you're seeing the next item that was collected (after being conflated during the longer delay of the
f2
sequence)
Not to get into too much detail,
f: 1
is collected, and
f: 2
is the next item that was conflated waiting to be emitted. Then
f2: 1
arrives, which empties both channels (
f: 1
, f2: 1`) and
f: 2
is captured.
f: 3
,
f: 4
, and
f: 5
arrive and are conflated. then
f2: 2
arrives. The channels are emptied (
f: 2
,
f2: 2
) and
f: 5
is captured. and so on
A step-by-step state change based on part of the output you shared above:
Copy code
1566542489320:onNext-f: 1
        - held by 'f' channel. (f chan: 1)
    1566542489421:onNext-f: 2
        - held by 'f' conflate. (f chan: 1, f conflate: 2)
    1566542489508:onNext-f2: 1
        - held by `f2` channel. (f chan: 1, f conflate: 2, f2 chan: 1)
        - both channels have a value, now to zip
    1566542489509:collect-1: 1, 1
        - channels emptied ('f: 1', 'f2: 1')
        - 'f' conflate value (2) captured by 'f' chan (f chan: 2, f conflate: <empty>)
    1566542489525:onNext-f: 3
        - held by 'f' conflate (f chan: 2, f conflate: 3)
    1566542489627:onNext-f: 4
        - overwrite by 'f' conflate (f chan: 2, f conflate: 4)
    1566542489731:onNext-f: 5
        - overwrite by 'f' conflate (f chan: 2, f conflate: 5)
    1566542489812:onNext-f2: 2
        - held by `f2` channel (f chan: 2, f conflate: 5, f2 chan: 2)
    1566542489813:collect-2: 2, 2
        - channels emptied ('f: 2', 'f2: 2')
        - 'f' conflate value (5) captured by 'f' chan (f chan: 5, f conflate: <empty>)
It's also good to note that the
conflate
operator is backed by a channel as well, which is why there's a value "held" until it is collected
y
I see, so zip is immediately capturing the next value, and then f conflates its value.
1
b
Yeah, f conflates the next value(s) after that until zip makes use of the one it's already captured
🙏 1
y
Thanks a lot, that makes sense. Is there any way I can achieve this?
I expect
collect-1: 1, 2
but the actual output is
collect-1: 1, 1