georgiy.shur
01/18/2020, 4:13 PMFlow
in my production code, but it seems that my understanding of it and coroutines in general isn't sufficient.
I reduced my project code to a simple reproducible test example.
I'm using TestCollector
class to collect values, emitted by the Flow
:
class TestCollector<T>(scope: CoroutineScope, flow: Flow<T>) {
private val collectedValues = mutableListOf<T>()
private val job = scope.launch { flow.collect { collectedValues.add(it) } }
fun assertValues(vararg values: T) = run {
val valuesList = values.toList()
if (valuesList != collectedValues) {
fail("\nExpected values: $valuesList\nCollected values:$collectedValues")
}
this
}
fun dispose() = job.cancel()
}
fun <T> Flow<T>.test(scope: CoroutineScope) = TestCollector(scope, this)
This is my test itself:
@Before
fun setup() {
Dispatchers.setMain(Dispatchers.Unconfined)
}
@Test
fun testFlowCollector() = runBlockingTest {
var firstEmit = true
val channel = ConflatedBroadcastChannel(0)
val testCollector = channel.asFlow().onEach {
if (firstEmit) {
launch {
channel.send(1)
channel.send(2)
}
firstEmit = false
}
}.test(this)
testCollector.assertValues(0, 1, 2)
testCollector.dispose()
}
So basically I'm using blocking test and what I'm trying to do, is launch those send
s only on the first emit. What I'm expecting to collect are all those three numbers 0, 1, 2
sequentially. But the test fails:
java.lang.AssertionError:
Expected values: [0, 1, 2]
Collected values:[0, 2]
For some reason, the emission of 1
is getting lost.
I'm trying to understand what's going on. Most probably I'm just misunderstanding/misusing the flow and the channel. Maybe there are coroutine experts who may explain it to me. 🙂Dico
01/18/2020, 5:55 PMUnconfined
and the channel Conflated
. Values 1 and 2 are added to the channel sequentially without suspension. The same thread requests the latest value from the channel by receiving from it after that, which is 2.hasNext()
call, and it is resumed with true when you send 1. But the unconfined dispatcher uses fair ordering, so your launch {} block will return before the resume is dispatched. Then the loop calls next
which yields 2, because the channel is conflated and therefore only keeps the most recent item.receive
until an error occurs), the behaviour would be as you expect.georgiy.shur
01/19/2020, 11:34 AMBehaviorSubject
. In the real code it's something like this:
channel.send(State.Loading)
val data = api.getData() // Retrofit suspended fun
channel.send(State.Loaded(data))
Is it a valid usage of ConflatedBroadcastChannel
? If yes, how should I test it properly?Dico
01/19/2020, 11:40 AMyield()
between sending the 1 and the 2ConflatedBroadcastChannel
has very similar functionality to the concept of BehaviorSubject
georgiy.shur
01/19/2020, 11:50 AMDico
01/19/2020, 5:58 PM