# coroutines
a
What’s the idiomatic way to share a job between multiple callers? I’ve done this now:
val pendingJobs = Collections.synchronizedMap(mutableMapOf<String, Deferred<Stuff>>())

suspend fun getStuff(key: String): Stuff = pendingJobs.computeIfAbsent(key, ::computeStuff)
suspend fun computeStuff(key: String): Stuff = TODO("Long-running operation")
but that doesn’t feel like it’s the Kotlin way.
s
You could use channels and write something like this:
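import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.random.Random

fun main() {
    // Each request is a CompletableDeferred that the worker completes with the result
    val jobWorker = Channel<CompletableDeferred<String>>(Channel.UNLIMITED)

    runBlocking {
        // Single worker: completes the jobs one at a time, as they come in
        launch {
            for (job in jobWorker) {
                val delayMs = doStuff()
                job.complete("Job completed in $delayMs ms")
            }
        }

        // Sender 1 is sending jobs
        val sender1 = launch {
            repeat(10) {
                val job = CompletableDeferred<String>()
                jobWorker.send(job)
                println("Sender 1: ${job.await()}")
            }
        }

        // Sender 2 is sending jobs
        val sender2 = launch {
            repeat(10) {
                val job = CompletableDeferred<String>()
                jobWorker.send(job)
                println("Sender 2: ${job.await()}")
            }
        }

        // Close the channel once both senders are done, so the worker's loop
        // (and therefore runBlocking and main) can finish
        joinAll(sender1, sender2)
        jobWorker.close()
    }
}

// Simulates work by suspending for a random 500 to 1499 ms and returns the duration
suspend fun doStuff(): Long {
    val delayMs = Random.nextLong(500, 1500)
    delay(delayMs)
    return delayMs
}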
Sender 1: Job completed in 1376 ms
Sender 2: Job completed in 997 ms
Sender 1: Job completed in 515 ms
Sender 2: Job completed in 1200 ms
Sender 1: Job completed in 1216 ms
Sender 2: Job completed in 1003 ms
Sender 1: Job completed in 972 ms
Sender 2: Job completed in 798 ms
Sender 1: Job completed in 800 ms
Sender 2: Job completed in 521 ms
Sender 1: Job completed in 538 ms
Sender 2: Job completed in 1274 ms
Sender 1: Job completed in 1321 ms
Sender 2: Job completed in 1154 ms
Sender 1: Job completed in 955 ms
Sender 2: Job completed in 1182 ms
Sender 1: Job completed in 959 ms
Sender 2: Job completed in 745 ms
Sender 1: Job completed in 1414 ms
Sender 2: Job completed in 1011 ms
a
Your code just has a single worker performing tasks sequentially. What I need is to process jobs concurrently, but also to share jobs for the same key. I’m decoding large files, and if multiple callers want the same file they should share the same job.
d
You can have a single worker that accesses the map and responds to requests it's listening for on the channel.
a
Well, I want to perform the requests concurrently if possible.
d
Right, so the idea is that multiple threads or coroutines send requests for information to the single worker, and that worker manages the resources without sharing them with other threads. The synchronization work, and its dangers, are offloaded to the channel implementation.
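A single worker does not have to mean sequential decoding: the worker can own the map and start each new job with `async` instead of running it inline. Below is a minimal sketch of that pattern, not a definitive implementation. `Request`, `startWorker`, the `String` result, the `delay`-based `computeStuff`, and the `computeCalls` counter are all hypothetical stand-ins.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical request type: the key plus a slot for the (possibly shared) job
class Request(val key: String, val reply: CompletableDeferred<Deferred<String>>)

// Hypothetical counter, only here to show how many times the work actually runs
val computeCalls = AtomicInteger()

// Hypothetical stand-in for the long-running decode
suspend fun computeStuff(key: String): String {
    computeCalls.incrementAndGet()
    delay(100)
    return "decoded $key"
}

// The single worker: it alone owns the map, so the map needs no lock
fun CoroutineScope.startWorker(requests: Channel<Request>): Job = launch {
    val pending = mutableMapOf<String, Deferred<String>>()
    for (req in requests) {
        // Reuse the job for a known key, otherwise start one. async returns at once,
        // so the worker keeps serving requests while decodes run on Dispatchers.Default.
        val job = pending.getOrPut(req.key) { async(Dispatchers.Default) { computeStuff(req.key) } }
        req.reply.complete(job)
    }
}

suspend fun getStuff(requests: Channel<Request>, key: String): String {
    val reply = CompletableDeferred<Deferred<String>>()
    requests.send(Request(key, reply))
    return reply.await().await() // first the shared job, then its result
}

fun main() = runBlocking {
    val requests = Channel<Request>(Channel.UNLIMITED)
    val worker = startWorker(requests)
    val results = listOf("a.bin", "b.bin", "a.bin").map { async { getStuff(requests, it) } }.awaitAll()
    println(results) // [decoded a.bin, decoded b.bin, decoded a.bin]
    println("computeStuff ran ${computeCalls.get()} times") // computeStuff ran 2 times
    requests.close() // lets the worker's loop finish
    worker.join()
}
```

Only the map bookkeeping goes through the channel; the decodes themselves run concurrently. One caveat of this sketch: each `async` is a child of the worker, so a decode that throws also cancels the worker, and a real version would need per-job error handling.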
Additionally, I don't believe you can use a suspend function within the lambda of `computeIfAbsent`, as I believe it is not inlined.
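That holds for calling `computeStuff` directly, since `computeIfAbsent` takes a plain Java `Function`. The lambda does not have to suspend, though: it only needs to *start* the job and return the `Deferred`. Below is a minimal sketch of that variant. It swaps in a `ConcurrentHashMap` for `Collections.synchronizedMap` and assumes a hypothetical `delay`-based `computeStuff` returning a `String`, plus a hypothetical `computeCalls` counter.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

val pendingJobs = ConcurrentHashMap<String, Deferred<String>>()
val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

// Hypothetical counter, only here to show how many times the work actually runs
val computeCalls = AtomicInteger()

// Hypothetical stand-in for the long-running decode
suspend fun computeStuff(key: String): String {
    computeCalls.incrementAndGet()
    delay(100)
    return "decoded $key"
}

suspend fun getStuff(key: String): String {
    // The mapping function is an ordinary, non-suspending lambda: scope.async only
    // starts the job and returns immediately. ConcurrentHashMap applies the function
    // at most once per key, so concurrent callers share the same Deferred.
    val job = pendingJobs.computeIfAbsent(key) { scope.async { computeStuff(it) } }
    return job.await() // suspend outside of any map locking
}

fun main() = runBlocking {
    val results = listOf("a.bin", "b.bin", "a.bin").map { async { getStuff(it) } }.awaitAll()
    println(results) // [decoded a.bin, decoded b.bin, decoded a.bin]
    println("computeStuff ran ${computeCalls.get()} times") // computeStuff ran 2 times
}
```

As in the original question, finished (or failed) jobs stay in the map; evicting them is left out of this sketch.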
I think you can get better performance with less work by doing something like this:
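import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class Stuff // placeholder for the decoded result

val pendingJobs = mutableMapOf<String, Deferred<Stuff>>()
val mutex = Mutex()
val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

suspend fun getStuff(key: String): Stuff {
    // Fast path: unsynchronized read that skips the lock when the job already exists
    pendingJobs[key]?.let { return it.await() }
    val job = mutex.withLock {
        // Re-check under the lock, then start and register the job if nobody else has
        pendingJobs[key] ?: scope.async { computeStuff(key) }.also { pendingJobs[key] = it }
    }
    // Await outside the lock so a slow job doesn't block callers asking for other keys
    return job.await()
}

suspend fun computeStuff(key: String): Stuff = TODO("Long-running operation")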
Basically, just use a `Mutex` and avoid locking unless you know the key isn't there.
a
I agree that using a mutex would be better, but that code is not thread-safe: you read the map without synchronizing, so a read can overlap with a write. To be safe, you’d need to remove the first line in `getStuff`.
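Following that suggestion, here is a minimal sketch in which every map read and write happens under the `Mutex`, while the `await` happens after the lock is released so one slow decode does not block callers for other keys. The `String` result, the `delay`-based `computeStuff`, and the `computeCalls` counter are hypothetical stand-ins for `Stuff` and the real decode.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger

val pendingJobs = mutableMapOf<String, Deferred<String>>()
val mutex = Mutex()
val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

// Hypothetical counter, only here to show how many times the work actually runs
val computeCalls = AtomicInteger()

// Hypothetical stand-in for the long-running decode
suspend fun computeStuff(key: String): String {
    computeCalls.incrementAndGet()
    delay(100)
    return "decoded $key"
}

suspend fun getStuff(key: String): String {
    // No unsynchronized fast path: every access to the map holds the mutex
    val job = mutex.withLock {
        pendingJobs.getOrPut(key) { scope.async { computeStuff(key) } }
    }
    // Await outside the lock; the critical section only looks up or starts the job
    return job.await()
}

fun main() = runBlocking {
    val keys = listOf("a.bin", "b.bin", "a.bin", "b.bin", "a.bin")
    val results = keys.map { async { getStuff(it) } }.awaitAll()
    println(results)
    println("computeStuff ran ${computeCalls.get()} times") // computeStuff ran 2 times
}
```

The lock is held only for a map lookup and possibly an `async` call, both of which return quickly, so the extra locking on every call should cost little compared with decoding a large file.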