Rob

08/10/2021, 8:32 PM
How do I run two coroutines concurrently but take the result of the first one that returns and cancel the other? Preferably a one liner.

louiscad

08/10/2021, 8:33 PM
I made a library (available on MavenCentral) for that. The implementation is not a one-liner, but usage is quite straightforward I think:
suspend fun testCoroutinesRacing() {
    val result = raceOf({
        delay(3)
        "slow"
    }, {
        delay(0)
        "fast"
    })
    assertEquals(expected = "fast", actual = result)
}
https://github.com/LouisCAD/Splitties/tree/main/modules/coroutines#racing-coroutines

ephemient

08/10/2021, 8:35 PM
there's a lot of discussion on https://github.com/Kotlin/kotlinx.coroutines/issues/424 as well

louiscad

08/10/2021, 8:43 PM
Finally opened an issue to ask for an API to race coroutines: https://github.com/Kotlin/kotlinx.coroutines/issues/2867

Rob

08/10/2021, 8:53 PM
Here's my implementation.

Brian Dilley

08/10/2021, 9:07 PM
@louiscad what if i want to provide a predicate to determine the “winner” ?

louiscad

08/10/2021, 9:12 PM
If `awaitCancellation()` doesn't suit your needs, then I guess you are looking for something different. If you come up with something, please share.
@Rob Your snippet is not cancelling the losing coroutines as you asked FYI.

Rob

08/10/2021, 9:13 PM
ohh
Does this fix it?

Javier

08/10/2021, 9:20 PM
Arrow has race functions too

louiscad

08/10/2021, 9:21 PM
@Rob I guess. I'd prefer `raceOf` or `race` in regards to ensuring structured concurrency, but I'm biased.

Rob

08/10/2021, 9:25 PM
kotlinx.coroutines already has `awaitAll()`, so it matches their naming scheme. Thanks for the feedback.

Nick Allen

08/10/2021, 9:27 PM
It's not too hard with `Flow`:
merge(::method1.asFlow(), ::method2.asFlow()).first()
Little more awkward if you want the lambdas:
suspend fun testCoroutinesRacing() {
    val result = merge(suspend {
        delay(3)
        "slow"
    }.asFlow(), suspend {
        delay(0)
        "fast"
    }.asFlow()).first()
    assertEquals(expected = "fast", actual = result)
}

Rob

08/10/2021, 9:31 PM
It's still not getting cancelled 🤦

Nick Allen

08/10/2021, 9:32 PM
Yes it is. `first` cancels the result of `merge`, which cancels the still-active "slow" flow.
The built-in `Flow` operators all propagate cancellation upstream. They wouldn't really play well with structured concurrency if they didn't.

Rob

08/10/2021, 9:39 PM
I'm testing it, and it's not cancelling the slower coroutine.
It may be with how channelFlow is implemented.

Nick Allen

08/10/2021, 9:42 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        val x = merge<String>(suspend {
            try {
                delay(3)
                "slow"
            } catch (e: CancellationException) {
                println("Cancelled slow")
                throw e
            }
        }.asFlow(), suspend {
            delay(0)
            "fast"
        }.asFlow()).first()
        println(x)
    }
}
prints
Cancelled slow
fast
Are you perhaps doing `myDeferred::await.asFlow()`? That would cancel the `await` call, but not `myDeferred`.
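A minimal sketch of that difference, assuming two hypothetical `async` jobs named `slow` and `fast` (neither is code from this thread). The helper `raceAwaitCalls` is made up for illustration; it races the two `await` calls and reports whether the losing `Deferred` is still running afterwards:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking

// Hypothetical helper: races two await() calls and reports whether the loser kept running.
suspend fun raceAwaitCalls(): Pair<String, Boolean> = coroutineScope {
    val slow = async { delay(1_000); "slow" }
    val fast = async { delay(10); "fast" }

    // first() cancels the merged flow, which only cancels the pending slow.await() call.
    val winner = merge(slow::await.asFlow(), fast::await.asFlow()).first()

    val slowStillActive = slow.isActive // the Deferred itself was never cancelled
    slow.cancel()                       // cancel it explicitly so the scope can finish early
    winner to slowStillActive
}

fun main() = runBlocking {
    println(raceAwaitCalls()) // (fast, true)
}
```

So racing `await` calls picks the right winner, but the losing `Deferred` has to be cancelled separately.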

Rob

08/10/2021, 9:46 PM
This doesn't cancel the first coroutine.

Nick Allen

08/10/2021, 9:51 PM
Sorry, I thought you were responding to my comments about using `merge` and `Flow`.
It looks like you are using the `awaitFirst` implementation that you suggested earlier in the thread. If you want cancellation, use `Flow` or use the Splitties lib.

Rob

08/10/2021, 10:15 PM
I finally figured out why it wasn't cancelling before. I was cancelling the job blocking on the deferred but not the deferred itself.
Maybe `Deferred<T>` shouldn't be in the signature at all. Oh well, at least now I know what was happening.
It seems that errors in `Deferred<T>` will cancel the scope even if I try to `catch` the `await()` of it, so trapping those errors will not work.
I think this one works. Errors are suppressed. If there isn't a single coroutine that completes with a value, it will error. Also no external dependency required.

louiscad

08/10/2021, 11:04 PM
FYI, the code I linked way above can be copy-pasted. It's self-contained in a file; it's not a one-liner, but it's not a thousand, nor even a hundred, lines either.

Rob

08/10/2021, 11:05 PM
It had some things that made it not easily copy-pasteable.
Does it prevent errors from bubbling up?
For example, if one of the coroutines fails before another one completes successfully, does it prevent that error from cancelling the scope?
When I was using `Deferred`s as arguments, I couldn't catch those errors.

Nick Allen

08/10/2021, 11:20 PM
Seems like maybe you'd be better served with:
merge(::method1.asFlow().catch {}, ::method2.asFlow().catch{}).firstOrNull()
That is, if you can work with suspend methods instead of `Deferred`. You seem to want to use `Deferred`, but it's not clear to me if you actually need to.

Rob

08/10/2021, 11:24 PM
I was following the precedent set by `awaitAll()`. I think `first()` works for my case. I do want to get an error if ALL the coroutines fail.
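A rough sketch of that combination, assuming plain suspend lambdas rather than `Deferred` arguments. The name `firstSuccessOf` is hypothetical. Each block's failure is dropped with `catch {}`, and `first()` throws `NoSuchElementException` when no block produced a value:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the first value produced; failures of individual blocks are dropped.
suspend fun <T> firstSuccessOf(vararg blocks: suspend () -> T): T =
    blocks
        .map { block -> block.asFlow().catch { /* swallow this block's failure */ } }
        .merge()
        .first() // throws NoSuchElementException if no block produced a value

fun main() = runBlocking {
    val winner = firstSuccessOf<String>(
        { throw IllegalStateException("boom") },
        { delay(10); "ok" }
    )
    println(winner) // ok

    val allFailed = try {
        firstSuccessOf<String>({ error("a") }, { error("b") })
        false
    } catch (e: NoSuchElementException) {
        true
    }
    println(allFailed) // true
}
```

Note that the error in the all-failed case is the generic `NoSuchElementException`, not one of the original failures.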

Nick Allen

08/10/2021, 11:41 PM
Keep in mind any solution that does not cancel your `Deferred` objects can be modified to just cancel everything once it has a value:
suspend fun <T> awaitFirstIgnoringErrors(vararg deferreds: Deferred<T>): T = deferreds
    .map { it::await.asFlow().catch {} }
    .merge()
    .first()
    .also { _ -> deferreds.forEach { it.cancel() }}

Rob

08/10/2021, 11:48 PM
That didn't cancel the slow deferred in my test.

Nick Allen

08/10/2021, 11:49 PM
Try waiting longer than 3ms. If the coroutine manages to finish, it can't be cancelled.
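A small sketch of why the timing matters, using a hypothetical helper `cancelFinishedVsPending`: calling `cancel()` on a `Deferred` that has already completed has no effect, while the same call on a still-pending one cancels it.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Returns (isCancelled of an already-finished Deferred, isCancelled of a still-pending one).
suspend fun cancelFinishedVsPending(): Pair<Boolean, Boolean> = coroutineScope {
    val finished = async { "done" }
    finished.await()   // let it run to completion first
    finished.cancel()  // no effect: a completed job stays completed

    val pending = async { delay(1_000); "never returned" }
    pending.cancel()   // still pending, so this cancels it

    finished.isCancelled to pending.isCancelled
}

fun main() = runBlocking {
    println(cancelFinishedVsPending()) // (false, true)
}
```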

Rob

08/10/2021, 11:50 PM
Here is my test.
oh, once I upped it to 20ms it worked 😄
Ohh, that was because the runBlocking finished.
There seems to be a lot of play with the timings. I'm using play.kotlinlang.org to test, so that may be why.
This throws an exception in runBlocking.

Nick Allen

08/10/2021, 11:57 PM
`awaitFirstIgnoringErrors` is not throwing the exception. `runBlocking` is throwing the exception because a child failed. You have to wrap the code in `supervisorScope`.

Rob

08/11/2021, 12:01 AM
Yeah. I read that unless you do `await()` on an `async` block, exceptions can be lost. I thought I could catch `async` block errors, but they still cancel the scope when `await()` is called.

Nick Allen

08/11/2021, 12:06 AM
It has nothing to do with `await`. This will fail:
runBlocking { async { throw Exception() }}
Failed child jobs fail the parent. This is part of structured concurrency. The exception is if you use a `SupervisorJob`, which does not mind if child jobs fail (in this case failed child exceptions are handled like top-level coroutine exceptions).
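To make the contrast concrete, here is a small sketch with two hypothetical helpers. `plainScopeFails` never calls `await()` and still fails. `supervisedScopeSurvives` uses `supervisorScope`, where the failure only surfaces at `await()`:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// A failing async child fails runBlocking even though await() is never called.
fun plainScopeFails(): Boolean = try {
    runBlocking<Unit> { async { throw IllegalStateException("child failed") } }
    false
} catch (e: IllegalStateException) {
    true
}

// Under supervisorScope the failure stays inside the Deferred and only surfaces at await().
fun supervisedScopeSurvives(): Boolean = runBlocking {
    supervisorScope {
        val failing = async<String> { throw IllegalStateException("child failed") }
        try {
            failing.await()
            false
        } catch (e: IllegalStateException) {
            true // the scope is still alive here and can keep working
        }
    }
}

fun main() {
    println(plainScopeFails())         // true
    println(supervisedScopeSurvives()) // true
}
```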

Rob

08/11/2021, 12:12 AM
I didn't know that `async` blocks cancelled their parent if they failed even if you didn't call `await`. I thought I read that `await` had to be called. Everything makes sense now 😄
I'm not sure if this is what I saw but they're starting an async block in global scope and catching it in another scope. https://kotlinlang.org/docs/exception-handling.html#exception-propagation

Nick Allen

08/11/2021, 12:17 AM
GlobalScope works like SupervisorJob (technically it has no job at all). Methods like `Deferred.await` or `Channel.receive` (where you are asking for a result) will throw an exception ... since that was the result (it failed).
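A short sketch of that behaviour, along the lines of the docs example linked above. The helper name `awaitFailedGlobalAsync` is hypothetical, and the `@OptIn` assumes a kotlinx.coroutines version where `GlobalScope` is marked as a delicate API:

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

@OptIn(DelicateCoroutinesApi::class)
fun awaitFailedGlobalAsync(): String = runBlocking {
    // Not a child of runBlocking, so its failure cannot cancel this scope.
    val deferred = GlobalScope.async<String> { throw ArithmeticException("failed") }
    try {
        deferred.await() // the failure is the result, so await() rethrows it
    } catch (e: ArithmeticException) {
        "caught, runBlocking still alive"
    }
}

fun main() {
    println(awaitFailedGlobalAsync()) // caught, runBlocking still alive
}
```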

ephemient

08/11/2021, 12:18 AM
cancelling GlobalScope if a child failed would be sort of amazing :D

Rob

08/11/2021, 12:24 AM
I guess I thought `Deferred`s were like `Future`s, `Promise`s, `Single`s etc.

ephemient

08/11/2021, 12:37 AM
they are, it's just that failures also cancel everything up to the nearest supervisor (or global) scope. compare
runBlocking { supervisorScope { async { throw Exception() } } }

eygraber

08/11/2021, 4:22 AM
I'm doing it like this. It doesn't return anything, but it could probably be adapted to use `Deferred<T>`:
suspend inline fun awaitFirst(vararg funcs: suspend () -> Unit) {
  supervisorScope {
    select<Unit> {
      funcs.forEach { func -> launch { func() }.onJoin {} }
    }

    coroutineContext.cancelChildren()
  }
}
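One possible way to adapt this so it returns a value, as a sketch only: the name `awaitFirstValue` is hypothetical, and it swaps `launch`/`onJoin` for `async`/`onAwait`. Unlike the version above, if the first coroutine to complete fails, its exception is rethrown from `select`.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.supervisorScope

// Hypothetical value-returning variant; the name awaitFirstValue is not from the thread.
suspend fun <T> awaitFirstValue(vararg funcs: suspend () -> T): T = supervisorScope {
    val deferreds = funcs.map { func -> async { func() } }
    // Resumes with whichever Deferred completes first; a failure of that one is rethrown.
    val winner = select<T> {
        deferreds.forEach { deferred -> deferred.onAwait { value -> value } }
    }
    coroutineContext.cancelChildren() // stop the losers
    winner
}

fun main() = runBlocking {
    val result = awaitFirstValue<String>(
        { delay(1_000); "slow" },
        { delay(10); "fast" }
    )
    println(result) // fast
}
```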