Michal Klimczak
11/13/2020, 7:26 AM@Test
fun `stateIn self-contained example`() = runBlockingTest {
suspend fun makeHeavyRequest(): String {
return "heavy result"
}
val flow = flow<Unit> {} //in production this is a channel meant to refresh the Flow
.onStart { emit(Unit) }
.map { makeHeavyRequest() } //using mapLatest breaks
.flowOn(testDispatcher)
// .onEach { } //uncommenting this line also breaks
.stateIn(GlobalScope, SharingStarted.WhileSubscribed(), "init state")
val results = mutableListOf<String>()
val job = launch {
flow.collect { results.add(it) }
}
assertEquals("heavy result", results[0])
job.cancel()
}
1. Why the test breaks (I get init state
instead of heavy result
) when I use mapLatest
instead of map
?
2. Why the test breaks (I get init state
instead of heavy result
) when I uncomment the onEach
below the flowOn
?
3. Bonus question: why don't I get both, the init state
AND the heavy result
?Asagald
11/13/2020, 9:22 AMGlobalScope
you use in stateIn
is the problemShalom Halbert
11/13/2020, 10:45 AMStateFlow
and SharedFlow
that you do not face with other Flow
s is that they never complete. So, you need a system for closing the flow after your assertion is completed.Michal Klimczak
11/13/2020, 11:28 AMMichal Klimczak
11/13/2020, 11:29 AMMichal Klimczak
11/13/2020, 11:32 AMShalom Halbert
11/13/2020, 12:33 PMGlobalScope
is an issue because it means that Flow
is not being run within the same CoroutineScope
as runBlockingTest
which probably created a race-condition.
Assuming runBlockingTest
makes the asynchronous work run synchronously, which is the conventional implementation: Launching the Flow
from the `runBlockingTest`’s CoroutineScope
should help with running a cold Flow
by ensuring the Flow
runs to completion before the test ends. However, with hot `Flow`s like StateFlow
that doesn’t work because the Flow
never completes. That’s where Turbine became convenient for me. You can call expectItem()
and cancelAndIgnoreRemainingEvents()
.Michal Klimczak
11/13/2020, 2:37 PMMichal Klimczak
11/13/2020, 2:52 PM@Test
fun `stateFlow (turbine)`() = runBlockingTest {
suspend fun makeHeavyRequest(): String {
return "heavy result"
}
val flow = flow<Unit> {}
.onStart { emit(Unit) }
.map { makeHeavyRequest() }
.onEach { logThread("before flowOn") }
.flowOn(testDispatcher)
.onEach { logThread("after flowOn") } //why this changes the dispatcher for the whole thing?
.stateIn(this, SharingStarted.WhileSubscribed(), "init state")
flow.test {
assertEquals("heavy result", expectItem())
cancelAndIgnoreRemainingEvents()
}
}
When I do this it also finishes with kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs: ["coroutine#6":StandaloneCoroutine{Active}@409bf450]
Michal Klimczak
11/13/2020, 3:23 PM