ubu
08/14/2020, 12:35 PM/**
* Every time a new value is emitted from A, it will emit instead the latest value from B.
*/
fun <A, B : Any> Flow<A>.switchToLatestFrom(other: Flow<B>): Flow<B> = flow {
coroutineScope {
val latestB = AtomicReference<B?>()
val outerScope = this
launch {
try {
other.collect { latestB.set(it) }
} catch (e: CancellationException) {
outerScope.cancel(e)
}
}
collect {
latestB.get()?.let { b -> emit(b) }
}
}
}
And verifying its behavior through some tests suite, I can’t get why sometimes (event randomly) the following test fails:
@Test
fun `should emit the latest value from B every time A emits something`() {
val a = flowOf(1, 2, 3).onEach { delay(100) }
val b = flowOf('A', 'B', 'C', 'D').onEach { delay(25) }
runBlocking {
val result = a.switchToLatestFrom(b).toList()
val expected = listOf('C', 'D', 'D')
assertEquals(
expected = expected,
actual = result
)
}
}
java.lang.AssertionError:
Expected :[C, D, D]
Actual :[B, D, D]
Need help! Thanks.Dominaezzz
08/14/2020, 12:39 PMdelay
(or timing in general) to coordinate tests. It ends up with tests that take longer to run and break sometimes.
You should use a Channel
and consumeAsFlow()
to coordinate the test.ubu
08/14/2020, 12:49 PMDominaezzz
08/14/2020, 1:04 PMval aChannel = Channel<Int>()
val a = aChannel.consumeAsFlow()
aChannel.send(1)
aChannel.send(2)
aChannel.send(3)
You can then interleave the calls aChannel.send
and bChannel.send
to coordinate the flow emissions.ubu
08/14/2020, 3:59 PM@Test
fun test() {
val aChannel = Channel<Int>()
val bChannel = Channel<String>()
val a = aChannel.consumeAsFlow()
val b = bChannel.consumeAsFlow()
runBlocking {
a.switchToLatestFrom(b).collect {
println(it)
}
bChannel.send("A")
aChannel.send(1)
}
}
Dominaezzz
08/14/2020, 4:26 PMlaunch
the sending before collection.ubu
08/14/2020, 7:30 PM@Test
fun `Should emit latest value from B whenever A emits something`() = runBlockingTest {
val aChannel = Channel<Int>()
val bChannel = Channel<String>()
val aFlow = aChannel.consumeAsFlow()
val bFlow = bChannel.consumeAsFlow()
launch {
val result = aFlow.switchToLatestFrom(bFlow).toList()
assertEquals(
expected = listOf("A", "B", "D"),
actual = result
)
}
launch {
bChannel.send("A")
aChannel.send(1)
bChannel.send("B")
aChannel.send(2)
bChannel.send("C")
bChannel.send("D")
aChannel.send(2)
bChannel.send("E")
bChannel.send("F")
bChannel.send("G")
}
aChannel.close()
bChannel.close()
}
Joffrey
08/16/2020, 6:34 AMlaunch
with all your channel.send()
first, and then run your `collect`/`toList` and asserts at the top level (outside of launch).