# coroutines
z
I have a pretty complex flow of jobs I need to run, and I need to ensure proper cancellation and cleanup. I’m almost there, but I have some weird logic involving the two child jobs that need to run in parallel:
val actor = scope.actor<StartNewJobCommand> {
    var fatherJob: Job? = null
    for (startNewJobCommand in channel) {
        // cancel old father AND both his children if they are still running
        fatherJob?.cancelAndJoin() // wait to start new job until old job is complete
        fatherJob = launch {
            val son1 = async {
                // .. 
                // completion of this Deferred should cancel fatherJob and son2's job
            }
            val son2 = async {
                // ..
                // completion of this Deferred should cancel fatherJob and son1's job 
            }
            // fatherJob should not complete until his two sons have finished their work
            // note - only one of the two children need to finish
            son1.await()
            son2.await()
        }
    }
}
Is there a way that I could await one of the two `sonX` jobs to complete, then cancel the other?
d
You could use a `select`.
z
I’ve never actually used `select` before. It seems like it would solve my use case, but I’m pretty unfamiliar with it.
d
select<...> {
    son1.onAwait { it }
    son2.onAwait { it }
}
`...` being the return type of `son1.await()`/`son2.await()`.
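Here is a minimal sketch of how that could look end to end, assuming both children return the same type. `awaitFirstAndCancelOther` is a made-up helper name for illustration, not something that ships with kotlinx.coroutines:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical helper (not part of kotlinx.coroutines): suspends until
// whichever Deferred completes first, cancels the other, returns the winner's value.
suspend fun <T> awaitFirstAndCancelOther(a: Deferred<T>, b: Deferred<T>): T {
    val result = select<T> {
        a.onAwait { it }
        b.onAwait { it }
    }
    // Cancelling an already-completed Deferred has no effect,
    // so it is safe to cancel both without tracking which one won.
    a.cancel()
    b.cancel()
    return result
}

fun main() = runBlocking<Unit> {
    val slow = async { delay(200); "slow" }
    val fast = async { delay(100); "fast" }
    println(awaitFirstAndCancelOther(slow, fast)) // prints "fast"
}
```

If the first child to finish fails, the `select` rethrows its exception, so you may want a `try`/`finally` around it if the other child must always be cancelled.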
z
“The caller is suspended until one of the clauses is either selected or _fails_”
So it just waits for either one to emit, then the `select { }` completes?
d
Yeah. After which you can cancel the parent.
z
this is perfect, thank you
d
No problem! 🙂
z
full code for testing this behavior is:
@Test fun test() = runBlocking<Unit> {
    fun <T : Job> T.logCompletion(name: String) = apply {
        invokeOnCompletion {
            if (it == null) {
                println("$name: complete")
            } else {
                println("$name: cancelled by: ${it.message}")
            }
        }
    }
    val scope = CoroutineScope(Dispatchers.Default + Job())
    val actor = scope.actor<StartNewJobCommand> {
        var fatherJob: Job? = null
        for (startNewJobCommand in channel) {
            // cancel old father and both his children if they are still running
            fatherJob?.cancelAndJoin() // wait to start new job until old job is complete
            fatherJob = launch {
                val son1 = async {
                    delay(200)
                    // ..
                    // completion of this deferred should cancel fatherJob and son2's job
                }
                    .logCompletion("son1")
                val son2 = async {
                    delay(100)
                    // ..
                    // completion of this deferred should cancel fatherJob and son1's job
                }
                    .logCompletion("son2")
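                // suspends until the first son completes, then cancels the other;
                // recent kotlinx.coroutines versions only accept a CancellationException here
                select<Unit> {
                    son1.onAwait {
                        println("select clause for son1 emitted")
                        son2.cancel(CancellationException("son1 completing"))
                    }
                    son2.onAwait {
                        println("select clause for son2 emitted")
                        son1.cancel(CancellationException("son2 completing"))
                    }
                }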
            }
            fatherJob.logCompletion("fatherJob")
        }
    }
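    // already inside runBlocking, so no nested runBlocking is needed
    actor.send(StartNewJobCommand)
    actor.send(StartNewJobCommand)
    delay(1000L)
    scope.cancel() // cancel the actor and anything still running in its scope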
}