nickheitz
08/15/2019, 11:59 AMimport kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.broadcast
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Test
import kotlin.coroutines.CoroutineContext
class BroadcastChannelTest : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = <http://Dispatchers.IO|Dispatchers.IO> + Job()
init {
System.setProperty("kotlinx.coroutines.debug", "")
}
@Test
fun broadcast() {
val jobs = mutableListOf<Job>()
val channel = Channel<Int>().broadcast(5, CoroutineStart.DEFAULT)
runBlocking {
jobs.add(createConsumer(channel))
jobs.add(createConsumer(channel, 500, 3))
delay(250)
(0..10).forEach {
channel.send(it)
delay(200)
}
channel.send(-1)
jobs.forEach {
it.join()
}
}
}
private fun createConsumer(channel: BroadcastChannel<Int>, delay: Int = 0, failOn: Int = 11): Job {
return launch {
try {
if (delay > 0) delay(delay.toLong())
println("creating consumer $delay")
val sub = channel.openSubscription()
for (msg in sub) {
if (isActive) {
println("$msg on ${Thread.currentThread().name}")
if (msg == -1) break
if (msg == failOn) {
sub.cancel()
throw Exception("Failing now...")
}
} else {
println("No longer active.")
}
}
} finally {
println("Consumer done")
}
}
}
}
in short, I want one of the channels to error out after 3 messages. GIven that all of these coroutines are (I think) running in the CoroutineScope of the class itself, the Job() at the top should lead to cooperative cancellation on an exception. However, the non erroring listener continues to completion. Any ideas?kingsley
08/15/2019, 12:08 PMcoroutineContext
wrong, which might be a source of the issue
You should have a:
private val job = Job()
// and then
override val coroutineContext: CoroutineContext
get() = <http://Dispatchers.IO|Dispatchers.IO> + job
// OR
override val coroutineContext = <http://Dispatchers.IO|Dispatchers.IO> + Job()
<http://Dispatchers.IO|Dispatchers.IO> + Job()
is being passed down each timenickheitz
08/15/2019, 7:54 PM