It seems that runTest is somehow broken with share...
# coroutines
p
It seems that runTest is somehow broken with shared flows:
Copy code
private suspend fun testBody(scope: CoroutineScope) {
    val value = MutableStateFlow(1)
    val sharingJob = Job()
    val flow = value.shareIn(scope + sharingJob, SharingStarted.WhileSubscribed(replayExpirationMillis = 0), replay = 1)
    check(flow.first() == 1)
    value.value = 2
    check(flow.first() == 2)
    sharingJob.cancel()
  }

  @Test
  fun withRunBlockingTest() = runBlockingTest {
    testBody(this)
  }

  @Test
  fun withRunTest() = runTest {
    testBody(this)
  }
This passes with runBlockingTest but fails with runTest. It is possible to fix it by adding a runCurrent before calling flow.first a second time but I think thats not how it’s supposted to be and makes me very uncomfortable writing tests without having a runCurrent between every single line
j
value.value = 2
is not suspending, why do you expect
flow.first()
to return 2 the second time? You're relying on a race here.
runCurrent
makes it deterministic (ensures the sharing coroutine progresses appropriately), so it would be necessary even in
runBlockingTest
. IMO you're just lucky that the implementation details of
runBlockingTest
let you assume the test was fine.
Note that the behaviour you were relying on would also be wrong in production code:
Copy code
fun main() = runBlocking {
    val value = MutableStateFlow(1)
    val sharingJob = Job()
    val flow = value.shareIn(this + sharingJob, SharingStarted.WhileSubscribed(replayExpirationMillis = 0), replay = 1)
    check(flow.first() == 1)
    value.value = 2
    check(flow.first() == 2)
    sharingJob.cancel()
}
This consistently fails on the second
check
as well, even when I try
Dispatchers.Default
: https://pl.kotl.in/r0dMQ5u0N
p
Ah dang then I think WhileSubscribed does not do what I expect it to do. I need a behavior where it only replays when there are new subscribers
🤔 1
j
I'm not quite sure I understand your requirements, replay is always about new subscribers. By the way, I had overlooked the
WhileSubscribed
starting strategy in your code. Now I don't even understand how
runCurrent
could fix the behaviour, because the sharing coroutine shouldn't even be collecting anything from the state flow between the 2
first()
calls. If you wanted to keep getting updates from the state flow and put them in the replay cache for future subscribers, you would need another starting strategy like
Eagerly
or
Lazily
(which don't stop once they start).
WhileSubscribed
controls the collection of the upstream flow (the state flow in this case). Configured this way, it means it will cancel the collection of the state flow as soon as there are no collectors of the shared flow, and restart the collection of the state flow as soon as a collector starts collecting the shared flow. It doesn't affect replay, though, so any value previously put in the replay cache can be replayed to new subscribers before the shared flow starts collecting the upstream again. (EDIT: the cleaning of the replay cache is controlled by
replayExpirationMillis
that you already configure, but the sharing coroutine doesn't have the opportunity to perform the cleanup if you don't
runCurrent()
)
p
The issue here is that it replays an old value, even though there are no subscribers any longer
j
I see, so yeah
runCurrent
would allow the sharing coroutine to finish its work, including clearing the replay cache. Got it now