https://kotlinlang.org logo
#coroutines
Title
# coroutines
d

dimsuz

06/28/2020, 10:58 PM
Hi! Please help me understand why this hangs after printing 1,2,3 and how do I unfreeze it?...
Copy code
runBlocking {
    val events = BroadcastChannel<Int>(capacity = Channel.BUFFERED)
    merge(events.asFlow(), flowOf(1,2,3))
      .onStart { events.offer(42) }
      .take(4)
      .collect {
        println("received $it")
      }
  }
This popped up while I was trying to write a unit test for a certain api and got me puzzled. It looks like event should be buffered in a non-blocking way. I also use
.take(4)
to unsubscribe from hot Flow, but it's that
42
never gets seen somehow.
b

bezrukov

06/28/2020, 11:04 PM
Because onStart is called before subscribing to flow. So you offered an element to channel before subscribing to it. Buffered broadcast channel doesn`t replay last element, so 42 is just lost
👍🏾 2
d

dimsuz

06/28/2020, 11:12 PM
thanks!
I managed to reproduce my problem more closely. This prints only "received 1" and never receives
42
, hangs. It seems that
scan
is somehow involved:
Copy code
runBlocking {
    val events = BroadcastChannel<Int>(capacity = Channel.BUFFERED)
    events.asFlow()
      .scan(1, { accumulator, value -> value })
      .take(2)
      .collect {
        println("received $it")
        if (it == 1) {
          events.offer(42)
        }
      }
  }
5 Views