Constantin Muraru
06/29/2020, 2:54 PMfun main() {
val sender = Sender(GlobalScope.coroutineContext)
GlobalScope.launch {
sender.send("Message1") // this is processed
sender.send("Message2") // this is not processed anymore :(
}
Thread.sleep(2000)
}
class Sender(
coroutineContext: CoroutineContext
) {
private val channel = Channel<String>(32768)
private val supervisor = SupervisorJob(coroutineContext[Job])
private val scope = CoroutineScope(coroutineContext + supervisor)
init {
val handler = CoroutineExceptionHandler { _, e ->
println(e.message)
}
with(scope) {
launch(handler) {
MyProcessor(channel).run() // <--- how can we restart/recreate it when exception is thrown?
}
}
}
suspend fun send(message: String) {
channel.send(message)
}
}
class MyProcessor(
private val channel: Channel<String>
) {
suspend fun run() {
for (message in channel) {
process(message)
}
}
private suspend fun process(message: String) {
// something that occasionally throws Exception
println("Processing $message")
throw RuntimeException("Failed to process $message")
}
}
Evan R.
06/29/2020, 3:11 PMMyProcessor.run()
Constantin Muraru
06/29/2020, 3:20 PMEvan R.
06/30/2020, 11:08 AM