georgiy.shur
01/18/2020, 4:13 PMFlow
in my production code, but it seems that my understanding of it and coroutines in general isn't sufficient.
I reduced my project code to a simple reproducible test example.
I'm using TestCollector
class to collect values, emitted by the Flow
:
class TestCollector<T>(scope: CoroutineScope, flow: Flow<T>) {
private val collectedValues = mutableListOf<T>()
private val job = scope.launch { flow.collect { collectedValues.add(it) } }
fun assertValues(vararg values: T) = run {
val valuesList = values.toList()
if (valuesList != collectedValues) {
fail("\nExpected values: $valuesList\nCollected values:$collectedValues")
}
this
}
fun dispose() = job.cancel()
}
fun <T> Flow<T>.test(scope: CoroutineScope) = TestCollector(scope, this)
This is my test itself:
@Before
fun setup() {
Dispatchers.setMain(Dispatchers.Unconfined)
}
@Test
fun testFlowCollector() = runBlockingTest {
var firstEmit = true
val channel = ConflatedBroadcastChannel(0)
val testCollector = channel.asFlow().onEach {
if (firstEmit) {
launch {
channel.send(1)
channel.send(2)
}
firstEmit = false
}
}.test(this)
testCollector.assertValues(0, 1, 2)
testCollector.dispose()
}
So basically I'm using blocking test and what I'm trying to do, is launch those send
s only on the first emit. What I'm expecting to collect are all those three numbers 0, 1, 2
sequentially. But the test fails:
java.lang.AssertionError:
Expected values: [0, 1, 2]
Collected values:[0, 2]
For some reason, the emission of 1
is getting lost.
I'm trying to understand what's going on. Most probably I'm just misunderstanding/misusing the flow and the channel. Maybe there are coroutine experts who may explain it to me. 🙂Dico
01/18/2020, 5:55 PMUnconfined
and the channel Conflated
. Values 1 and 2 are added to the channel sequentially without suspension. The same thread requests the latest value from the channel by receiving from it after that, which is 2.Dico
01/18/2020, 5:59 PMDico
01/18/2020, 6:02 PMhasNext()
call, and it is resumed with true when you send 1. But the unconfined dispatcher uses fair ordering, so your launch {} block will return before the resume is dispatched. Then the loop calls next
which yields 2, because the channel is conflated and therefore only keeps the most recent item.Dico
01/18/2020, 6:04 PMreceive
until an error occurs), the behaviour would be as you expect.Dico
01/18/2020, 6:06 PMgeorgiy.shur
01/19/2020, 11:34 AMBehaviorSubject
. In the real code it's something like this:
channel.send(State.Loading)
val data = api.getData() // Retrofit suspended fun
channel.send(State.Loaded(data))
Is it a valid usage of ConflatedBroadcastChannel
? If yes, how should I test it properly?Dico
01/19/2020, 11:40 AMyield()
between sending the 1 and the 2Dico
01/19/2020, 11:44 AMConflatedBroadcastChannel
has very similar functionality to the concept of BehaviorSubject
Dico
01/19/2020, 11:45 AMgeorgiy.shur
01/19/2020, 11:50 AMDico
01/19/2020, 5:58 PM