# coroutines
u
Hi guys! Having this simple custom operator (inspired by this one by Roman Elizarov):
/**
 * Every time a new value is emitted from A, it will emit instead the latest value from B.
 */
fun <A, B : Any> Flow<A>.switchToLatestFrom(other: Flow<B>): Flow<B> = flow {
    coroutineScope {
        val latestB = AtomicReference<B?>()
        val outerScope = this
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e)
            }
        }
        collect {
            latestB.get()?.let { b -> emit(b) }
        }
    }
}
While verifying its behavior with a test suite, I can’t work out why the following test sometimes (even randomly) fails:
@Test
fun `should emit the latest value from B every time A emits something`() {
    val a = flowOf(1, 2, 3).onEach { delay(100) }
    val b =  flowOf('A', 'B', 'C', 'D').onEach { delay(25) }

    runBlocking {
        val result = a.switchToLatestFrom(b).toList()
        val expected = listOf('C', 'D', 'D')

        assertEquals(
            expected = expected,
            actual = result
        )
    }
}
java.lang.AssertionError:
Expected :[C, D, D]
Actual   :[B, D, D]
Need help! Thanks.
d
You shouldn't rely on `delay` (or timing in general) to coordinate tests. You end up with tests that take longer to run and sometimes break. You should use a `Channel` and `consumeAsFlow()` to coordinate the test.
u
@Dominaezzz, could you give me an example?
d
val aChannel = Channel<Int>()
val a = aChannel.consumeAsFlow()
aChannel.send(1)
aChannel.send(2)
aChannel.send(3)
You can then interleave the calls to `aChannel.send` and `bChannel.send` to coordinate the flow emissions.
u
@Dominaezzz, I’ve tried something like that, but all I get is a test that runs forever without emitting or printing anything:
@Test
fun test() {

    val aChannel = Channel<Int>()
    val bChannel = Channel<String>()

    val a = aChannel.consumeAsFlow()
    val b = bChannel.consumeAsFlow()

    runBlocking {
        a.switchToLatestFrom(b).collect {
            println(it)
        }
        bChannel.send("A")
        aChannel.send(1)
    }
}
d
Collection and sending have to happen in different coroutines. `launch` the sending before collection.
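A minimal sketch of that point, assuming a single rendezvous `Channel` and plain `runBlocking`. The function names and the 200 ms timeout are made up for illustration. The first function collects in the same coroutine that was supposed to send, so nothing ever arrives and only the timeout ends it. The second launches the sender first.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Collecting first, in the same coroutine: nobody sends or closes the channel,
// so toList() stays suspended until the (illustrative) timeout cancels it.
fun collectWithoutSender(): List<Int>? = runBlocking {
    val channel = Channel<Int>()
    withTimeoutOrNull(200L) { channel.consumeAsFlow().toList() }
}

// Sending from a separately launched coroutine lets the collector make progress.
fun collectWithLaunchedSender(): List<Int> = runBlocking {
    val channel = Channel<Int>()
    launch {
        channel.send(1)
        channel.send(2)
        channel.send(3)
        channel.close() // completes the flow so that toList() can return
    }
    channel.consumeAsFlow().toList()
}

fun main() {
    println(collectWithoutSender())      // null: the collection timed out
    println(collectWithLaunchedSender()) // [1, 2, 3]
}
```

Closing the channel matters as much as launching the sender: without `close()`, `toList()` would keep waiting for more elements.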
🚀 1
u
thanks, got it working!
Did it in the following manner:
@Test
fun `Should emit latest value from B whenever A emits something`() = runBlockingTest {

    val aChannel = Channel<Int>()
    val bChannel = Channel<String>()

    val aFlow = aChannel.consumeAsFlow()
    val bFlow = bChannel.consumeAsFlow()

    launch {
        val result = aFlow.switchToLatestFrom(bFlow).toList()
        assertEquals(
            expected = listOf("A", "B", "D"),
            actual = result
        )
    }

    launch {
        bChannel.send("A")
        aChannel.send(1)
        bChannel.send("B")
        aChannel.send(2)
        bChannel.send("C")
        bChannel.send("D")
        aChannel.send(2)
        bChannel.send("E")
        bChannel.send("F")
        bChannel.send("G")
    }

    aChannel.close()
    bChannel.close()
}
j
When writing such tests, I usually try to keep asserts in the test’s base coroutine. I find that it better shows the intention of the test. You could `launch` with all your `channel.send()` calls first, and then run your `collect`/`toList` and asserts at the top level (outside of `launch`).
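A rough sketch of that layout, reusing the `switchToLatestFrom` operator from the start of the thread. It assumes plain `runBlocking` rather than `runBlockingTest`, and the name `collectWithTopLevelAsserts` is made up for illustration. The exact interleaving can depend on the dispatcher in use, so the checks here cover only properties that hold for any interleaving. A real test would assert the exact list it expects.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// The operator from the start of the thread, repeated so the sketch compiles on its own.
fun <A, B : Any> Flow<A>.switchToLatestFrom(other: Flow<B>): Flow<B> = flow {
    coroutineScope {
        val latestB = AtomicReference<B?>()
        val outerScope = this
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e)
            }
        }
        collect { latestB.get()?.let { b -> emit(b) } }
    }
}

// Hypothetical test body: all sends live in one launched coroutine,
// while collection and assertions stay in the base coroutine.
fun collectWithTopLevelAsserts(): List<String> = runBlocking {
    val aChannel = Channel<Int>()
    val bChannel = Channel<String>()

    launch {
        bChannel.send("A")
        aChannel.send(1)
        bChannel.send("B")
        aChannel.send(2)
        bChannel.send("C")
        aChannel.send(3)
        // Close both channels so both collectors inside the operator can finish.
        aChannel.close()
        bChannel.close()
    }

    // Top level of the test: collect first, then assert.
    val result = aChannel.consumeAsFlow()
        .switchToLatestFrom(bChannel.consumeAsFlow())
        .toList()
    check(result.size <= 3) { "at most one emission per value of A" }
    check(result.all { it in setOf("A", "B", "C") }) { "only values of B are emitted" }
    result
}

fun main() {
    println(collectWithTopLevelAsserts())
}
```

With the channels closed inside the sending coroutine, the base coroutine reads top to bottom as set up, collect, assert.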
👍 1