Hi, There is something I don't get with flows: `...
# coroutines
c
Hi, There is something I don't get with flows:
Copy code
@Test
    fun `my test OK`() = runTest {
        val emitter1 = emptyFlow<Int>()
        val emitter2 = MutableSharedFlow<Int>()
        val merged = merge(emitter1, emitter2)

        val collected = mutableListOf<Int>()
        val job = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
            merged.toList(collected)
        }

        emitter2.emit(1)
        emitter2.emit(2)
        emitter2.emit(3)
        job.cancelAndJoin()
        assertEquals(
            listOf(
                1,
                2,
                3
            ),
            collected
        )
    }

    @Test
    fun `my test KO`() = runTest {
        val emitter1 = emptyFlow<Int>()
        val emitter2 = MutableSharedFlow<Int>()
        val merged = merge(emitter1, emitter2)
            .shareIn(
                scope = this,
                started = SharingStarted.WhileSubscribed(5000)
            )

        val collected = mutableListOf<Int>()
        val job = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
            merged.toList(collected)
        }

        emitter2.emit(1)
        emitter2.emit(2)
        emitter2.emit(3)
        job.cancelAndJoin()
        assertEquals(
            listOf(
                1,
                2,
                3
            ),
            collected
        )
    }
Why
collected
is empty in the 2nd test?
s
Looks like the difference is that you added
shareIn
, right? In that case, my guess is that it's due to the added concurrency.
shareIn
will launch an additional coroutine to do the sharing, and in the single-threaded
runTest
block, that coroutine likely won't get a chance to run unless your test yields control with a
delay()
or something.
c
Yep, that's the difference. In the first case `merged`is a Flow, while in the second test `merged`is a SharedFlow. So how can I make the 2nd test work?
s
If you're already using
UnconfinedTestDispatcher
, maybe
shareIn(scope = backgroundScope + UnconfinedTestDispatcher(…))
would do the trick? I'm not sure. I think there are probably better ways, e.g. using
Turbine
to receive the items
c
It doesn't work better with Turbine
Copy code
@Test
    fun `my test KO with Turbine`() = runTest {
        val emitter1 = emptyFlow<Int>()
        val emitter2 = MutableSharedFlow<Int>()
        val merged = merge(emitter1, emitter2)
            .shareIn(
                scope = this,
                started = SharingStarted.WhileSubscribed(5000)
            )

        merged.test {
            emitter2.emit(1)
            emitter2.emit(2)
            emitter2.emit(3)

            assertEquals(
                listOf(
                    1,
                    2,
                    3
                ),
                listOf(
                    awaitItem(),
                    awaitItem(),
                    awaitItem(),
                )
            )
        }
    }
Copy code
No value produced in 3s
app.cash.turbine.TurbineAssertionError: No value produced in 3s
If you're already using
UnconfinedTestDispatcher
, maybe
shareIn(scope = backgroundScope + UnconfinedTestDispatcher(…))
would do the trick?
Yes, it works.
I could make it work with Turbine like this
Copy code
@Test
    fun `my test OK with Turbine`() = runTest {
        turbineScope {
            val emitter1 = emptyFlow<Int>()
            val emitter2 = MutableSharedFlow<Int>()
            val merged = merge(emitter1, emitter2)
                .shareIn(
                    scope = backgroundScope + UnconfinedTestDispatcher(testScheduler),
                    started = SharingStarted.WhileSubscribed(5000)
                )

            val turbine = merged.testIn(backgroundScope + UnconfinedTestDispatcher(testScheduler))
            emitter2.emit(1)
            emitter2.emit(2)
            emitter2.emit(3)

            assertEquals(
                listOf(
                    1,
                    2,
                    3
                ),
                listOf(
                    turbine.awaitItem(),
                    turbine.awaitItem(),
                    turbine.awaitItem(),
                )
            )
        }
    }
👀 1