Rob
08/10/2021, 8:32 PM
louiscad
08/10/2021, 8:33 PM
suspend fun testCoroutinesRacing() {
    val result = raceOf({
        delay(3)
        "slow"
    }, {
        delay(0)
        "fast"
    })
    assertEquals(expected = "fast", actual = result)
}
https://github.com/LouisCAD/Splitties/tree/main/modules/coroutines#racing-coroutines
ephemient
08/10/2021, 8:35 PM
louiscad
08/10/2021, 8:43 PM
Rob
08/10/2021, 8:53 PM
Brian Dilley
08/10/2021, 9:07 PM
louiscad
08/10/2021, 9:12 PM
awaitCancellation() doesn't suit your needs, then I guess you are looking for something different. If you come up with something, please share.
louiscad
08/10/2021, 9:13 PM
Rob
08/10/2021, 9:13 PM
Rob
08/10/2021, 9:16 PM
Richard Gomez
08/10/2021, 9:17 PM
Javier
08/10/2021, 9:20 PM
louiscad
08/10/2021, 9:21 PM
raceOf or race in regards to ensuring structured concurrency, but I'm biased.
Rob
08/10/2021, 9:25 PM
awaitAll(). So it matches their naming scheme. Thanks for the feedback.
Nick Allen
08/10/2021, 9:27 PM
merge(::method1.asFlow(), ::method2.asFlow()).first()
Little more awkward if you want the lambdas:
suspend fun testCoroutinesRacing() {
    val result = merge(suspend {
        delay(3)
        "slow"
    }.asFlow(), suspend {
        delay(0)
        "fast"
    }.asFlow()).first()
    assertEquals(expected = "fast", actual = result)
}
Rob
08/10/2021, 9:31 PM
Nick Allen
08/10/2021, 9:32 PM
Nick Allen
08/10/2021, 9:38 PM
Flow operators all propagate cancellation upstream. They wouldn't really play well with structured concurrency if they didn't.
Rob
08/10/2021, 9:39 PM
Rob
08/10/2021, 9:40 PM
Nick Allen
08/10/2021, 9:42 PM
fun main() {
    runBlocking {
        val x = merge<String>(suspend {
            try {
                delay(3)
                "slow"
            } catch (e: CancellationException) {
                println("Cancelled slow")
                throw e
            }
        }.asFlow(), suspend {
            delay(0)
            "fast"
        }.asFlow()).first()
        println(x)
    }
}
prints
Cancelled slow
fast
Nick Allen
08/10/2021, 9:43 PM
myDeferred::await.asFlow()? That would cancel the await call, but not myDeferred.
Rob
08/10/2021, 9:46 PM
Nick Allen
08/10/2021, 9:51 PM
merge and Flow.
Nick Allen
08/10/2021, 9:57 PM
awaitFirst implementation earlier in thread that you suggested. If you want cancellation, use Flow or use the Splitties lib.
Rob
08/10/2021, 10:15 PM
Rob
08/10/2021, 10:17 PM
Deferred<T> shouldn't be in the signature at all. Oh well, at least now I know what was happening.
Rob
08/10/2021, 10:42 PM
Deferred<T> will cancel the scope even if I try to catch the await() of it so trapping those errors will not work.
Rob
08/10/2021, 11:00 PM
louiscad
08/10/2021, 11:04 PM
Rob
08/10/2021, 11:05 PM
Rob
08/10/2021, 11:07 PM
Rob
08/10/2021, 11:08 PM
Rob
08/10/2021, 11:10 PM
Nick Allen
08/10/2021, 11:20 PM
merge(::method1.asFlow().catch {}, ::method2.asFlow().catch {}).firstOrNull()
That is if you can work with suspend methods instead of Deferred. You seem to want to use Deferred but it's not clear to me if you actually need to.
Rob
08/10/2021, 11:24 PM
awaitAll(). I think first() works for my case. I do want to get an error if ALL the coroutines fail.
Nick Allen
08/10/2021, 11:41 PM
Deferred objects can be modified to just cancel everything once it has a value:
suspend fun <T> awaitFirstIgnoringErrors(vararg deferreds: Deferred<T>): T = deferreds
    .map { it::await.asFlow().catch {} }
    .merge()
    .first()
    .also { _ -> deferreds.forEach { it.cancel() } }
Rob
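A minimal sketch of how a Deferred-based helper like the one above behaves when every input fails, which is the case raised just before it. It assumes kotlinx.coroutines is on the classpath. `firstSuccessOf` and `demo` are hypothetical names, and `flow { emit(d.await()) }` stands in for `it::await.asFlow()` only to keep the sketch off preview APIs. Because `Flow.first()` throws `NoSuchElementException` on an empty flow, swallowing every failure with `catch {}` surfaces as that exception when nothing succeeds.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Hypothetical variant of the helper from the thread: each Deferred becomes a
// one-element flow, and a failed await() turns into an empty flow.
suspend fun <T> firstSuccessOf(vararg deferreds: Deferred<T>): T = deferreds
    .map { d -> flow<T> { emit(d.await()) }.catch { } }
    .merge()
    .first() // throws NoSuchElementException if every flow ended up empty
    .also { _ -> deferreds.forEach { it.cancel() } }

// Hypothetical demo: one race with a failing and a succeeding input,
// then a race in which every input fails.
fun demo(): Pair<String, String> = runBlocking {
    // supervisorScope keeps a failed async child from cancelling the parent.
    supervisorScope {
        val winner = firstSuccessOf(
            async<String> { throw IllegalStateException("a failed") },
            async { delay(50); "b" }
        )
        val allFailed = try {
            firstSuccessOf(
                async<String> { throw IllegalStateException("a failed") },
                async<String> { throw IllegalStateException("b failed") }
            )
        } catch (e: NoSuchElementException) {
            "all failed"
        }
        winner to allFailed
    }
}

fun main() {
    println(demo()) // (b, all failed)
}
```

The individual failure causes are discarded here; collecting them for a richer error would need extra bookkeeping that this sketch leaves out.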
08/10/2021, 11:48 PM
Nick Allen
08/10/2021, 11:49 PM
Rob
08/10/2021, 11:50 PM
Rob
08/10/2021, 11:51 PM
Rob
08/10/2021, 11:51 PM
Rob
08/10/2021, 11:53 PM
Rob
08/10/2021, 11:54 PM
Nick Allen
08/10/2021, 11:57 PM
awaitFirstIgnoringErrors is not throwing the exception. runBlocking is throwing the exception because a child failed. You have to wrap the code in supervisorScope.
Rob
08/11/2021, 12:01 AM
await() on an async block exceptions can be lost. I thought I could catch async block errors, but they still cancel the scope when await() is called.
Nick Allen
08/11/2021, 12:06 AM
await.
This will fail:
runBlocking { async { throw Exception() } }
Failed child jobs fail the parent. This is part of structured concurrency. The exception is if you use a SupervisorJob which does not mind if child jobs fail (in this case failed child exceptions are handled like top level coroutine exceptions).
Rob
08/11/2021, 12:12 AM
async blocks cancelled their parent if they failed even if you didn't call await. I thought I read that await had to be called. Everything makes sense now 😄
Rob
08/11/2021, 12:15 AM
Nick Allen
08/11/2021, 12:17 AM
Deferred.await or Channel.receive (where you are asking for a result) will throw an exception ... since that was the result (it failed).
ephemient
08/11/2021, 12:18 AM
Rob
08/11/2021, 12:24 AM
ephemient
08/11/2021, 12:37 AM
runBlocking { supervisorScope { async { throw Exception() } } }
eygraber
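A small sketch building on the supervisorScope one-liner above, assuming kotlinx.coroutines is available. Inside supervisorScope a failed async child does not cancel its parent, so the exception only shows up when await() is called and can be caught there. The function name `awaitUnderSupervisor` is made up for illustration.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Hypothetical helper: starts a failing async child under supervisorScope
// and reports what await() threw.
fun awaitUnderSupervisor(): String = runBlocking {
    supervisorScope {
        val failing = async<String> { throw IllegalStateException("boom") }
        try {
            failing.await() // the stored failure is rethrown here
        } catch (e: IllegalStateException) {
            "caught: ${e.message}"
        }
    }
}

fun main() {
    println(awaitUnderSupervisor()) // caught: boom
}
```

Without the supervisorScope wrapper, the same failure would also cancel runBlocking itself, which matches the behaviour discussed earlier in the thread.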
08/11/2021, 4:22 AM
suspend inline fun awaitFirst(vararg funcs: suspend () -> Unit) {
    supervisorScope {
        select<Unit> {
            funcs.forEach { func -> launch { func() }.onJoin {} }
        }
        coroutineContext.cancelChildren()
    }
}
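A value-returning variation on the select-based approach above, offered as a sketch only. `firstResultOf` is a hypothetical name. It swaps launch/onJoin for async/onAwait so the winner's result is returned. If the first coroutine to finish fails, select rethrows its exception, so this version does not skip failures.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.supervisorScope

// Hypothetical helper: runs every block concurrently and returns the first result.
suspend fun <T> firstResultOf(vararg blocks: suspend () -> T): T = supervisorScope {
    val result = select<T> {
        // Register one onAwait clause per block; the first to complete is selected.
        blocks.forEach { block -> async { block() }.onAwait { it } }
    }
    // Cancel the coroutines that lost the race so the scope can complete.
    coroutineContext.cancelChildren()
    result
}

fun main() = runBlocking {
    val result = firstResultOf(
        { delay(200); "slow" },
        { delay(10); "fast" }
    )
    println(result) // fast
}
```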