CLOVIS
04/12/2020, 2:37 PMCompletableDeferred
as a mirror of another one? As in, I have two or more `CompletableDeferred`that I know are waiting for the same value, so I'd like to ‘merge' them so that completing the ‘master' automatically completes the ‘slaves'.
So far I've come up with:
fun <T> CoroutineScope.merge(
master: CompletableDeferred<T>,
slave: CompletableDeferred<T>
) = launch {
master.await().also { result ->
slave.complete(result)
}
}
However this creates a new coroutine, and I'm wondering if there's a better way to achieve a similar goal.Dominaezzz
04/12/2020, 2:48 PMmaster.invokeOnCompletion { slave.complete(it.getCompleted()) }
. Wouldn't recommend doing this though.CLOVIS
04/12/2020, 3:01 PMErik
04/12/2020, 3:03 PMCLOVIS
04/12/2020, 3:06 PMCompletableDeferred
so the cache can give them back their answer. If two requests for the same object are done before the first one has resulted, the cache merges the Deferred so the completion of the first one also completes the second one.Dominaezzz
04/12/2020, 3:08 PMCLOVIS
04/12/2020, 3:08 PMCoroutineScope
)
https://gitlab.com/clovis-ai/lib/snippets/1965736Dominaezzz
04/12/2020, 3:17 PMclass Cache<I, T>(
private val wildfyre: WildFyre,
private val worker: suspend (id: I) -> T
) {
private val scope: CoroutineScope = wildfyre
private val cache = HashMap<I, Deferred<T>>()
private val cacheSemaphore = Semaphore(1)
suspend fun get(id: I): T {
val result = cacheSemaphore.withPermit {
cache.getOrPut(id) { scope.async { worker(id) } }
}
return result.await()
}
suspend fun flagAsOutdated(id: I) {
cacheSemaphore.withPermit {
val job = cache.remove(id)
// Should probably cancel this job.
}
}
}
Erik
04/12/2020, 3:38 PMDominaezzz
04/12/2020, 3:39 PMget
method.Erik
04/12/2020, 3:40 PMMap<I, Deferred<T>>
, but also a cache of the actual values. E.g. and in-memory cache could be a private MutableMap<I, T>
.CLOVIS
04/12/2020, 3:41 PMget
is not in the cache (and therefore the worker
is called), I think another call to get
until the worker
is done will have to wait for the first one to terminate?Dominaezzz
04/12/2020, 3:42 PMCLOVIS
04/12/2020, 3:44 PMDominaezzz
04/12/2020, 3:44 PMCLOVIS
04/12/2020, 3:46 PMasync
in the semaphore.withPermit
unlocks the semaphore while the worker works, and locks it again when it's done?Dominaezzz
04/12/2020, 3:48 PMasync
starts the request concurrently. So the withPermit
exits while the request is running.worker
is being called concurrently, as supposed to sequentially. This can be fixed with another Semaphore
.CLOVIS
04/12/2020, 3:51 PMgetOrPut
gives the on-going Deferred created in time 1
Time 4: The worker terminates, both requests that were awaiting for 1 complete
Is that how it works?worker
calls Ktor's request, so there's no issue if it's called concurrently 🙂Dominaezzz
04/12/2020, 3:52 PMCLOVIS
04/12/2020, 3:52 PMErik
04/12/2020, 3:53 PMclass Cache<I, T> {
private val cache = mutableMapOf<I, T>()
private val deferreds = mutableMapOf<I, Deferred<T>>()
suspend fun getById(id: I) = cache.getOrPut(id) {
val value = deferreds.getOrPut(id) {
coroutineScope {
async { TODO("Asynchronously obtain the value by ID") }
}
}.await()
deferreds.remove(id)
value
}
}
I think this would work too for multiple request coming in: they just get an existing cached value, or if not, at most 1 Deferred<T>
is started and all requests await that single deferredDominaezzz
04/12/2020, 4:00 PMErik
04/12/2020, 4:07 PMdeferreds.remove(id)
is a bit buggy here, but it's not required. It's only necessary to not keep completed deferreds in memory. So that line can be removed entirelyCLOVIS
04/12/2020, 4:08 PMCoroutineScope
, or have its own child of it?Erik
04/12/2020, 4:08 PMDominaezzz
04/12/2020, 4:09 PMCLOVIS
04/12/2020, 4:14 PMCache(...): CoroutineScope by CoroutineScope(SupervisorJob(myMainJob))
?Dominaezzz
04/12/2020, 4:16 PMCoroutineScope
s should be kept private, but otherwise, that ctor looks fine.