How do I run two coroutines concurrently but take ...
# coroutines
r
How do I run two coroutines concurrently but take the result of the first one that returns and cancel the other? Preferably a one liner.
l
I made a library (available on MavenCentral) for that. The implementation is not a one-liner, but usage is quite straightforward I think:
Copy code
suspend fun testCoroutinesRacing() {
    val result = raceOf({
        delay(3)
        "slow"
    }, {
        delay(0)
        "fast"
    })
    assertEquals(expected = "fast", actual = result)
}
https://github.com/LouisCAD/Splitties/tree/main/modules/coroutines#racing-coroutines
👍 1
e
there's a lot of discussion on https://github.com/Kotlin/kotlinx.coroutines/issues/424 as well
l
Finally opened an issue to ask for an API to race coroutines: https://github.com/Kotlin/kotlinx.coroutines/issues/2867
r
Here's my implementation.
b
@louiscad what if i want to provide a predicate to determine the “winner” ?
l
If
awaitCancellation()
doesn't suit your needs, then I guess you are looking for something different. If you come up with something, please share.
@Rob Your snippet is not cancelling the losing coroutines as you asked FYI.
r
ohh
Does this fix it?
j
Arrow has race functions too
l
@Rob I guess. If prefer
raceOf
or
race
in regards to ensuring structured concurrency, but I'm biased.
r
Standard library already has
awaitAll()
. So it's matches their naming scheme. Thanks for the feedback.
n
It's not too hard with `Flow`:
Copy code
merge(::method1.asFlow(), ::method2.asFlow()).first()
Little more awkward if you want the lambdas:
Copy code
suspend fun testCoroutinesRacing() {
    val result = merge(suspend {
        delay(3)
        "slow"
    }.asFlow(), suspend {
        delay(0)
        "fast"
    }.asFlow()).first()
    assertEquals(expected = "fast", actual = result)
}
r
It's still not getting cancelled 🤦
n
Yes it is. first cancels the result of merge which cancels the still active "slow" flow.
The built in
Flow
operators all propagate cancellation upstream. They wouldn't really play well with structured concurrency if they didn't.
r
I'm testing it, and it's not cancelling the slower coroutine.
It may be with how channelFlow is implemented.
n
Copy code
fun main() {
    runBlocking {
        val x = merge<String>(suspend {
            try {
                delay(3)
                "slow"
            } catch (e: CancellationException) {
                println("Cancelled slow")
                throw e
            }
        }.asFlow(), suspend {
            delay(0)
            "fast"
        }.asFlow()).first()
        println(x)
    }
}
prints
Copy code
Cancelled slow
fast
Are you perhaps doing
myDeferred::await.asFlow()
? That would cancel the await call, but not
myDeferred
.
r
This doesn't cancel the first coroutine.
n
Sorry, I thought you were responding to my comments about using
merge
and
Flow
.
It looks like you are using the
awaitFirst
implementation earlier in thread that you suggested. If you want cancellation, use
Flow
or use the the Splitties lib.
r
I finally figured out why it wasn't cancelling before. I was cancelling the job blocking on the deferred but not the deferred itself.
Maybe
Deferred<T>
shouldn't be in the signature at all. Oh well, at least now I know what was happening.
It seems that errors in
Deferred<T>
will cancel the scope even if I try to
catch
the
await()
of it so trapping those errors will not work.
I think this one works. Errors are suppressed. If there isn't a single coroutine that completes with a value it will error. Also no external dependency required.
l
FYI, the code I linked way above can be copy pasted, it's self-contained in a file, it's not a one-liner, but it's not a thousand, nor a hundred of lines either.
r
It had some some things that made it not easily copy pasteable.
Does it prevent errors from bubbling up?
For example if one of the coroutines fails before the one completes successfully, does prevent that error from cancelling the scope?
When I was using `Deferred`s as arguments, I couldn't catch those errors.
n
Seems like maybe you'd be better served with:
Copy code
merge(::method1.asFlow().catch {}, ::method2.asFlow().catch{}).firstOrNull()
That is if you can work with suspend methods instead of
Deferred
. You seem to want to use
Deferred
but it's not clear to me if you actually need to.
r
I was following the precedent set by
awaitAll()
. I think
first()
works for my case. I do want to get an error if ALL the coroutines fail.
n
Keep in mind any solution that does not cancel your
Deferred
objects can be modified to just cancel everything once it has a value:
Copy code
suspend fun <T> awaitFirstIgnoringErrors(vararg deferreds: Deferred<T>): T = deferreds
    .map { it::await.asFlow().catch {} }
    .merge()
    .first()
    .also { _ -> deferreds.forEach { it.cancel() }}
r
That didn't cancel the slow deferred in my test.
n
Try waiting longer than 3ms. If the coroutine manages to finish, it can't be cancelled.
r
Here is my test.
oh, once I upped it to 20ms it worked 😄
Ohh, that was because the runBlocking finished.
There seems to be a lot of play with the timings. I'm using play.kotlinlong.org to test so that maybe why.
This throws an exception in runBlocking.
n
awaitFirstIgnoringErrors
is not throwing the exception.
runBlocking
is throwing the exception because a child failed. You have to wrap the code in
supervisorScope
.
r
Yeah. I read that unless you do
await()
on an
async
block exceptions can be lost. I thought I could catch
async
block errors, but they still cancel the scope when
await()
is called.
n
It has nothing to do with
await
. This will fail:
Copy code
runBlocking { async { throw Exception() }}
Failed child jobs fail the parent. This is part of structured concurrency. The exception is if you use a
SupervisorJob
which does not mind if child jobs fail (in this case failed child exceptions are handled like top level coroutine exceptions).
r
I didn't know that
async
blocks cancelled their parent if they failed even if you didn't call
await
. I thought I read that
await
had to be called. Everything makes sense now 😄
I'm not sure if this is what I saw but they're starting an async block in global scope and catching it in another scope. https://kotlinlang.org/docs/exception-handling.html#exception-propagation
n
GlobalScope works like SupervisorJob (technically it has no job at all). Methods like
Deferred.await
or
Channel.receive
(where you are asking for a result) will throw an exception ... since that was the result (it failed).
e
cancelling GlobalScope if a child failed would sort of amazing :D
r
I guess I thought `Deferred`s were like `Future`s, `Promise`s, `Single`s etc.
e
they are, it's just that failures also cancel everything up to the nearest supervisor (or global) scope. compare
runBlocking { supervisorScope { async { throw Exception() } } }
e
I'm doing it like this. It doesn't return anything, but it could probably be adapted to use `Deferred<T>`:
Copy code
suspend inline fun awaitFirst(vararg funcs: suspend () -> Unit) {
  supervisorScope {
    select<Unit> {
      funcs.forEach { func -> launch { func() }.onJoin {} }
    }

    coroutineContext.cancelChildren()
  }
}