Sam
05/31/2022, 3:25 PM
suspend fun get(key: K): V = supervisorScope {
    asyncCache.get(key) { k: K, _ -> future { loader.load(k) } }.await()
}
The problem I'm having is with cancellation. If multiple consumers are simultaneously waiting for a given key, and one of them stops waiting (i.e. is cancelled), the current implementation will have all the consumers receive a cancellation exception.
I can change that by loading the key in a separate scope (e.g. a dedicated CoroutineScope belonging to the cache, rather than a scope belonging to one of the consumers) and replacing await() with asDeferred().await(), as suggested by the docs. But then I have a problem where nobody can cancel the job: even if all the consumers stop waiting, the cache entry will continue loading. To handle that I would need to introduce some lifecycle management into the cache, e.g. make it Closeable and have it clean up its own coroutines on close.
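A minimal sketch of that cache-owned-scope approach (not the actual implementation from this thread; SuspendingCache and its loader parameter are illustrative names, and it assumes only kotlinx.coroutines rather than Caffeine's AsyncCache):

```kotlin
import java.io.Closeable
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.*

// Hypothetical sketch: each key is loaded in the cache's own scope, so a
// consumer that stops awaiting does not cancel the load for the others.
class SuspendingCache<K : Any, V>(
    private val loader: suspend (K) -> V,
) : Closeable {
    // SupervisorJob so one failed load doesn't tear down the whole cache.
    private val cacheScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    private val entries = ConcurrentHashMap<K, Deferred<V>>()

    suspend fun get(key: K): V =
        entries.computeIfAbsent(key) {
            // Loaded in the cache's scope, not the caller's: awaiting it
            // can be cancelled without cancelling the load itself.
            cacheScope.async { loader(key) }
        }.await()

    // The trade-off described above: nothing cancels abandoned loads,
    // so the cache needs explicit lifecycle management.
    override fun close() = cacheScope.cancel()
}
```

With this shape, a cancelled consumer only cancels its own await(); the entry keeps loading until close() cancels cacheScope.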
Has anybody encountered this before and found a nice solution?

Nick Allen
06/01/2022, 6:08 AM
You could use a MutableSharedFlow to track those waiting for the result at any time. Something like:
fun <T> shareWhileInvoked(scope: CoroutineScope, block: suspend () -> T): suspend () -> T {
    val flow = MutableSharedFlow<T>(replay = 1)
    scope.launch {
        flow.subscriptionCount
            .map { it > 0 }
            .distinctUntilChanged()
            .collectLatest { active ->
                if (active) {
                    flow.emit(block())
                    cancel() // value is in the replay buffer; stop watching subscriptions
                }
            }
    }
    return { flow.first() }
}
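A hypothetical usage sketch of shareWhileInvoked (not from the thread; the getValue name and the fake slow load are illustrative, assuming kotlinx.coroutines and the function above):

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val cacheScope = CoroutineScope(SupervisorJob())
    // All concurrent callers share a single execution of the block; if every
    // caller is cancelled mid-load, collectLatest cancels the block too.
    val getValue = shareWhileInvoked(cacheScope) {
        delay(50)       // simulate a slow load
        "value"
    }
    val a = async { getValue() }
    val b = async { getValue() }
    println(a.await())  // prints "value"
    println(b.await())  // prints "value", from the same shared load
    cacheScope.cancel()
}
```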
This is similar to shareIn with WhileSubscribed, except that it stops once it's gotten a value. Requires a CoroutineScope for the cache.

Sam
06/01/2022, 12:09 PM
MutableSharedFlow. I like it 👍

Nick Allen
06/01/2022, 4:40 PM