ansman
02/07/2019, 6:12 PM
val pendingJobs = Collections.synchronizedMap(mutableMapOf<String, Deferred<Stuff>>())
/**
 * Returns the [Stuff] for [key], sharing one computation between all callers
 * that ask for the same key.
 *
 * The first caller for a key registers a [Deferred] in `pendingJobs`; every
 * caller, including the first, then awaits that same [Deferred].
 *
 * `computeIfAbsent` takes a plain (non-suspending) function, so a suspend
 * function reference cannot be passed to it directly. The mapping lambda only
 * *starts* the work with `async`, which returns immediately, and the
 * suspension happens in `await()` after the map call has returned.
 */
suspend fun getStuff(key: String): Stuff =
    pendingJobs.computeIfAbsent(key) { absentKey ->
        // Run the shared computation in its own scope rather than as a child of
        // the first caller, so that caller being cancelled does not cancel the
        // result for everyone else waiting on the same key.
        CoroutineScope(Dispatchers.Default + SupervisorJob()).async { computeStuff(absentKey) }
    }.await()
/**
 * Placeholder for the long-running operation that produces the [Stuff] for [key].
 *
 * Not implemented in this snippet: calling it throws [NotImplementedError].
 */
suspend fun computeStuff(key: String): Stuff = TODO("Long running operation")
but that doesn’t feel like it’s the Kotlin way.
Seri
02/07/2019, 7:01 PM
fun main() {
// Queue of pending jobs. Each job is a CompletableDeferred that the worker
// completes with a result message once it has been processed.
val jobWorker = Channel<CompletableDeferred<String>>(Channel.UNLIMITED)
runBlocking {
    // Worker: complete the jobs one at a time, in the order they arrive.
    // Iteration ends once the channel has been closed and drained.
    launch {
        for (job in jobWorker) {
            val elapsedMs = doStuff()
            job.complete("Job completed in $elapsedMs ms")
        }
    }
    // Senders 1 and 2: each submits ten jobs, waiting for the result of one
    // job before sending the next.
    val senders = (1..2).map { senderId ->
        launch {
            repeat(10) {
                val job = CompletableDeferred<String>()
                jobWorker.send(job)
                println("Sender $senderId: ${job.await()}")
            }
        }
    }
    // runBlocking waits for all of its children. Without closing the channel
    // the worker would stay suspended in its for-loop forever and main() would
    // never return, so close it as soon as every sender has finished.
    senders.forEach { it.join() }
    jobWorker.close()
}
}
/**
 * Simulates a unit of work by suspending for a randomly chosen duration.
 *
 * @return the duration that was waited, in milliseconds: at least 500 and
 * less than 1500.
 */
suspend fun doStuff(): Long =
    Random.nextLong(500, 1500).also { pauseMs -> delay(pauseMs) }
Sender 1: Job completed in 1376 ms
Sender 2: Job completed in 997 ms
Sender 1: Job completed in 515 ms
Sender 2: Job completed in 1200 ms
Sender 1: Job completed in 1216 ms
Sender 2: Job completed in 1003 ms
Sender 1: Job completed in 972 ms
Sender 2: Job completed in 798 ms
Sender 1: Job completed in 800 ms
Sender 2: Job completed in 521 ms
Sender 1: Job completed in 538 ms
Sender 2: Job completed in 1274 ms
Sender 1: Job completed in 1321 ms
Sender 2: Job completed in 1154 ms
Sender 1: Job completed in 955 ms
Sender 2: Job completed in 1182 ms
Sender 1: Job completed in 959 ms
Sender 2: Job completed in 745 ms
Sender 1: Job completed in 1414 ms
Sender 2: Job completed in 1011 ms
ansman
02/07/2019, 7:05 PM
Dico
02/08/2019, 12:14 AM
ansman
02/08/2019, 12:15 AM
Dico
02/08/2019, 11:31 PM
computeIfAbsent
/**
 * Computations registered by [getStuff], keyed by the requested key.
 *
 * Backed by a ConcurrentHashMap because [getStuff] reads it without holding
 * [mutex]; a plain `mutableMapOf()` is not safe to read while another thread
 * is writing to it.
 */
val pendingJobs: MutableMap<String, Deferred<Stuff>> = java.util.concurrent.ConcurrentHashMap()

/** Taken by [getStuff] while it registers a new entry in [pendingJobs]. */
val mutex = Mutex()

/** Scope in which [getStuff] starts the shared computations. */
val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

/**
 * Returns the [Stuff] for [key], starting at most one computation per key.
 *
 * Callers that find an entry for [key] await it without touching [mutex].
 * Only a caller that sees no entry takes the lock, re-checks, and registers a
 * new computation if the key is still absent.
 *
 * Entries are never removed here, so once a key has been requested every later
 * call awaits the same [Deferred], whether it completed normally or failed.
 */
suspend fun getStuff(key: String): Stuff {
    val job = pendingJobs[key] ?: mutex.withLock {
        // Another caller may have registered the key between the lock-free
        // read above and acquiring the lock, so look again before starting.
        pendingJobs.getOrPut(key) { scope.async { computeStuff(key) } }
    }
    // Await outside the lock: holding it for the whole computation would make
    // requests for unrelated keys queue up behind this one.
    return job.await()
}
/**
 * Placeholder for the operation that produces the [Stuff] for [key].
 *
 * Not implemented in this snippet: calling it throws [NotImplementedError].
 */
suspend fun computeStuff(key: String): Stuff = TODO()
Mutex
and avoid locking unless you know the key isn't there
ansman
02/11/2019, 4:39 PM
getStuff