https://kotlinlang.org logo
#coroutines
Title
# coroutines
s

spierce7

03/15/2019, 7:28 PM
Can anyone explain why this coroutine doesn't just emit values forever? Instead count gets to ~20, and then I stop receiving values on the BroadcastChannel. Either this is a bug, or I'm misunderstanding a core concept of how coroutines cancel.
Copy code
fun test() {
  val channel = BroadcastChannel<Int>(10)
  var job = Job()

  // Emit 2 values every second
  GlobalScope.launch {
    var count = 0

    while (true) {
      delay(500)
      channel.send(count++)
    }
  }

  // Create more and more subscribers continually, until they are eventually cancelled below
  GlobalScope.launch {
    while (true) {
      delay(500)

      val scope = GlobalScope + job
      scope.launch {
        for (event in channel.openSubscription()) {
          println("Event: $event")
        }
      }
    }
  }

  // Cancel the job every 5 seconds
  GlobalScope.launch(Dispatchers.Main) {
    while (true) {
      delay(5000)
      job.cancel()
      job = Job()
      println("Cancel")
    }
  }
}
d

Dico

03/15/2019, 7:37 PM
Thanks for putting the code in the thread. I don't see errors in your code, but you could try rectifying these things: A. The subscriptions you open on the broadcast channel are never closed. B. You're changing the
job
variable asynchronously.
s

spierce7

03/15/2019, 7:40 PM
I had it all running in Dispatchers.Main, but I was seeing even stranger behavior (at most 2 channels would emit events). I need to explicitly unsubscribe from the channel though? The channel isn't automatically closed when I close the context that I'm receiving in?
d

Dico

03/15/2019, 7:41 PM
You are not binding the receive channels to that context anywhere
This is what
consumeEach
did but it will be removed
A channel iterator doesn't guarantee closure
s

SiebelsTim

03/15/2019, 7:43 PM
Copy code
// Create more and more subscribers continually, until they are eventually cancelled below
    GlobalScope.launch {
        while (true) {
            delay(500)

            val scope = GlobalScope + job
            scope.launch {
                val openSubscription = channel.openSubscription()
                try {
                    for (event in openSubscription) {
                        println("Event: $event")
                    }
                } finally {
                    openSubscription.cancel()
                }
            }
        }
    }
That works for me.
d

Dico

03/15/2019, 7:44 PM
It's called
close()
right
s

SiebelsTim

03/15/2019, 7:44 PM
Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers that subscribe for the elements using openSubscription function and unsubscribe using ReceiveChannel.cancel function.
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/
d

Dico

03/15/2019, 7:44 PM
Never mind
s

spierce7

03/15/2019, 7:47 PM
Your suggested fix with the
try / finally
works. This is extremely inconvenient though. Is there a more convenient way to accomplish this?
d

Dico

03/15/2019, 7:55 PM
Use
consumeEach
A receive channel can't be implicitly bound to a coroutine, as there are use cases where you receive elements from multiple coroutines.
s

spierce7

03/15/2019, 8:03 PM
you said that consumeEach will be removed. Why will it be removed?
oh, it's going to be removed with cold streams. Awesome. Thanks for the help guys. This fixes our problem
2 Views