Eric Martori
07/19/2019, 11:23 AM
@Test
fun `BroadcastChannel keep a coroutine alive`() = runBlockingTest {
    val channel = BroadcastChannel<Int>(Channel.CONFLATED)
    val flow = channel.asFlow()
    launch {
        flow.collect { assert(true) }
        channel.send(1)
    }
}
This test fails with `kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs: ["coroutine#2":StandaloneCoroutine{Active}@2a2d45ba]`.
It forces me to close the channel for the test to pass.
But if I swap the flow for a `ReceiveChannel`, like this:
@Test
fun `BroadcastChannel keep a coroutine alive`() = runBlockingTest {
    val channel = BroadcastChannel<Int>(Channel.CONFLATED)
    val receive = channel.openSubscription()
    launch {
        receive.consume { assert(true) }
        channel.send(1)
    }
}
The test passes without problems.
tseisel
07/19/2019, 11:54 AM
Change `consume` to `consumeEach` in the 2nd snippet. Then it should also suspend forever.
Eric Martori
07/19/2019, 1:46 PM
Eric Martori
07/19/2019, 1:48 PM
Eric Martori
07/19/2019, 1:52 PM
private val channel = BroadcastChannel<Int>(Channel.CONFLATED)

fun subscribe(coroutineScope: CoroutineScope, block: (Int) -> Unit) {
    val flow = channel.asFlow()
    coroutineScope.launch {
        flow.collect { block(it) }
    }
}
Eric Martori
07/19/2019, 1:54 PM
tseisel
07/19/2019, 3:58 PM
`Flow.collect` suspends until there are no more elements in the flow, calling your lambda for each item at the time it is emitted.
In your case you are producing a `Flow` from a `BroadcastChannel` with `asFlow`, so the coroutine calling `collect` will resume only when the source `BroadcastChannel` has no more elements (i.e., when it is closed or fails with an exception).
tseisel
07/19/2019, 4:02 PM
`channel.send(1)` is never called. To solve that, you can send to that channel from another coroutine (or from the main test block).
Eric Martori
07/19/2019, 5:54 PM
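A minimal sketch of tseisel's suggestion, for anyone reading this thread later. It assumes a kotlinx.coroutines version from around the time of this thread, where `BroadcastChannel` and `asFlow` are still available (both were deprecated in later releases). It uses plain `runBlocking` rather than `runBlockingTest` to stay self-contained, and the function name `collectUntilClosed` is made up for illustration. The collector coroutine only collects; the send happens in the outer block, and closing the channel is what lets `collect` return.

```kotlin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns the values the collector saw before the channel was closed.
fun collectUntilClosed(): List<Int> = runBlocking {
    val channel = BroadcastChannel<Int>(Channel.CONFLATED)
    val received = mutableListOf<Int>()

    // Collector coroutine: it only collects and never sends,
    // because collect suspends until the channel is closed.
    val collector = launch {
        channel.asFlow().collect { received += it }
    }

    yield()          // let the collector start and open its subscription
    channel.send(1)  // send from the outer block instead of after collect
    channel.close()  // ends the flow, so collect returns and the job completes
    collector.join() // wait for the collector to finish
    received
}

fun main() {
    println(collectUntilClosed())
}
```

Moving the send out of the collecting coroutine is not enough on its own: the collector stays active until the channel is closed (or its job is cancelled), which matches the `UncompletedCoroutinesError` reported at the top of the thread. The same shape should carry over to the `runBlockingTest` version, though that is not verified here.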