Paul Woitaschek
01/05/2022, 9:32 AMprivate suspend fun testBody(scope: CoroutineScope) {
val value = MutableStateFlow(1)
val sharingJob = Job()
val flow = value.shareIn(scope + sharingJob, SharingStarted.WhileSubscribed(replayExpirationMillis = 0), replay = 1)
check(flow.first() == 1)
value.value = 2
check(flow.first() == 2)
sharingJob.cancel()
}
@Test
fun withRunBlockingTest() = runBlockingTest {
testBody(this)
}
@Test
fun withRunTest() = runTest {
testBody(this)
}
This passes with runBlockingTest but fails with runTest.
It is possible to fix it by adding a runCurrent before calling flow.first a second time but I think thats not how it’s supposted to be and makes me very uncomfortable writing tests without having a runCurrent between every single lineJoffrey
01/05/2022, 9:35 AMvalue.value = 2
is not suspending, why do you expect flow.first()
to return 2 the second time? You're relying on a race here.
runCurrent
makes it deterministic (ensures the sharing coroutine progresses appropriately), so it would be necessary even in runBlockingTest
. IMO you're just lucky that the implementation details of runBlockingTest
let you assume the test was fine.Joffrey
01/05/2022, 9:56 AMfun main() = runBlocking {
val value = MutableStateFlow(1)
val sharingJob = Job()
val flow = value.shareIn(this + sharingJob, SharingStarted.WhileSubscribed(replayExpirationMillis = 0), replay = 1)
check(flow.first() == 1)
value.value = 2
check(flow.first() == 2)
sharingJob.cancel()
}
This consistently fails on the second check
as well, even when I try Dispatchers.Default
:
https://pl.kotl.in/r0dMQ5u0NPaul Woitaschek
01/05/2022, 10:28 AMJoffrey
01/05/2022, 11:46 AMWhileSubscribed
starting strategy in your code. Now I don't even understand how runCurrent
could fix the behaviour, because the sharing coroutine shouldn't even be collecting anything from the state flow between the 2 first()
calls.
If you wanted to keep getting updates from the state flow and put them in the replay cache for future subscribers, you would need another starting strategy like Eagerly
or Lazily
(which don't stop once they start).
WhileSubscribed
controls the collection of the upstream flow (the state flow in this case). Configured this way, it means it will cancel the collection of the state flow as soon as there are no collectors of the shared flow, and restart the collection of the state flow as soon as a collector starts collecting the shared flow. replayExpirationMillis
that you already configure, but the sharing coroutine doesn't have the opportunity to perform the cleanup if you don't runCurrent()
)Paul Woitaschek
01/05/2022, 11:47 AMJoffrey
01/05/2022, 11:52 AMrunCurrent
would allow the sharing coroutine to finish its work, including clearing the replay cache. Got it now