https://kotlinlang.org logo
#coroutines
Title
# coroutines
c

CLOVIS

04/12/2020, 2:37 PM
What would the idiomatic way to set a
CompletableDeferred
as a mirror of another one? As in, I have two or more `CompletableDeferred`that I know are waiting for the same value, so I'd like to ‘merge' them so that completing the ‘master' automatically completes the ‘slaves'. So far I've come up with:
Copy code
fun <T> CoroutineScope.merge(
  master: CompletableDeferred<T>,
  slave: CompletableDeferred<T>
) = launch {
  master.await().also { result ->
    slave.complete(result)
  }
}
However this creates a new coroutine, and I'm wondering if there's a better way to achieve a similar goal.
d

Dominaezzz

04/12/2020, 2:48 PM
Looks a bit like XY problem. But I think what you want is
master.invokeOnCompletion { slave.complete(it.getCompleted()) }
. Wouldn't recommend doing this though.
c

CLOVIS

04/12/2020, 3:01 PM
Yeah, that looks better. Why wouldn't you recommend it?
e

Erik

04/12/2020, 3:03 PM
What's the original use case to have multiple completable deferreds that are working to obtain the same value? Why can't you reuse the same single completable deferred everywhere you expect that same value?
☝🏼 1
c

CLOVIS

04/12/2020, 3:06 PM
@Erik I'm working on a cache implementation with structured concurrency, where clients request an object with an ID and by giving a
CompletableDeferred
so the cache can give them back their answer. If two requests for the same object are done before the first one has resulted, the cache merges the Deferred so the completion of the first one also completes the second one.
I'm somewhat new to structured concurrency (I've watched the talks and read the doc, but it's my first real project using it), so if there's something that can be improved, please tell me ^^
d

Dominaezzz

04/12/2020, 3:08 PM
Yes, do you have code you don't mind sharing, so it can be edited?
c

CLOVIS

04/12/2020, 3:08 PM
Yep, give me a minute ^^
Here's the code. Sorry if it's not very clear... The WildFyre class is my lifecycle (implements
CoroutineScope
) https://gitlab.com/clovis-ai/lib/snippets/1965736
d

Dominaezzz

04/12/2020, 3:17 PM
Yeah, this can be simplified. One sec.
Copy code
class Cache<I, T>(
    private val wildfyre: WildFyre,
    private val worker: suspend (id: I) -> T
) {
    private val scope: CoroutineScope = wildfyre
    private val cache = HashMap<I, Deferred<T>>()
    private val cacheSemaphore = Semaphore(1)

    suspend fun get(id: I): T {
        val result = cacheSemaphore.withPermit {
            cache.getOrPut(id) { scope.async { worker(id) } }
        }
        return result.await()
    }

    suspend fun flagAsOutdated(id: I) {
        cacheSemaphore.withPermit {
            val job = cache.remove(id)
            // Should probably cancel this job.
        }
    }
}
e

Erik

04/12/2020, 3:38 PM
I had something similar in my head, but I saw you were answering already, so I awaited this.
d

Dominaezzz

04/12/2020, 3:39 PM
Ha! Just like the
get
method.
🤣 1
e

Erik

04/12/2020, 3:40 PM
I haven't looked at the original piece(s) of code, but I'd like to add that a cache could not only have a cache being a
Map<I, Deferred<T>>
, but also a cache of the actual values. E.g. and in-memory cache could be a private
MutableMap<I, T>
.
c

CLOVIS

04/12/2020, 3:41 PM
That is simplified, yes. Correct me if I'm wrong, but but in this case if a first call to
get
is not in the cache (and therefore the
worker
is called), I think another call to
get
until the
worker
is done will have to wait for the first one to terminate?
My objective was that the cache would be able to reply/delegate other requests while the previous one is still being executed by the worker
d

Dominaezzz

04/12/2020, 3:42 PM
Yes?
Oh. It will have to wait for the first one for each id.
c

CLOVIS

04/12/2020, 3:44 PM
Basically something like: Time 1: Request to get 1, sent to the worker Time 2: Request to get 2, it's in the cache, replies immediately Time 3: The worker has replied, the new value is added to the cache and the first request is done
d

Dominaezzz

04/12/2020, 3:44 PM
Yes, the code I posted does that.
It either gets an ongoing/completed deferred from the cache, or creates one.
c

CLOVIS

04/12/2020, 3:46 PM
I'm not sure I understand how it works... Does that mean that the
async
in the
semaphore.withPermit
unlocks the semaphore while the worker works, and locks it again when it's done?
d

Dominaezzz

04/12/2020, 3:48 PM
Oh the
async
starts the request concurrently. So the
withPermit
exits while the request is running.
Although, I just noticed a tiny problem.
worker
is being called concurrently, as supposed to sequentially. This can be fixed with another
Semaphore
.
c

CLOVIS

04/12/2020, 3:51 PM
So what happens is; Time 1: Request to get 1, the worker starts to work on it and the semaphore is unlocked Time 2: Request to get 2, it's in the cache, the caller gets an already completed Deferred, so the await() doesn't suspend Time 3: Request to get 1,
getOrPut
gives the on-going Deferred created in time 1 Time 4: The worker terminates, both requests that were awaiting for 1 complete Is that how it works?
worker
calls Ktor's request, so there's no issue if it's called concurrently 🙂
d

Dominaezzz

04/12/2020, 3:52 PM
Exactly!
c

CLOVIS

04/12/2020, 3:52 PM
Thanks! That's exactly what I wanted 🙂
e

Erik

04/12/2020, 3:53 PM
I repeat: I haven't looked at the original code precisely, but this is something that could work:
Copy code
class Cache<I, T> {
    private val cache = mutableMapOf<I, T>()
    private val deferreds = mutableMapOf<I, Deferred<T>>()

    suspend fun getById(id: I) = cache.getOrPut(id) {
        val value = deferreds.getOrPut(id) {
            coroutineScope {
                async { TODO("Asynchronously obtain the value by ID") }
            }
        }.await()
        deferreds.remove(id)
        value
    }
}
I think this would work too for multiple request coming in: they just get an existing cached value, or if not, at most 1
Deferred<T>
is started and all requests await that single deferred
No semaphore is the upside, but it depends on what you want or like 😉
d

Dominaezzz

04/12/2020, 4:00 PM
Time 1: get(1), misses cache Time 2: time 1 starts request Time 3: get(1), misses cache Time 4: time 1 finishes and removes request Time 5: time 3, starts request (no cache hit) I think the lock is required.
e

Erik

04/12/2020, 4:07 PM
Yeah, the
deferreds.remove(id)
is a bit buggy here, but it's not required. It's only necessary to not keep completed deferreds in memory. So that line can be removed entirely
👍🏼 1
c

CLOVIS

04/12/2020, 4:08 PM
Do you recommend that the Cache use the main
CoroutineScope
, or have its own child of it?
e

Erik

04/12/2020, 4:08 PM
Then your time 5 example would get the value either from the cache, or if it just misses the cache, then it gets it from the deferred in memory
d

Dominaezzz

04/12/2020, 4:09 PM
It should create a child of it. A supervised one too.
👍 1
c

CLOVIS

04/12/2020, 4:14 PM
So something like
Cache(...): CoroutineScope by CoroutineScope(SupervisorJob(myMainJob))
?
d

Dominaezzz

04/12/2020, 4:16 PM
CoroutineScope
s should be kept private, but otherwise, that ctor looks fine.
14 Views