# announcements
c
Hey folks, we have a cute little situation. How can we restart a coroutine when an exception is thrown, given that we use a SupervisorJob? Below is an example with a producer and a consumer sharing a Channel. When the consumer hits an exception, it dies (gracefully), and nobody consumes messages anymore. Any tips on how we could fix this, so that messages keep being consumed after an exception is encountered? Appreciate any help.
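import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() {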
    val sender = Sender(GlobalScope.coroutineContext)
    GlobalScope.launch {
        sender.send("Message1") // this is processed
        sender.send("Message2") // this is not processed anymore :(
    }
    Thread.sleep(2000)
}

class Sender(
    coroutineContext: CoroutineContext
) {

    private val channel = Channel<String>(32768)
    private val supervisor = SupervisorJob(coroutineContext[Job])
    private val scope = CoroutineScope(coroutineContext + supervisor)

    init {

        val handler = CoroutineExceptionHandler { _, e ->
            println(e.message)
        }

        with(scope) {
            launch(handler) {
                MyProcessor(channel).run() // <--- how can we restart/recreate it when exception is thrown?
            }
        }
    }

    suspend fun send(message: String) {
        channel.send(message)
    }
}

class MyProcessor(
    private val channel: Channel<String>
) {

    suspend fun run() {
        for (message in channel) {
            process(message)
        }
    }

    private suspend fun process(message: String) {
        // something that occasionally throws Exception
        println("Processing $message")
        throw RuntimeException("Failed to process $message")
    }
}
e
The easiest way would be to just put a try/catch inside MyProcessor.run(), around the process() call.
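For reference, here is a minimal sketch of what that try/catch could look like. The names ResilientProcessor and consumeAll, and passing process in as a lambda, are made up for illustration and are not from the original code. The one thing to watch is that CancellationException is rethrown, so cancelling the scope still stops the loop.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical variant of MyProcessor: a failure in process() is logged
// and the loop moves on to the next message instead of dying.
class ResilientProcessor(
    private val channel: Channel<String>,
    private val process: suspend (String) -> Unit
) {
    suspend fun run() {
        for (message in channel) {
            try {
                process(message)
            } catch (e: CancellationException) {
                throw e // never swallow cancellation
            } catch (e: Exception) {
                println("Failed: ${e.message}")
            }
        }
    }
}

// Hypothetical helper: sends every message through a processor that always
// throws, and returns the messages the consumer actually saw.
fun consumeAll(messages: List<String>): List<String> = runBlocking {
    val channel = Channel<String>(Channel.UNLIMITED)
    val seen = mutableListOf<String>()
    val processor = ResilientProcessor(channel) { message ->
        seen += message
        throw RuntimeException("Failed to process $message")
    }
    val consumer = launch { processor.run() }
    messages.forEach { channel.send(it) }
    channel.close()  // lets the for loop finish once the channel is drained
    consumer.join()
    seen
}
```

With this in place, consumeAll(listOf("Message1", "Message2")) should see both messages even though processing fails every time.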
c
Thanks, Evan. That’s what we thought too, but what’s the value of a supervisor scope in this case? Looking over the docs, they even say: “Another example is a server process that spawns several children jobs and needs to supervise their execution, tracking their failures and restarting just those children jobs that had failed.” https://kotlinlang.org/docs/reference/coroutines/exception-handling.html
e
The value of a supervisor scope is that it doesn’t cancel sibling jobs when one of its launched coroutines fails with an exception. You’d have to implement the restart logic for a child job yourself, which would probably involve including the data being worked on in the thrown exception and launching a new job in the supervisor scope with that data from a CoroutineExceptionHandler. That’s generally more useful for fan-out concurrency, where lots of similar data is being worked on at the same time. Since you’re passing data around in channels, though, it makes more sense to just make the channel consumer resilient to thrown exceptions, so you don’t have to dynamically inject a new receiver into your Sender class.
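If you do want the restart route, a rough sketch of the manual version might look like the code below. Everything here (RestartingConsumer, start, stop, restartDemo) is a hypothetical name, and it makes one simplifying assumption: the message that caused the failure is dropped rather than carried in the exception and retried.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical consumer that relaunches itself whenever it fails.
class RestartingConsumer(
    private val channel: Channel<String>,
    private val process: suspend (String) -> Unit
) {
    private val supervisor = SupervisorJob()
    private val scope = CoroutineScope(Dispatchers.Default + supervisor)

    // Invoked when a consumer coroutine dies with an uncaught exception.
    // The supervisor itself stays active, so a replacement can be launched.
    private val handler: CoroutineExceptionHandler =
        CoroutineExceptionHandler { _, e ->
            println("Consumer failed (${e.message}), restarting")
            start()
        }

    fun start() {
        scope.launch(handler) {
            for (message in channel) process(message)
        }
    }

    // Cancels the current consumer; launches after this point are no-ops.
    fun stop() = supervisor.cancel()
}

// Hypothetical demo: every message fails, yet each one still reaches a consumer.
fun restartDemo(messages: List<String>): List<String> = runBlocking {
    val channel = Channel<String>(Channel.UNLIMITED)
    val seen = Channel<String>(Channel.UNLIMITED)
    val consumer = RestartingConsumer(channel) { message ->
        seen.send(message) // record the attempt before failing
        throw RuntimeException("Failed to process $message")
    }
    consumer.start()
    messages.forEach { channel.send(it) }
    val result = messages.map { seen.receive() } // wait for one attempt per message
    consumer.stop()
    result
}
```

This works, but it is noticeably more machinery than a try/catch in the consumer loop, which is the reason for preferring the resilient-consumer approach when the data already flows through a channel.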