Eric Martori
07/19/2019, 11:23 AM
@Test
fun `BroadcastChannel keep a coroutine alive`() = runBlockingTest {
    val channel = BroadcastChannel<Int>(Channel.CONFLATED)
    val flow = channel.asFlow()
    launch {
        flow.collect { assert(true) }
        channel.send(1)
    }
}
This test fails with `kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs: ["coroutine#2":StandaloneCoroutine{Active}@2a2d45ba]`
It forces me to close the channel for the test to pass.
But if I change the flow for a `ReceiveChannel`, like this:
@Test
fun `BroadcastChannel keep a coroutine alive`() = runBlockingTest {
    val channel = BroadcastChannel<Int>(Channel.CONFLATED)
    val receive = channel.openSubscription()
    launch {
        receive.consume { assert(true) }
        channel.send(1)
    }
}
The test passes without problems.
tseisel
07/19/2019, 11:54 AM
Try changing `consume` to `consumeEach` in the 2nd snippet. Then it should also suspend forever.
Eric Martori
07/19/2019, 1:46 PM
private val channel = BroadcastChannel<Int>(Channel.CONFLATED)
fun subscribe(coroutineScope: CoroutineScope, block: (Int) -> Unit) {
    val flow = channel.asFlow()
    coroutineScope.launch {
        flow.collect { block(it) }
    }
}
tseisel
07/19/2019, 3:58 PM
`Flow.collect` suspends until there are no more elements in the flow, calling your lambda for each item as it is emitted.
In your case you are producing a `Flow` from a `BroadcastChannel` with `asFlow`, so the coroutine calling `collect` will only resume once the source `BroadcastChannel` has no more elements (i.e., when it is closed or fails with an exception).
As a result, `channel.send(1)` is never called. To solve that, you can send to that channel from another coroutine (or from the main test block).
Eric Martori
07/19/2019, 5:54 PM
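A minimal sketch of the fix tseisel suggests above, assuming the 2019-era `kotlinx-coroutines-core` and `kotlinx-coroutines-test` artifacts used in this thread (`BroadcastChannel` and `runBlockingTest` have since been deprecated) plus JUnit 4. The value is sent from the test body instead of after `collect`, and the collector job is cancelled at the end so that no coroutine is left active. The class name, test name, and the `received` list are illustrative and not part of the original code.

```kotlin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect // extension import needed on the 1.3.x line (assumption)
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Assert.assertEquals
import org.junit.Test

class BroadcastChannelFlowTest {
    @Test
    fun `collector receives a value sent from the test body`() = runBlockingTest {
        val channel = BroadcastChannel<Int>(Channel.CONFLATED)
        val received = mutableListOf<Int>() // hypothetical helper to observe what was collected

        // collect suspends until the channel is closed or this job is cancelled,
        // so nothing placed after it inside this launch block would run.
        val collector = launch {
            channel.asFlow().collect { received += it }
        }

        // Send from the test body, outside the collecting coroutine.
        channel.send(1)

        // Stop the collector explicitly; otherwise runBlockingTest reports
        // UncompletedCoroutinesError for the still-active job.
        collector.cancel()

        assertEquals(listOf(1), received)
    }
}
```

Calling `channel.close()` instead of `collector.cancel()` should also let the collector finish, which matches the observation earlier in the thread that closing the channel makes the test pass.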