Rob
08/10/2021, 8:32 PM
louiscad
08/10/2021, 8:33 PM
suspend fun testCoroutinesRacing() {
    val result = raceOf({
        delay(3)
        "slow"
    }, {
        delay(0)
        "fast"
    })
    assertEquals(expected = "fast", actual = result)
}
https://github.com/LouisCAD/Splitties/tree/main/modules/coroutines#racing-coroutines
ephemient
08/10/2021, 8:35 PM
louiscad
08/10/2021, 8:43 PM
Rob
08/10/2021, 8:53 PM
Brian Dilley
08/10/2021, 9:07 PM
louiscad
08/10/2021, 9:12 PM
awaitCancellation() doesn't suit your needs, then I guess you are looking for something different. If you come up with something, please share.
Rob
08/10/2021, 9:13 PM
Richard Gomez
08/10/2021, 9:17 PM
Javier
08/10/2021, 9:20 PM
louiscad
08/10/2021, 9:21 PM
raceOf or race in regards to ensuring structured concurrency, but I'm biased.
Rob
08/10/2021, 9:25 PM
awaitAll(). So it matches their naming scheme. Thanks for the feedback.
Nick Allen
08/10/2021, 9:27 PM
merge(::method1.asFlow(), ::method2.asFlow()).first()
Little more awkward if you want the lambdas:
suspend fun testCoroutinesRacing() {
    val result = merge(suspend {
        delay(3)
        "slow"
    }.asFlow(), suspend {
        delay(0)
        "fast"
    }.asFlow()).first()
    assertEquals(expected = "fast", actual = result)
}
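A minimal, self-contained sketch of the merge-based race described above, for anyone who wants to run it outside a test. The helper name firstOf is hypothetical (it is not from this thread or from any library), and it builds each flow with flow { emit(block()) } rather than asFlow(), which should behave the same way here while avoiding the preview opt-in.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs every block concurrently and returns the first
// value produced. first() ends collection, which cancels the remaining blocks.
suspend fun <T> firstOf(vararg blocks: suspend () -> T): T =
    blocks
        .map { block -> flow { emit(block()) } } // one single-value flow per block
        .merge()                                  // collect all of them concurrently
        .first()                                  // take the first value, cancel the rest

fun main() = runBlocking {
    val winner = firstOf(
        { delay(300); "slow" },
        { delay(10); "fast" },
    )
    println(winner) // prints "fast"
}
```

The delays are widened compared with the test above so the outcome does not depend on scheduling order.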
Rob
08/10/2021, 9:31 PM
Nick Allen
08/10/2021, 9:32 PM
Flow operators all propagate cancellation upstream. They wouldn't really play well with structured concurrency if they didn't.
Rob
08/10/2021, 9:39 PM
Nick Allen
08/10/2021, 9:42 PM
fun main() {
    runBlocking {
        val x = merge<String>(suspend {
            try {
                delay(3)
                "slow"
            } catch (e: CancellationException) {
                println("Cancelled slow")
                throw e
            }
        }.asFlow(), suspend {
            delay(0)
            "fast"
        }.asFlow()).first()
        println(x)
    }
}
prints
Cancelled slow
fast
myDeferred::await.asFlow()? That would cancel the await call, but not myDeferred.
Rob
08/10/2021, 9:46 PM
Nick Allen
08/10/2021, 9:51 PM
merge and Flow. awaitFirst implementation earlier in thread that you suggested. If you want cancellation, use Flow or use the Splitties lib.
Rob
08/10/2021, 10:15 PM
Deferred<T> shouldn't be in the signature at all. Oh well, at least now I know what was happening.
Deferred<T> will cancel the scope even if I try to catch the await() of it so trapping those errors will not work.
louiscad
08/10/2021, 11:04 PM
Rob
08/10/2021, 11:05 PM
Nick Allen
08/10/2021, 11:20 PM
merge(::method1.asFlow().catch {}, ::method2.asFlow().catch {}).firstOrNull()
That is if you can work with suspend methods instead of Deferred. You seem to want to use Deferred but it's not clear to me if you actually need to.
Rob
08/10/2021, 11:24 PM
awaitAll(). I think first() works for my case. I do want to get an error if ALL the coroutines fail.
Nick Allen
08/10/2021, 11:41 PM
Deferred objects can be modified to just cancel everything once it has a value:
suspend fun <T> awaitFirstIgnoringErrors(vararg deferreds: Deferred<T>): T = deferreds
    .map { it::await.asFlow().catch {} }
    .merge()
    .first()
    .also { _ -> deferreds.forEach { it.cancel() } }
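A small usage sketch for the function above, assuming both deferreds succeed. The names slow and fast are made up for illustration, and the opt-in annotation is only there in case asFlow() on a suspend function is still marked as a preview API in the kotlinx.coroutines version in use.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking

// Same function as in the message above, repeated so the sketch is self-contained.
@OptIn(FlowPreview::class)
suspend fun <T> awaitFirstIgnoringErrors(vararg deferreds: Deferred<T>): T = deferreds
    .map { it::await.asFlow().catch {} }               // swallow failures of individual deferreds
    .merge()
    .first()                                           // first successful value wins
    .also { _ -> deferreds.forEach { it.cancel() } }   // cancel whatever is still running

fun main() = runBlocking {
    // Hypothetical workloads, both of which complete normally.
    val slow = async { delay(1_000); "slow" }
    val fast = async { delay(10); "fast" }

    val winner = awaitFirstIgnoringErrors(slow, fast)
    println(winner)           // prints "fast"
    println(slow.isCancelled) // prints "true": the loser was cancelled explicitly
}
```

Cancelling the deferreds explicitly matters because ending the flow collection only cancels the pending await() calls, not the Deferred objects themselves.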
Rob
08/10/2021, 11:48 PM
Nick Allen
08/10/2021, 11:49 PM
Rob
08/10/2021, 11:50 PM
Nick Allen
08/10/2021, 11:57 PM
awaitFirstIgnoringErrors is not throwing the exception. runBlocking is throwing the exception because a child failed. You have to wrap the code in supervisorScope.
Rob
08/11/2021, 12:01 AM
await() on an async block exceptions can be lost. I thought I could catch async block errors, but they still cancel the scope when await() is called.
Nick Allen
08/11/2021, 12:06 AM
await.
This will fail:
runBlocking { async { throw Exception() } }
Failed child jobs fail the parent. This is part of structured concurrency. The exception is if you use a SupervisorJob which does not mind if child jobs fail (in this case failed child exceptions are handled like top level coroutine exceptions).
Rob
08/11/2021, 12:12 AM
async blocks cancelled their parent if they failed even if you didn't call await. I thought I read that await had to be called. Everything makes sense now 😄
Nick Allen
08/11/2021, 12:17 AM
Deferred.await or Channel.receive (where you are asking for a result) will throw an exception ... since that was the result (it failed).
ephemient
08/11/2021, 12:18 AM
Rob
08/11/2021, 12:24 AM
ephemient
08/11/2021, 12:37 AM
runBlocking { supervisorScope { async { throw Exception() } } }
eygraber
08/11/2021, 4:22 AM
suspend inline fun awaitFirst(vararg funcs: suspend () -> Unit) {
    supervisorScope {
        select<Unit> {
            funcs.forEach { func -> launch { func() }.onJoin {} }
        }
        coroutineContext.cancelChildren()
    }
}
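A possible way to exercise the select-based awaitFirst above. This is only a sketch: the function is repeated without the inline modifier to keep the example simple, and the finished list and the two lambdas are hypothetical.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.supervisorScope

// Same logic as the message above: return as soon as any job completes,
// then cancel the jobs that are still running.
suspend fun awaitFirst(vararg funcs: suspend () -> Unit) {
    supervisorScope {
        select<Unit> {
            funcs.forEach { func -> launch { func() }.onJoin {} }
        }
        coroutineContext.cancelChildren()
    }
}

fun main() = runBlocking {
    // Records which blocks ran to completion.
    val finished = mutableListOf<String>()
    awaitFirst(
        { delay(1_000); finished += "slow" },
        { delay(10); finished += "fast" },
    )
    println(finished) // prints "[fast]": the slow block was cancelled before finishing
}
```

Because the blocks return Unit, this variant only signals that something finished; any result has to be passed out through shared state, as the list does here.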