yshrsmz
08/23/2019, 6:51 AMf
does not emit the latest value.
Any idea what's going on?
Actual output is in the thread.
const val DURATION = 100L
class ExampleUnitTest {
@Test
fun test() = runBlocking {
val job = launch {
val f = generateSequence(1) { it + 1 }.asFlow()
.onEach { delay(DURATION) }
.onStart { emit(0) }
.onEach { println("${System.currentTimeMillis()}:onNext-f: $it") }
val f2 = generateSequence(1) { it + 1 }.asFlow()
.onEach { delay(DURATION * 3) }
.onStart { emit(0) }
.onEach { println("${System.currentTimeMillis()}:onNext-f2: $it") }
f2.zip(f.conflate()) { a, b -> "$a, $b" }
.collectIndexed { index, value -> println("${System.currentTimeMillis()}:collect-$index: $value") }
}
job.join()
}
}
yshrsmz
08/23/2019, 6:51 AM1566542489201:onNext-f2: 0
1566542489214:onNext-f: 0
1566542489216:collect-0: 0, 0
1566542489320:onNext-f: 1
1566542489421:onNext-f: 2
1566542489508:onNext-f2: 1
1566542489509:collect-1: 1, 1
1566542489525:onNext-f: 3
1566542489627:onNext-f: 4
1566542489731:onNext-f: 5
1566542489812:onNext-f2: 2
1566542489813:collect-2: 2, 2
1566542489836:onNext-f: 6
1566542489938:onNext-f: 7
1566542490043:onNext-f: 8
1566542490118:onNext-f2: 3
1566542490118:collect-3: 3, 5
1566542490148:onNext-f: 9
1566542490252:onNext-f: 10
1566542490356:onNext-f: 11
1566542490422:onNext-f2: 4
1566542490422:collect-4: 4, 8
1566542490460:onNext-f: 12
1566542490565:onNext-f: 13
1566542490666:onNext-f: 14
1566542490722:onNext-f2: 5
1566542490723:collect-5: 5, 11
1566542490767:onNext-f: 15
1566542490873:onNext-f: 16
1566542490973:onNext-f: 17
1566542491023:onNext-f2: 6
1566542491023:collect-6: 6, 14
yshrsmz
08/23/2019, 6:53 AMcollect-1
, I expect collect-1: 1, 2
but the actual output is collect-1: 1, 1
bdawg.io
08/23/2019, 7:04 AM.zip
uses rendezvous channels underneath eagerly collecting the first item from each flow. So you're seeing the next item that was collected (after being conflated during the longer delay of the f2
sequence)bdawg.io
08/23/2019, 7:08 AMf: 1
is collected, and f: 2
is the next item that was conflated waiting to be emitted. Then f2: 1
arrives, which empties both channels (f: 1
, f2: 1`) and f: 2
is captured. f: 3
, f: 4
, and f: 5
arrive and are conflated. then f2: 2
arrives. The channels are emptied (f: 2
, f2: 2
) and f: 5
is captured. and so onbdawg.io
08/23/2019, 7:17 AM1566542489320:onNext-f: 1
- held by 'f' channel. (f chan: 1)
1566542489421:onNext-f: 2
- held by 'f' conflate. (f chan: 1, f conflate: 2)
1566542489508:onNext-f2: 1
- held by `f2` channel. (f chan: 1, f conflate: 2, f2 chan: 1)
- both channels have a value, now to zip
1566542489509:collect-1: 1, 1
- channels emptied ('f: 1', 'f2: 1')
- 'f' conflate value (2) captured by 'f' chan (f chan: 2, f conflate: <empty>)
1566542489525:onNext-f: 3
- held by 'f' conflate (f chan: 2, f conflate: 3)
1566542489627:onNext-f: 4
- overwrite by 'f' conflate (f chan: 2, f conflate: 4)
1566542489731:onNext-f: 5
- overwrite by 'f' conflate (f chan: 2, f conflate: 5)
1566542489812:onNext-f2: 2
- held by `f2` channel (f chan: 2, f conflate: 5, f2 chan: 2)
1566542489813:collect-2: 2, 2
- channels emptied ('f: 2', 'f2: 2')
- 'f' conflate value (5) captured by 'f' chan (f chan: 5, f conflate: <empty>)
bdawg.io
08/23/2019, 7:19 AMconflate
operator is backed by a channel as well, which is why there's a value "held" until it is collectedyshrsmz
08/23/2019, 7:21 AMbdawg.io
08/23/2019, 7:25 AMyshrsmz
08/23/2019, 7:38 AMI expectbut the actual output iscollect-1: 1, 2
collect-1: 1, 1