# coroutines
c
What would be the idiomatic way to set a `CompletableDeferred` as a mirror of another one? As in, I have two or more `CompletableDeferred`s that I know are waiting for the same value, so I'd like to 'merge' them so that completing the 'master' automatically completes the 'slaves'. So far I've come up with:
fun <T> CoroutineScope.merge(
  master: CompletableDeferred<T>,
  slave: CompletableDeferred<T>
) = launch {
  master.await().also { result ->
    slave.complete(result)
  }
}
However this creates a new coroutine, and I'm wondering if there's a better way to achieve a similar goal.
d
Looks a bit like an XY problem. But I think what you want is `master.invokeOnCompletion { slave.complete(master.getCompleted()) }`. Wouldn't recommend doing this though.
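A sketch of that idea as a small helper, also propagating failure (the `mirror` name is made up; `getCompleted()` is experimental API):

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi

// Completes `slave` with whatever `master` completes with, success or failure.
// The completion handler only receives the cause (a Throwable?), so the
// successful value has to be read back from `master` via getCompleted().
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> mirror(master: CompletableDeferred<T>, slave: CompletableDeferred<T>) {
    master.invokeOnCompletion { cause ->
        if (cause == null) slave.complete(master.getCompleted())
        else slave.completeExceptionally(cause)
    }
}
```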
c
Yeah, that looks better. Why wouldn't you recommend it?
e
What's the original use case to have multiple completable deferreds that are working to obtain the same value? Why can't you reuse the same single completable deferred everywhere you expect that same value?
☝🏼 1
c
@Erik I'm working on a cache implementation with structured concurrency, where clients request an object by its ID and pass a `CompletableDeferred` so the cache can give them back their answer. If two requests for the same object arrive before the first one has completed, the cache merges the Deferreds so the completion of the first one also completes the second one.
I'm somewhat new to structured concurrency (I've watched the talks and read the doc, but it's my first real project using it), so if there's something that can be improved, please tell me ^^
d
Yes, do you have code you don't mind sharing, so it can be edited?
c
Yep, give me a minute ^^
Here's the code. Sorry if it's not very clear... The WildFyre class is my lifecycle (implements `CoroutineScope`): https://gitlab.com/clovis-ai/lib/snippets/1965736
d
Yeah, this can be simplified. One sec.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

class Cache<I, T>(
    private val wildfyre: WildFyre,
    private val worker: suspend (id: I) -> T
) {
    private val scope: CoroutineScope = wildfyre
    private val cache = HashMap<I, Deferred<T>>()
    private val cacheSemaphore = Semaphore(1)

    suspend fun get(id: I): T {
        val result = cacheSemaphore.withPermit {
            cache.getOrPut(id) { scope.async { worker(id) } }
        }
        return result.await()
    }

    suspend fun flagAsOutdated(id: I) {
        cacheSemaphore.withPermit {
            val job = cache.remove(id)
            // Should probably cancel this job.
        }
    }
}
e
I had something similar in my head, but I saw you were answering already, so I awaited this.
d
Ha! Just like the `get` method.
🤣 1
e
I haven't looked at the original piece(s) of code, but I'd like to add that a cache could not only have a cache being a `Map<I, Deferred<T>>`, but also a cache of the actual values. E.g. an in-memory cache could be a private `MutableMap<I, T>`.
c
That is simplified, yes. Correct me if I'm wrong, but in this case if a first call to `get` is not in the cache (and therefore the `worker` is called), I think another call to `get` before the `worker` is done will have to wait for the first one to terminate?
My objective was that the cache would be able to reply/delegate other requests while the previous one is still being executed by the worker
d
Yes?
Oh. It will have to wait for the first one for each id.
c
Basically something like:
Time 1: Request to get 1, sent to the worker
Time 2: Request to get 2, it's in the cache, replies immediately
Time 3: The worker has replied, the new value is added to the cache and the first request is done
d
Yes, the code I posted does that.
It either gets an ongoing/completed deferred from the cache, or creates one.
c
I'm not sure I understand how it works... Does that mean that the `async` in the `semaphore.withPermit` unlocks the semaphore while the worker works, and locks it again when it's done?
d
Oh, the `async` starts the request concurrently. So the `withPermit` exits while the request is running.
Although, I just noticed a tiny problem. `worker` is being called concurrently, as opposed to sequentially. This can be fixed with another `Semaphore`.
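A minimal sketch of that fix, assuming the `worker` calls really did need to run one at a time (the `workerSemaphore` name is made up):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

class Cache<I, T>(
    private val scope: CoroutineScope,
    private val worker: suspend (id: I) -> T
) {
    private val cache = HashMap<I, Deferred<T>>()
    private val cacheSemaphore = Semaphore(1)

    // Second semaphore: serializes worker calls without holding the
    // cache lock for the whole duration of a request.
    private val workerSemaphore = Semaphore(1)

    suspend fun get(id: I): T {
        val result = cacheSemaphore.withPermit {
            cache.getOrPut(id) {
                scope.async { workerSemaphore.withPermit { worker(id) } }
            }
        }
        return result.await()
    }
}
```

In c's case this isn't needed (Ktor requests are fine concurrently), so the second semaphore can simply be dropped.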
c
So what happens is:
Time 1: Request to get 1, the worker starts to work on it and the semaphore is unlocked
Time 2: Request to get 2, it's in the cache, the caller gets an already completed Deferred, so the await() doesn't suspend
Time 3: Request to get 1, `getOrPut` gives the on-going Deferred created in Time 1
Time 4: The worker terminates, both requests that were awaiting 1 complete
Is that how it works?
`worker` calls Ktor's request, so there's no issue if it's called concurrently 🙂
d
Exactly!
c
Thanks! That's exactly what I wanted 🙂
e
I repeat: I haven't looked at the original code precisely, but this is something that could work:
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope

class Cache<I, T> {
    private val cache = mutableMapOf<I, T>()
    private val deferreds = mutableMapOf<I, Deferred<T>>()

    suspend fun getById(id: I) = cache.getOrPut(id) {
        val value = deferreds.getOrPut(id) {
            coroutineScope {
                async { TODO("Asynchronously obtain the value by ID") }
            }
        }.await()
        deferreds.remove(id)
        value
    }
}
I think this would work too for multiple requests coming in: they just get an existing cached value, or if not, at most one `Deferred<T>` is started and all requests await that single deferred
No semaphore is the upside, but it depends on what you want or like 😉
d
Time 1: get(1), misses cache
Time 2: Time 1 starts request
Time 3: get(1), misses cache
Time 4: Time 1 finishes and removes request
Time 5: Time 3 starts request (no cache hit)
I think the lock is required.
e
Yeah, the `deferreds.remove(id)` is a bit buggy here, but it's not required. It's only there so completed deferreds aren't kept in memory, so that line can be removed entirely
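For reference, a sketch of that variant with the removal line dropped (the `TODO` body is a placeholder, as in the original):

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope

class Cache<I, T> {
    private val cache = mutableMapOf<I, T>()
    private val deferreds = mutableMapOf<I, Deferred<T>>()

    suspend fun getById(id: I): T = cache.getOrPut(id) {
        // Completed deferreds are kept in `deferreds`: removing them here
        // raced with concurrent calls, and keeping them only costs memory.
        deferreds.getOrPut(id) {
            coroutineScope {
                async { TODO("Asynchronously obtain the value by ID") }
            }
        }.await()
    }
}
```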
👍🏼 1
c
Do you recommend that the Cache use the main
CoroutineScope
, or have its own child of it?
e
Then your time 5 example would get the value either from the cache, or if it just misses the cache, then it gets it from the deferred in memory
d
It should create a child of it. A supervised one too.
👍 1
c
So something like `Cache(...): CoroutineScope by CoroutineScope(SupervisorJob(myMainJob))`?
d
`CoroutineScope`s should be kept private, but otherwise, that ctor looks fine.
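Putting those two points together, a sketch of what that could look like, with `parentScope` standing in for the main scope (names hypothetical): the cache owns a private child scope whose `SupervisorJob` is parented to the main scope's `Job`, rather than implementing `CoroutineScope` itself:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel

class Cache<I, T>(
    parentScope: CoroutineScope,
    private val worker: suspend (id: I) -> T
) {
    // Private supervised child scope: a failed worker doesn't cancel the
    // parent, but cancelling the parent still cancels all cache work.
    private val scope = CoroutineScope(
        parentScope.coroutineContext +
            SupervisorJob(parentScope.coroutineContext[Job])
    )

    // Optional explicit shutdown, if the cache outlives its usefulness
    // before the parent does.
    fun close() = scope.cancel()
}
```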