# coroutines
s
I'm trying to figure out the best way to maintain a semblance of structured concurrency while using an async cache. Currently I have a Caffeine AsyncCache, which I use roughly like this:
suspend fun get(key: K): V = supervisorScope {
    asyncCache.get(key) { k: K, _ -> future { loader.load(k) } }.await()
}
The problem I'm having is with cancellation. If multiple consumers are simultaneously waiting for a given key, and one of them stops waiting (i.e. is cancelled), the current implementation will have all the consumers receive a cancellation exception. I can change that by loading the key in a separate scope (e.g. a dedicated CoroutineScope belonging to the cache, rather than a scope belonging to one of the consumers) and replacing await() with asDeferred().await(), as suggested by the docs. But then I have a problem where nobody can cancel the job. Even if all the consumers stop waiting, the cache entry will continue loading. To handle that problem I would need to introduce some lifecycle management into the cache, e.g. make it Closeable and have it clean up its own coroutines on close. Has anybody encountered this before and found a nice solution?
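For reference, the dedicated-scope variant described above might look roughly like the sketch below. It assumes Caffeine and kotlinx.coroutines (with the JDK 8 future integration) are on the classpath; the class name ScopedCache, the loader parameter shape, and the choice of Dispatchers.Default are hypothetical and only there for illustration.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.future
import java.io.Closeable

// Hypothetical wrapper: loads run in a scope owned by the cache, not by a consumer.
class ScopedCache<K : Any, V : Any>(
    private val loader: suspend (K) -> V,
) : Closeable {
    // SupervisorJob so that one failed load does not cancel the other loads.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    private val asyncCache: AsyncCache<K, V> = Caffeine.newBuilder().buildAsync()

    suspend fun get(key: K): V =
        asyncCache
            .get(key) { k: K, _ -> scope.future { loader(k) } }
            // asDeferred() means a cancelled consumer stops waiting
            // without cancelling the shared future.
            .asDeferred()
            .await()

    // Nothing else can cancel in-flight loads, so the cache has to do it on close.
    override fun close() {
        scope.cancel()
    }
}
```

This shows the trade-off in the question: consumers no longer cancel each other, but a load keeps running until it finishes or the cache is closed.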
n
You could leverage a MutableSharedFlow to track those waiting for the result at any time. Something like:
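fun <T> shareWhileInvoked(scope: CoroutineScope, block: suspend () -> T): suspend () -> T {
    // replay = 1 keeps the loaded value available to callers that arrive later.
    val flow = MutableSharedFlow<T>(replay = 1)
    scope.launch {
        flow.subscriptionCount
            .map { it > 0 }
            .distinctUntilChanged()
            // collectLatest cancels a running block() when the last waiter leaves.
            .collectLatest { hasWaiters ->
                if (hasWaiters) {
                    flow.emit(block())
                    // The value now sits in the replay cache; stop tracking subscribers.
                    this@launch.cancel()
                }
            }
    }

    // Each call subscribes until the value arrives.
    return { flow.first() }
}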
This is similar to shareIn with WhileSubscribed, except that it stops once it has a value. It requires a CoroutineScope for the cache.
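A rough sketch of how this could be wired up per key, in case it helps. SharedLoadCache and entries are hypothetical names, the helper is repeated so the block compiles on its own, and the map never evicts anything, so treat it as an illustration rather than a replacement for Caffeine.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import java.util.concurrent.ConcurrentHashMap

fun <T> shareWhileInvoked(scope: CoroutineScope, block: suspend () -> T): suspend () -> T {
    val flow = MutableSharedFlow<T>(replay = 1)
    scope.launch {
        flow.subscriptionCount
            .map { it > 0 }
            .distinctUntilChanged()
            .collectLatest { hasWaiters ->
                if (hasWaiters) {
                    flow.emit(block())
                    this@launch.cancel()
                }
            }
    }
    return { flow.first() }
}

// Hypothetical per-key cache: one shared loader per key, no eviction.
class SharedLoadCache<K : Any, V>(
    private val scope: CoroutineScope,
    private val loader: suspend (K) -> V,
) {
    private val entries = ConcurrentHashMap<K, suspend () -> V>()

    suspend fun get(key: K): V =
        entries
            // The first caller for a key creates the shared loader; later callers reuse it.
            .computeIfAbsent(key) { k -> shareWhileInvoked(scope) { loader(k) } }
            .invoke()
}
```

A cancelled consumer only drops its own subscription. The load itself is cancelled when the subscriber count reaches zero and restarts if someone asks for the key again.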
s
Thanks for the suggestion! I had come up with some ideas about reference counting, but I didn't think of using a MutableSharedFlow. I like it 👍
n
Something else to figure out is how you want to deal with exceptions.
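One possible way to handle it, as a sketch only: wrap the outcome in a Result so a failing block reaches the waiters instead of failing the launched coroutine and leaving them suspended. The name shareWhileInvokedCatching is made up, and caching the failure in the replay buffer is an assumption about what you want; you may prefer to retry on the next call instead.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch

// Hypothetical variant of shareWhileInvoked that shares failures as well as values.
fun <T> shareWhileInvokedCatching(scope: CoroutineScope, block: suspend () -> T): suspend () -> T {
    val flow = MutableSharedFlow<Result<T>>(replay = 1)
    scope.launch {
        flow.subscriptionCount
            .map { it > 0 }
            .distinctUntilChanged()
            .collectLatest { hasWaiters ->
                if (hasWaiters) {
                    val result: Result<T> = try {
                        Result.success(block())
                    } catch (e: CancellationException) {
                        // Cancellation (e.g. the last waiter left) must not be cached.
                        throw e
                    } catch (e: Throwable) {
                        Result.failure(e)
                    }
                    flow.emit(result)
                    this@launch.cancel()
                }
            }
    }
    // Waiters get the value, or the loader's exception is rethrown to each of them.
    return { flow.first().getOrThrow() }
}
```

Note that a block which throws its own CancellationException (a timeout inside the loader, for example) is treated as cancellation here, so that case would need separate handling.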