# coroutines
g
Hello, I'm starting to use `Flow` in my production code, but it seems that my understanding of it and coroutines in general isn't sufficient. I reduced my project code to a simple reproducible test example. I'm using a `TestCollector` class to collect the values emitted by the `Flow`:
```kotlin
class TestCollector<T>(scope: CoroutineScope, flow: Flow<T>) {

    private val collectedValues = mutableListOf<T>()
    private val job = scope.launch { flow.collect { collectedValues.add(it) } }

    fun assertValues(vararg values: T) = run {
        val valuesList = values.toList()
        if (valuesList != collectedValues) {
            fail("\nExpected values: $valuesList\nCollected values:$collectedValues")
        }
        this
    }

    fun dispose() = job.cancel()
}

fun <T> Flow<T>.test(scope: CoroutineScope) = TestCollector(scope, this)
```
This is my test itself:
```kotlin
@Before
fun setup() {
    Dispatchers.setMain(Dispatchers.Unconfined)
}

@Test
fun testFlowCollector() = runBlockingTest {
    var firstEmit = true
    val channel = ConflatedBroadcastChannel(0)
    val testCollector = channel.asFlow().onEach {
        if (firstEmit) {
            launch {
                channel.send(1)
                channel.send(2)
            }
            firstEmit = false
        }
    }.test(this)

    testCollector.assertValues(0, 1, 2)

    testCollector.dispose()
}
```
So basically I'm using a blocking test, and what I'm trying to do is launch those `send`s only on the first emit. What I'm expecting to collect are all three numbers `0, 1, 2` sequentially. But the test fails:
```
java.lang.AssertionError:
Expected values: [0, 1, 2]
Collected values:[0, 2]
```
For some reason, the emission of `1` is getting lost. I'm trying to understand what's going on. Most probably I'm just misunderstanding/misusing the flow and the channel. Maybe there are coroutine experts who may explain it to me. 🙂
d
It's because the dispatcher is `Unconfined` and the channel `Conflated`. Values 1 and 2 are added to the channel sequentially without suspension. The same thread requests the latest value from the channel by receiving from it after that, which is 2.
I think at least, if the channel iterator is implemented how I think it is.
Basically the loop of the channel is suspended in a `hasNext()` call, and it is resumed with true when you send 1. But the unconfined dispatcher uses fair ordering, so your `launch {}` block will return before the resume is dispatched. Then the loop calls `next`, which yields 2, because the channel is conflated and therefore only keeps the most recent item.
If the channel iterator were implemented atomically (by which I mean, only using `receive` until an error occurs), the behaviour would be as you expect.
But the behaviour is fine: because the channel is conflated, getting the most recent element is expected behaviour.
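The conflation effect described above can be reproduced without coroutines at all. The following is a minimal toy sketch, not the kotlinx.coroutines implementation: `ConflatedSlot` and `collectWithBackToBackSends` are hypothetical names made up for illustration, and the single-slot model is an assumption about what "conflated" means here (only the most recent unread value is kept).

```kotlin
// Toy model of a conflated buffer. This is NOT how kotlinx.coroutines
// implements ConflatedBroadcastChannel; it only mimics "keep the latest value".
class ConflatedSlot<T : Any>(initial: T) {
    private var latest: T = initial
    private var unread = true

    // Overwrites whatever is in the slot, whether or not it was read.
    fun send(value: T) {
        latest = value
        unread = true
    }

    // Returns the latest value once, or null if nothing new has arrived.
    fun poll(): T? {
        if (!unread) return null
        unread = false
        return latest
    }
}

fun collectWithBackToBackSends(): List<Int> {
    val slot = ConflatedSlot(0)
    val collected = mutableListOf<Int>()
    var firstEmit = true
    while (true) {
        val value = slot.poll() ?: break
        if (firstEmit) {
            // Both sends complete before the collector polls again,
            // so 1 is overwritten by 2 and never observed.
            slot.send(1)
            slot.send(2)
            firstEmit = false
        }
        collected.add(value)
    }
    return collected
}

fun main() {
    println(collectWithBackToBackSends()) // prints [0, 2]
}
```

The collector only gets a chance to look at the slot after both sends have already happened, which mirrors the `[0, 2]` result from the failing test.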
g
Ok, I'm just trying to achieve the same behavior as I've had with Rx `BehaviorSubject`. In the real code it's something like this:
```kotlin
channel.send(State.Loading)
val data = api.getData() // Retrofit suspended fun
channel.send(State.Loaded(data))
```
Is it a valid usage of `ConflatedBroadcastChannel`? If yes, how should I test it properly?
d
Try adding a `yield()` between sending the 1 and the 2
Indeed, `ConflatedBroadcastChannel` has very similar functionality to the concept of `BehaviorSubject`.
In this instance I think you're running into a bit of an edge case
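To illustrate why a pause between the two sends would help, here is a small toy sketch in plain Kotlin. It is only a model under stated assumptions: `simulate` is a hypothetical helper, the single nullable `slot` stands in for the conflated channel, and a `null` step stands in for `yield()`. It does not show how the real test dispatcher schedules the resumed coroutine.

```kotlin
// Toy model only: a single-slot "conflated" mailbox and a producer written as
// a list of steps. A null step plays the role of yield(): the producer pauses
// and the collector drains the slot before the producer continues.
fun simulate(producerSteps: List<Int?>): List<Int> {
    var slot: Int? = 0 // initial value, like ConflatedBroadcastChannel(0)
    val collected = mutableListOf<Int>()

    // Collector: take whatever is currently in the slot, if anything.
    fun drain() {
        slot?.let { collected.add(it) }
        slot = null
    }

    drain() // the collector sees the initial 0
    for (step in producerSteps) {
        if (step == null) {
            drain() // "yield": the collector runs before the next send
        } else {
            slot = step // "send": overwrites any unread value
        }
    }
    drain() // producer finished, the collector runs again
    return collected
}

fun main() {
    // send(1), pause, send(2): the collector observes 1 before it is replaced.
    println(simulate(listOf(1, null, 2))) // prints [0, 1, 2]
}
```

Dropping the `null` step from the list removes the pause, and the model then loses the 1 in the same way the original test does.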
g
Thanks for the response... From what I understood, it won't happen in my real code. And even if it does, it's logically correct: there is no "loading" 😄 I'll try to change my test to simulate the API not returning immediately.
d
You're welcome