Can anyone explain why this coroutine doesn't just...
# coroutines
s
Can anyone explain why this coroutine doesn't just emit values forever? Instead count gets to ~20, and then I stop receiving values on the BroadcastChannel. Either this is a bug, or I'm misunderstanding a core concept of how coroutines cancel.
fun test() {
  val channel = BroadcastChannel<Int>(10)
  var job = Job()

  // Emit 2 values every second
  GlobalScope.launch {
    var count = 0

    while (true) {
      delay(500)
      channel.send(count++)
    }
  }

  // Create more and more subscribers continually, until they are eventually cancelled below
  GlobalScope.launch {
    while (true) {
      delay(500)

      val scope = GlobalScope + job
      scope.launch {
        for (event in channel.openSubscription()) {
          println("Event: $event")
        }
      }
    }
  }

  // Cancel the job every 5 seconds
  GlobalScope.launch(Dispatchers.Main) {
    while (true) {
      delay(5000)
      job.cancel()
      job = Job()
      println("Cancel")
    }
  }
}
d
Thanks for putting the code in the thread. I don't see errors in your code, but you could try rectifying these things: A. The subscriptions you open on the broadcast channel are never closed. B. You're changing the job variable asynchronously.
s
I had it all running in Dispatchers.Main, but I was seeing even stranger behavior (at most 2 channels would emit events). I need to explicitly unsubscribe from the channel though? The channel isn't automatically closed when I close the context that I'm receiving in?
d
You are not binding the receive channels to that context anywhere
This is what consumeEach did but it will be removed
A channel iterator doesn't guarantee closure
s
// Create more and more subscribers continually, until they are eventually cancelled below
    GlobalScope.launch {
        while (true) {
            delay(500)

            val scope = GlobalScope + job
            scope.launch {
                val openSubscription = channel.openSubscription()
                try {
                    for (event in openSubscription) {
                        println("Event: $event")
                    }
                } finally {
                    openSubscription.cancel()
                }
            }
        }
    }
That works for me.
d
It's called close(), right?
s
Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers that subscribe for the elements using openSubscription function and unsubscribe using ReceiveChannel.cancel function.
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/
d
Never mind
s
Your suggested fix with the try / finally works. This is extremely inconvenient though. Is there a more convenient way to accomplish this?
d
Use consumeEach
A receive channel can't be implicitly bound to a coroutine, as there are use cases where you receive elements from multiple coroutines.
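For reference, a minimal sketch of the consumeEach variant. It assumes a kotlinx.coroutines version in which BroadcastChannel is still available (it was later deprecated), and launchSubscriber is a hypothetical helper name, not something from the thread or the library. consumeEach on the ReceiveChannel returned by openSubscription cancels that subscription when the loop exits, so it should behave like the try / finally version above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper (not from the thread): launches one subscriber whose
// subscription is released when the coroutine finishes or is cancelled.
@OptIn(ObsoleteCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun CoroutineScope.launchSubscriber(
    channel: BroadcastChannel<Int>,
    onEvent: (Int) -> Unit
): Job = launch {
    // consumeEach cancels the subscription when the loop exits for any reason:
    // normal completion, an exception, or cancellation of this coroutine.
    channel.openSubscription().consumeEach { event -> onEvent(event) }
}

@OptIn(ObsoleteCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val channel = BroadcastChannel<Int>(10)
    val subscriber = launchSubscriber(channel) { println("Event: $it") }

    yield()                         // let the subscriber open its subscription
    repeat(3) { channel.send(it) }  // fits in the buffer, so send does not suspend
    delay(100)                      // give the subscriber time to handle the events
    subscriber.cancelAndJoin()      // cancelling the coroutine also cancels the subscription
}
```

In the original test() the inner block would become scope.launch { channel.openSubscription().consumeEach { println("Event: $it") } }, with the rest unchanged.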
s
you said that consumeEach will be removed. Why will it be removed?
oh, it's going to be removed with cold streams. Awesome. Thanks for the help guys. This fixes our problem