How would you write a suspend function to share th...
# coroutines
d
How would you write a suspend function to share the result of a function between parallel requests? Let me explain with pseudo-code
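private var shared: Deferred<Result>? = null

private suspend fun actuallyGrabResult(): Result { ... }

suspend fun grabResult(): Result {
  // Reuse the in-flight Deferred if one is still active; otherwise start a new one.
  // `scope` is assumed to be a CoroutineScope owned by the enclosing class.
  val deferred = shared?.takeIf { it.isActive }
    ?: scope.async { actuallyGrabResult() }
        .also { shared = it }
  return deferred.await()
}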
The idea is that only one `actuallyGrabResult()` coroutine should be running at any given time, and any request to `grabResult()` made while it runs should wait for it and receive its result. Once the result has been delivered, the next request should trigger a new call. With RxJava you'd do something like:
private val shared: Single<Result> = actuallyGrabResult()
        .toObservable()
        .replay(1)
        .refCount()
        .singleOrError()

private fun actuallyGrabResult(): Single<Result> { ... }

fun grabResult(): Single<Result> {
  return shared
}
Is there an idiomatic way of doing this with coroutines? (I've only recently started using them.)
z
See https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#lazily-started-async
private val shared = scope.async(start = CoroutineStart.LAZY) { actuallyGrabResult() }

suspend fun grabResult(): Result = shared.await()
Where `scope` is a `CoroutineScope` owned by your class and tied to its lifetime. This works great if your class has its own lifetime (i.e. has some kind of `dispose()` method) and thus is its own logical “scope”. If your class doesn’t have its own lifetime, then it’s trickier, because you probably need to do some kind of ref counting of the callers of `grabResult()` to correctly scope your internal coroutine(s), both for cancellation and error handling. This is what libraries like https://github.com/dropbox/Store help with, because that logic can get complicated.
s
Yup, calling `await()` on a `Deferred` will not restart the coroutine of the `async` that built it.
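To make that behaviour concrete, here is a minimal sketch (not from the thread) that awaits a lazily started `async` twice and counts how often its body runs. The function name `lazyAsyncRunCount` and the counter are hypothetical, introduced only for illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns how many times the async body executed.
fun lazyAsyncRunCount(): Int = runBlocking {
    val runs = AtomicInteger(0)
    // LAZY: the body does not start until the first await() (or start()).
    val shared = async(start = CoroutineStart.LAZY) {
        runs.incrementAndGet()
        "result"
    }
    shared.await() // starts the coroutine and waits for it
    shared.await() // returns the already-computed value; the body is not re-run
    runs.get()
}

fun main() {
    println("body ran ${lazyAsyncRunCount()} time(s)")
}
```

Both `await()` calls see the same value, which is why a single `Deferred` caches its outcome for good rather than sharing only among concurrent callers.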
d
Yeah, the problem with that is that it only runs the first time it is awaited. What I want is for exactly one job to run at a given time: if 3 requests come in while it is running, they all receive the same outcome (result or error), and if someone calls it again after that, it should launch the job/coroutine again.
With RxJava this was fairly easy to achieve (see the example above).
s
Maybe something like this (I didn't try the code snippet below out, but the idea is there), where `getResult()` is a suspend fun as well.
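var sharedResult: Result? = null
val mutex = Mutex() // kotlinx.coroutines.sync.Mutex
...
suspend fun getSharedResult(): Result {
    // Reset on entry so that a caller arriving later forces a fresh fetch.
    sharedResult = null
    return mutex.withLock {
        // Callers queued behind the first one find the value already set.
        sharedResult ?: getResult().also { sharedResult = it }
    }
}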
A call to `getSharedResult()` is made, causing `getResult()` to be called. If 3 new calls come in before `getResult()` resumes/returns, the first caller to `getSharedResult()` will set `sharedResult`, which the following three then return immediately. The next caller, who arrives much later, will set `sharedResult` to null again, and `getResult()` is called for the 2nd time.
z
Once shareIn comes out, I think you can do something similar to your rx solution with flows.
d
Mutex is interesting. Is it coroutine-aware (i.e. the lock doesn't actually block the thread)? I was thinking of a different way.
var shared: Deferred<Result> = buildAsync()

private fun buildAsync(): Deferred<Result> {
  return runBlocking {
    async(SupervisorJob(), start = CoroutineStart.LAZY) {
       try {
          getResultInternal()
       } finally {
          shared = buildAsync()
       }
    }
  }
}

suspend fun getResult() : Result {
  return shared.await()
}
Would this work? I'll try tomorrow at work.
z
The mutex from the coroutines library is coroutines aware 🙂
Your `buildAsync` isn’t async: since it uses `runBlocking`, it won’t return until `getResultInternal()` returns. And since `runBlocking` won’t return until all its child coroutines have finished, I think there’s no point in starting the `async` coroutine lazily, since `runBlocking` will immediately join on it and cause it to start. It also recurses infinitely, since the coroutine won’t finish until the recursive call finishes, which won’t finish until the next call finishes, and so on; there’s no base case. And this code isn’t thread safe, since access to `shared` isn’t guarded with a lock.
I don’t think there’s a way to do this imperatively in coroutines today without using either a lock of some kind or a lock-free, CAS-based approach.
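As an illustration of the lock-based option, here is a minimal sketch (not from the thread) that guards the in-flight `Deferred` with a coroutine `Mutex`. The class name `InFlightSharer`, the `block` parameter, and the demo function `sharedRunCount` are all hypothetical. It assumes the supplied scope uses a `SupervisorJob`, so that a failing run is delivered to its awaiters without cancelling the scope.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: shares one in-flight run of `block` among concurrent callers.
class InFlightSharer<T>(
    private val scope: CoroutineScope,      // assumed to be backed by a SupervisorJob
    private val block: suspend () -> T,
) {
    private val mutex = Mutex()
    private var inFlight: Deferred<T>? = null // only read or written while holding `mutex`

    suspend fun get(): T {
        // The lock is held only while picking or creating the Deferred, not while awaiting it.
        val deferred = mutex.withLock {
            inFlight?.takeIf { it.isActive }
                ?: scope.async { block() }.also { inFlight = it }
        }
        // Every caller of the same run gets the same result, or the same exception.
        return deferred.await()
    }
}

// Demo: three concurrent callers share one run; a later caller triggers a second run.
fun sharedRunCount(): Int = runBlocking {
    val calls = AtomicInteger(0)
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val sharer = InFlightSharer(scope) {
        calls.incrementAndGet()
        delay(100) // stands in for a network call
        "result"
    }
    List(3) { async { sharer.get() } }.awaitAll()
    sharer.get() // the previous run has completed, so this starts a new one
    scope.cancel()
    calls.get()
}

fun main() {
    println("block ran ${sharedRunCount()} time(s)")
}
```

In this sketch, cancelling one caller does not cancel the shared run; the run keeps going for the remaining callers and is only cancelled together with the scope.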
d
Right, silly me, I wrote this on my mobile phone. The first thing I would do after noticing would be to replace that with someContext.async. Tomorrow I'll experiment a bit, using a mutex too. And maybe have a look at Store to see how they did it.
From my tests this works as I want it to work:
private val scope = parentScope + SupervisorJob() + Dispatchers.Unconfined

private var shared: Deferred<Result> = buildDeferred()

override suspend fun getResult(): Result {
    return shared.await()
}

private fun buildDeferred(): Deferred<Result> {
    return scope.async(start = CoroutineStart.LAZY) {
        try {
            getResultInternal()
        } finally {
            shared = buildDeferred()
        }
    }
}

private suspend fun getResultInternal(): Result {
    // network call or whatever
}
The only missing part is cancelling the job if every `await()` is cancelled, but I think I can do that by adding a little bit of code plus a mutex to the `getResult()` function.
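One possible shape for that extra code is a waiter count kept under a `Mutex`. This is a rough sketch, not the author's implementation; `RefCountedSharer`, `block`, and the demo function are hypothetical names, and the scope is again assumed to use a `SupervisorJob`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical helper: shares one in-flight run and cancels it once every waiter is gone.
class RefCountedSharer<T>(
    private val scope: CoroutineScope,      // assumed to be backed by a SupervisorJob
    private val block: suspend () -> T,
) {
    private val mutex = Mutex()
    private var inFlight: Deferred<T>? = null // guarded by `mutex`
    private var waiters = 0                   // waiters of the current `inFlight`, guarded by `mutex`

    suspend fun get(): T {
        val deferred = mutex.withLock {
            val current = inFlight?.takeIf { it.isActive }
                ?: scope.async { block() }.also { inFlight = it; waiters = 0 }
            waiters++
            current
        }
        try {
            return deferred.await()
        } finally {
            // NonCancellable so the bookkeeping still runs when this caller was cancelled.
            withContext(NonCancellable) {
                mutex.withLock {
                    if (inFlight === deferred) {
                        waiters--
                        // cancel() is a no-op if the run has already completed.
                        if (waiters == 0) deferred.cancel()
                    }
                }
            }
        }
    }
}

// Demo: two waiters start one run; cancelling both cancels the shared run.
fun sharedJobCancelledWhenAllWaitersCancel(): Boolean = runBlocking {
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val cancelled = CompletableDeferred<Boolean>()
    val sharer = RefCountedSharer(scope) {
        try {
            delay(10_000) // stands in for a slow network call
            "result"
        } catch (e: CancellationException) {
            cancelled.complete(true)
            throw e
        }
    }
    val a = launch { sharer.get() }
    val b = launch { sharer.get() }
    delay(100)        // give both waiters time to register
    a.cancelAndJoin() // one waiter left: the run keeps going
    b.cancelAndJoin() // no waiters left: the run is cancelled
    val wasCancelled = withTimeout(1_000) { cancelled.await() }
    scope.cancel()
    wasCancelled
}

fun main() {
    println("shared run cancelled: ${sharedJobCancelledWhenAllWaitersCancel()}")
}
```

The count is reset whenever a new run is created, and waiters of an older run skip the decrement, so a late-finishing waiter cannot cancel a run it never joined.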
I've also checked Dropbox Store 4. They do it differently, but they need more machinery, and they use Flow and channels under the hood.
@Zach Klippenstein (he/him) [MOD] @streetsofboston thanks by the way!