
nickheitz

08/15/2019, 11:59 AM
Hi all. I had a play around with the ArrayBroadcastChannel today, and I'm not understanding something about cooperative cancellation.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.broadcast
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Test
import kotlin.coroutines.CoroutineContext

class BroadcastChannelTest : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO + Job()

    init {
        System.setProperty("kotlinx.coroutines.debug", "")
    }

    @Test
    fun broadcast() {
        val jobs = mutableListOf<Job>()
        val channel = Channel<Int>().broadcast(5, CoroutineStart.DEFAULT)
        runBlocking {
            jobs.add(createConsumer(channel))
            jobs.add(createConsumer(channel, 500, 3))
            delay(250)

            (0..10).forEach {
                channel.send(it)
                delay(200)
            }

            channel.send(-1)

            jobs.forEach {
                it.join()
            }
        }

    }

    private fun createConsumer(channel: BroadcastChannel<Int>, delay: Int = 0, failOn: Int = 11): Job {
        return launch {
            try {
                if (delay > 0) delay(delay.toLong())

                println("creating consumer $delay")
                val sub = channel.openSubscription()
                for (msg in sub) {
                    if (isActive) {
                        println("$msg on ${Thread.currentThread().name}")

                        if (msg == -1) break

                        if (msg == failOn) {
                            sub.cancel()
                            throw Exception("Failing now...")
                        }
                    } else {
                        println("No longer active.")
                    }
                }
            } finally {
                println("Consumer done")
            }
        }
    }
}
In short, I want one of the channels to error out after 3 messages. Given that all of these coroutines are (I think) running in the CoroutineScope of the class itself, the Job() at the top should lead to cooperative cancellation on an exception. However, the non-erroring listener continues to completion. Any ideas?

kingsley

08/15/2019, 12:08 PM
Quickly glancing over this, it appears that you are setting up the coroutineContext wrong, which might be the source of the issue. You should have:
private val job = Job()

// and then
override val coroutineContext: CoroutineContext
  get() = Dispatchers.IO + job

// OR
override val coroutineContext = Dispatchers.IO + Job()
Basically, as it is now, any coroutine you launch will be started in the context of a new parent job, since a new Dispatchers.IO + Job() is created every time coroutineContext is read.
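A minimal sketch of the difference, assuming kotlinx.coroutines is on the classpath. The class names GetterScope and SharedScope and the helper sameJobOnEachAccess are hypothetical, made up for illustration. It only compares the Job found in the context on two consecutive reads:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlin.coroutines.CoroutineContext

// Hypothetical scope mirroring the original test class:
// the getter builds a fresh Job on every access.
class GetterScope : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO + Job()
}

// Hypothetical scope with the suggested fix:
// the Job is created once and reused by the getter.
class SharedScope : CoroutineScope {
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO + job
}

// Reads the context twice and checks whether both reads see the same Job instance.
fun sameJobOnEachAccess(scope: CoroutineScope): Boolean =
    scope.coroutineContext[Job] === scope.coroutineContext[Job]

fun main() {
    println(sameJobOnEachAccess(GetterScope())) // false
    println(sameJobOnEachAccess(SharedScope())) // true
}
```

Since launch reads coroutineContext from the scope each time it is called, the first variant gives every launched coroutine its own unrelated parent, so a failure in one consumer has no shared parent through which to reach the other.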

nickheitz

08/15/2019, 7:54 PM
That makes perfect sense. I obviously misunderstood an example somewhere demonstrating how to extend CoroutineScope. Thanks.
👍 1