Hey all! I'm learning how state flows work and try...
# flow
h
Hey all! I'm learning how state flows work and trying to test something with the
StateFlow
. My test looks like this:
Copy code
@Test
fun `test state flow`() = runBlocking {
    val stateFlow = flowOf(1, 2, 3)
        .onEach {
            println(it)
        }
        .stateIn(
            scope = TestCoroutineScope(),
            started = SharingStarted.WhileSubscribed(),
            initialValue = 0
        )
    
    val results = mutableListOf<Int>()
    val job = stateFlow.collectAsync(TestCoroutineScope()) {
        results.add(it)
    }

    assertEquals(listOf(0, 1, 2, 3, 4), results)
    
    job.cancel()
}
I was expecting this test to pass but it is failing. The actual size of results is
1
and the value it contains is
3
. I debugged my test with
onEach
and in console every value from the source flow is printed which means the value was emitted but it was never received in the collector. Does anybody know why am I only receiving the last value in my collector in the above code?
n
Use a
StateFlow
when you only care about the latest value. If you need every value, you should use a
SharedFlow
. I think 1, 2, 3 are all getting processed before the
collect
lambda runs so only the 3 is left because it's the latest.
flowOf
doesn't suspend while looping through values and sending values into the
StateFlow
never suspends (it just throws away old unprocessed values), so the initial code populating the
StateFlow
never yields and finishes without the
collect
code having a chance to run.
h
Why does this happen only on the initial code? I tried the following code and it is working as expected:
Copy code
@Test
fun `test state flow`() = runBlocking {
    val channel = Channel<Int>()
    val stateFlow = channel.receiveAsFlow()
        .onEach {
            println(it)
        }
        .stateIn(
            scope = TestCoroutineScope(),
            started = SharingStarted.WhileSubscribed(),
            initialValue = 0
        )
    
    val results = mutableListOf<Int>()
    val job = stateFlow.collectAsync(TestCoroutineScope()) {
        results.add(it)
    }

    channel.trySend(1)
    channel.trySend(2)
    channel.trySend(3)

    assertEquals(listOf(0, 1, 2, 3, 4), results)
    
    job.cancel()
}
n
I'm not sure exactly why this one prints all of them, apparently
trySend
is directly resuming the
collect
code in
stateIn
. But these two examples are quite different. Consider
flowOf(1,2,3).collect { printLn(it) }
. It never suspends. It all runs synchronously inside the collecting coroutine. With the
Channel
you have two coroutines, one `trySend`ing and the other `collect`ing.
Btw, you are using a bunch of scopes that are completely unrelated which makes it extra hard to reason about any of the code. If you just use the
runBlocking
scope then you end up with no data at all.
Trying to predict exact dispatcher behavior is likely to cause problems because even if you aren't just wrong, the implementation can change underneath you and then all the code depending on that exact behavior is broken. Code such that concurrent coroutines can run in any order and resume immediately or much later.
runBlockingTest
and the newer
runTest
provide functions like
runCurrent
that let you manipulate the dispatcher to force pending coroutines to run. Those are helpful in removing chance.
h
That's is a fair suggestion, but using
runBlockingTest
does not change results of this test. But I understand your point here. It would provide us with the predictable behavior where we can understand what is actually happening. I'll have to dig deeper into this. But that you for your help @Nick Allen