#coroutines

Eric Martori

07/19/2019, 11:23 AM
Hi everyone. I found an issue with broadcast channel, flows and tests:
@Test
fun `BroadcastChannel keep a coroutine alive`() = runBlockingTest {
    val channel = BroadcastChannel<Int>(Channel.CONFLATED)
    val flow = channel.asFlow()
    launch {
        flow.collect { assert(true) }
        channel.send(1)
    }
}
This test fails with `kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs: ["coroutine#2":StandaloneCoroutine{Active}@2a2d45ba]`. It forces me to close the channel for the test to pass. But if I change the `flow` for a `receiveChannel` like this:
@Test
fun `BroadcastChannel keep a coroutine alive`() = runBlockingTest {
    val channel = BroadcastChannel<Int>(Channel.CONFLATED)
    val receive = channel.openSubscription()
    launch {
        receive.consume { assert(true) }
        channel.send(1)
    }
}
The test passes without problems.

tseisel

07/19/2019, 11:54 AM
Those 2 code snippets do not have the same behavior: you need to change `consume` to `consumeEach` in the 2nd snippet. Then it should also suspend forever.
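
A minimal sketch of that difference, assuming JUnit 4 and the kotlinx-coroutines 1.3-era experimental APIs used in this thread (the class and test names are made up for illustration). `consume` runs its block once and then cancels the subscription, while `consumeEach` keeps suspending for the next element until the subscription is closed or cancelled:

```kotlin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Assert.assertTrue
import org.junit.Test

// Hypothetical test class, not taken from the thread.
class ConsumeVsConsumeEachTest {
    @Test
    fun `consume returns at once while consumeEach keeps waiting`() = runBlockingTest {
        val channel = BroadcastChannel<Int>(Channel.CONFLATED)

        var consumeReturned = false
        launch {
            // consume invokes the block a single time, then cancels the subscription.
            channel.openSubscription().consume { }
            consumeReturned = true
        }
        assertTrue(consumeReturned)

        val job = launch {
            // consumeEach suspends for each next element; nothing closes the
            // channel here, so this call does not return on its own.
            channel.openSubscription().consumeEach { }
        }
        assertTrue(job.isActive)

        // Cancel explicitly so the test does not end with an active job.
        job.cancel()
    }
}
```

The final `job.cancel()` is only there so that `runBlockingTest` does not report an uncompleted coroutine.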

Eric Martori

07/19/2019, 1:46 PM
Correct. Then, how could I stop this from suspending forever?
In my production code I have a `BroadcastChannel` and open a flow each time something needs to subscribe to its values. The problem is that when I try to test a piece of code that interacts with it, the test never stops suspending.
It is hard for me to explain the code in words. A simplified version would be like this:
private val channel = BroadcastChannel<Int>(Channel.CONFLATED)

fun subscribe(coroutineScope: CoroutineScope, block: (Int) -> Unit) {
    val flow = channel.asFlow()
    coroutineScope.launch {
        flow.collect { block(it) }
    }
}
Then, when I try to test some class or method that uses this `subscribe` method, I get a "suspending forever" error.

tseisel

07/19/2019, 3:58 PM
`Flow.collect` suspends until there are no more elements in the flow, calling your lambda for each item as it is emitted. In your case you are producing a `Flow` from a `BroadcastChannel` with `asFlow`, so the coroutine calling `collect` resumes only when the source `BroadcastChannel` has no more elements (i.e., when it is closed or fails with an exception).
That's why there is a deadlock in your test snippets: the launched coroutine stays suspended until all elements have been consumed/collected, so `channel.send(1)` is never called. To solve that, you can send to that channel from another coroutine (or from the main test block).
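
A sketch of that suggestion, assuming JUnit 4 and the same kotlinx-coroutines 1.3-era APIs as the snippets above (the class name, test name, and `received` list are illustrative, not from the original code). The collector runs in its own coroutine, the test body does the sending, and closing the channel is what lets `collect` return:

```kotlin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Assert.assertEquals
import org.junit.Test

// Hypothetical test class, not taken from the thread.
class SendFromTestBodyTest {
    @Test
    fun `collector finishes once the channel is closed`() = runBlockingTest {
        val channel = BroadcastChannel<Int>(Channel.CONFLATED)
        val received = mutableListOf<Int>()

        // The collector suspends inside collect, waiting for elements.
        launch {
            channel.asFlow().collect { received += it }
        }

        // Send from the test body instead of from the suspended collector.
        channel.send(1)

        // Closing the channel completes the flow, so collect returns
        // and the launched coroutine finishes.
        channel.close()

        assertEquals(listOf(1), received)
    }
}
```

This relies on `runBlockingTest` starting the launched coroutine eagerly, so the subscription is already open when `send` is called.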

Eric Martori

07/19/2019, 5:54 PM
After fiddling with the code, I've found that to break the suspension I need to either cancel the job returned by `launch` or close the channel. I just need to make sure that all subscriptions are closed before the end of the test. With my actual code this is easy to achieve for testing purposes, and my production code was already doing it by default.
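
A rough sketch of the job-cancelling variant, again assuming JUnit 4 and the kotlinx-coroutines 1.3-era APIs from this thread (the class name, test name, and `received` list are hypothetical). The channel stays open, and cancelling the job returned by `launch` is what ends the collector before the test finishes:

```kotlin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Assert.assertEquals
import org.junit.Test

// Hypothetical test class, not taken from the thread.
class CancelCollectorJobTest {
    @Test
    fun `collector is cancelled before the test ends`() = runBlockingTest {
        val channel = BroadcastChannel<Int>(Channel.CONFLATED)
        val received = mutableListOf<Int>()

        // Keep the Job so the subscription can be ended explicitly.
        val job = launch {
            channel.asFlow().collect { received += it }
        }

        channel.send(1)

        // The channel is still open, so collect would keep suspending;
        // cancelling the job stops the collector instead.
        job.cancel()

        assertEquals(listOf(1), received)
    }
}
```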