https://kotlinlang.org logo
#coroutines
Title
# coroutines
a

Andrey Stepankov

04/06/2020, 2:34 PM
Hi, I'm trying to understand reasons of number disappearance. So when I'm using jvm default dispatcher I get strange results, but When I'm using runBlockingTest, everything goes all right. It uses another dispatcher. How can we get similar output when using runBlockingTest or it's an expected behavior for ConflatedBroadcastChannel?
Copy code
@Test
  fun test_ConflatedBroadcastChannel_runBlocking() {
    runBlocking {
      val channel = ConflatedBroadcastChannel<Int>()

      val collect = async {
        channel.asFlow().collect {
          println("collected $it")
        }
      }
      val send = async {
        (0 until 5).forEach {
          println("send $it")
          channel.send(it)
        }
        channel.cancel()
      }

      listOf(collect, send).awaitAll()
    }

    // console out
    // send 0
    // send 1
    // send 2
    // send 3
    // send 4
    // collected 0
    // collected 4
  }

  @Test
  fun test_ConflatedBroadcastChannel_runBlockingTest() {
    runBlockingTest {
      val channel = ConflatedBroadcastChannel<Int>()

      val collect = async {
        channel.asFlow().collect {
          println("collected $it")
        }
      }
      val send = async {
        (0 until 5).forEach {
          println("send $it")
          channel.send(it)
        }
        channel.cancel()
      }

      listOf(collect, send).awaitAll()
    }

    // console out
    // send 0
    // collected 0
    // send 1
    // collected 1
    // send 2
    // collected 2
    // send 3
    // collected 3
    // send 4
    // collected 4
  }
t

tseisel

04/06/2020, 3:15 PM
As you have certainly noticed,
runBlockingTest
is designed for simplifying unit testing of coroutine code. To do so, it offers some simplifications of the coroutine execution model: • coroutines started with
launch
or
async
are executed eagerly and synchronously, right after their declaration • functions that involve time such as
delay
or
withTimeout
are fast-forwarded (virtual time) to make tests execute way faster. Therefore,
runBlockingTest
does not behave like its "production code" implementation
runBlocking
. When you call
async
, its block is not executed immediately, but scheduled for execution. This means that if you start multiple coroutines at the same time, there is no guarantee that coroutine#1 will run before coroutine#2. This is likely what's happening here: •
ConflatedBroadcastChannel.send
is called for each number, and it never suspends by nature, hence all numbers are sent at once (1, 2, 3 are lost due to conflation). • Your consumer block suspends until
0
is received, then only receive
4
because other numbers have been overwritten.
Note: for your use case you would rather use
launch
instead of
async
, since there are no results to be awaited. Because you have launched those coroutines in the scope of
runBlocking
(or
runBlockingTest
), you don't have to use
awaitAll
(ou
joinAll
when using
launch
) to wait for both coroutines to complete. This is a benefit from Structured Concurrency.
a

Andrey Stepankov

04/06/2020, 3:31 PM
Thank you for explaining. I know difference between runBlockingTest and runBlocking. I'd like to collect all items from send. I can just add yield() after each send. So. Is item disappearance is expected?
k

Kroppeb

04/06/2020, 5:17 PM
Item disappearance is expected. That's what the
Conflated
in
ConflatedBroadcastChannel
stands for
☝️ 1
a

Andrey Stepankov

04/06/2020, 7:54 PM
That's awesome, thanks!
2 Views