Daniele Segato
09/18/2020, 4:24 PM
private var shared: Deferred<Result>? = null
private suspend fun actuallyGrabResult(): Result { ... }
suspend fun grabResult(): Result {
    // Reuse the in-flight Deferred if there is one, otherwise start a new one
    val deferred = shared?.takeIf { it.isActive }
        ?: async { actuallyGrabResult() }
            .also { shared = it }
    return deferred.await()
}
The idea is that only one actuallyGrabResult() coroutine should be running at any given time, and any call to grabResult() made in the meantime should wait for it and get its result. Once the result has been delivered, the next request triggers another call.
With RxJava you'd do something like:
private val shared: Single<Result> = actuallyGrabResult()
    .toObservable()
    .replay(1)
    .refCount()
    .singleOrError()
private fun actuallyGrabResult(): Single<Result> { ... }
fun grabResult(): Single<Result> {
    return shared
}
Is there an idiomatic way of doing this with coroutines?
(I've only recently started using coroutines)
Zach Klippenstein (he/him) [MOD]
09/18/2020, 4:32 PM
private val shared = scope.async(start = CoroutineStart.LAZY) { actuallyGrabResult() }
suspend fun grabResult(): Result = shared.await()
Where scope is a CoroutineScope owned by your class and tied to its lifetime.
This works great if your class has its own lifetime (i.e. has some kind of dispose() method) and thus is its own logical “scope”.
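To make the "class that owns its scope" idea concrete, here is a minimal sketch. The class name ResultHolder, the load parameter, and the choice of SupervisorJob() + Dispatchers.Default are assumptions made for illustration; they are not from the thread. Note that the lazy Deferred runs load at most once, so every later grabResult() call returns the same cached outcome.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel

// Hypothetical class for illustration: it owns a scope tied to its own lifetime.
class ResultHolder(private val load: suspend () -> String) {
    // Assumption: a SupervisorJob so a failure in load() does not cancel the whole scope.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // LAZY: nothing runs until the first await().
    private val shared = scope.async(start = CoroutineStart.LAZY) { load() }

    // All callers wait on the same Deferred and get the same result.
    suspend fun grabResult(): String = shared.await()

    // Ends the class's lifetime and cancels any work still in flight.
    fun dispose() {
        scope.cancel()
    }
}
```

Calling dispose() while callers are still suspended in grabResult() makes their await() throw a CancellationException, which is usually what you want when the owner goes away.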
If your class doesn’t have its own lifetime, then it’s trickier because you probably need to do some kind of ref counting of the callers of grabResult() to correctly scope your internal coroutine(s), both for cancellation and error handling. This is what libraries like https://github.com/dropbox/Store help with, because that logic can get complicated.
streetsofboston
09/18/2020, 4:34 PM
await() on a Deferred will not restart the coroutine of the async that built it.
Daniele Segato
09/20/2020, 9:00 AM
streetsofboston
09/20/2020, 2:11 PM
getResult() is a suspend fun as well.
var sharedResult: Result? = null
val mutex = Mutex()
...
suspend fun getSharedResult(): Result {
    sharedResult = null
    return mutex.withLock {
        // Reuse the value set by an earlier caller, otherwise fetch it
        sharedResult ?: getResult().also { sharedResult = it }
    }
}
A call to getSharedResult() is made, causing getResult() to be called.
If 3 new calls come in before getResult() resumes/returns, the first caller to getSharedResult() will set sharedResult, which the following three then return immediately.
The 4th caller, who arrives much later, will set sharedResult to null again, and getResult() is called for the 2nd time.
Zach Klippenstein (he/him) [MOD]
09/20/2020, 3:36 PM
Daniele Segato
09/20/2020, 6:58 PM
var shared: Deferred<Result> = buildAsync()
private fun buildAsync(): Deferred<Result> {
    return runBlocking {
        async(SupervisorJob(), start = CoroutineStart.LAZY) {
            try {
                getResultInternal()
            } finally {
                shared = buildAsync()
            }
        }
    }
}
suspend fun getResult(): Result {
    return shared.await()
}
Would this work? I'll try tomorrow at work.
Zach Klippenstein (he/him) [MOD]
09/20/2020, 7:00 PM
buildAsync isn’t async: since it uses runBlocking, it won’t return until getResultInternal() returns. And since runBlocking won’t return until all its child coroutines have finished, I think there’s no point in starting the async coroutine lazily, since runBlocking will immediately join on it and cause it to start. It also recurses infinitely though, since the coroutine won’t finish until the recursive call finishes, which won’t finish until the next call finishes, etc. There’s no base case. And this code isn’t thread safe, since access to shared isn’t guarded with a lock.
Daniele Segato
09/20/2020, 10:12 PM
private val scope = parentScope + SupervisorJob() + Dispatchers.Unconfined
private var shared: Deferred<Result> = buildDeferred()
override suspend fun getResult(): Result {
    return shared.await()
}
private fun buildDeferred(): Deferred<Result> {
    return scope.async(start = CoroutineStart.LAZY) {
        try {
            getResultInternal()
        } finally {
            // Prepare a fresh lazy Deferred for the next caller
            shared = buildDeferred()
        }
    }
}
private suspend fun getResultInternal(): Result {
    // network call or whatever
}
The only missing part is canceling the job if every await() is canceled, but I think I can do that by adding a little bit of code plus a mutex in the getResult() function.
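One possible shape for that "little bit of code plus a mutex" is sketched below. It is an illustration under assumptions, not the code that was eventually used: the class name SharedCall, the block parameter, and the waiter counter are all hypothetical, and the sketch assumes scope contains a SupervisorJob so that a failing call does not cancel the scope. The mutex guards the shared Deferred, and the counter cancels the in-flight call once every awaiting caller has been canceled.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Hypothetical helper: shares one in-flight call among concurrent callers.
class SharedCall<T>(
    private val scope: CoroutineScope, // assumed to include a SupervisorJob
    private val block: suspend () -> T
) {
    private val mutex = Mutex()
    private var inFlight: Deferred<T>? = null
    private var waiters = 0

    suspend fun get(): T {
        val deferred = mutex.withLock {
            // Join the running call, or start a new one if none is active.
            val current = inFlight?.takeIf { it.isActive }
                ?: scope.async { block() }.also {
                    inFlight = it
                    waiters = 0
                }
            waiters++
            current
        }
        try {
            return deferred.await()
        } finally {
            // NonCancellable so the bookkeeping also runs for a canceled caller.
            withContext(NonCancellable) {
                mutex.withLock {
                    // Last waiter out cancels the call; a no-op if it already completed.
                    if (inFlight === deferred && --waiters == 0) deferred.cancel()
                }
            }
        }
    }
}
```

Because a completed or canceled Deferred is no longer active, the next get() after delivery starts a fresh call, which matches the behavior asked for at the top of the thread.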