Hi all. I need guidance with coroutines and cancel...
# coroutines
d
Hi all. I need guidance with coroutines and cancellation. I need to maintain a concurrent map of jobs indexed by strings. I need to be able to manually cancel the a job from its key but also remove the job when cancelled because of a failure or normal completion. For now, I am doing the following, but is there a better idiom?
Copy code
private class JobMap<K> {
    private val jobs = atomic(mapOf<K, Job>())

    fun add(key: K, factory: () -> Job) {
        jobs.update {
            val job = factory()
            job.invokeOnCompletion { cause ->
                if (cause == null || cause !is CancellationException) jobs.update { it - key }
            }
            it + (key to job)
        }
    }

    fun remove(key: K) {
        jobs.getAndUpdate { it - key }[key]?.cancel()
    }
}
e
launching jobs definitely isn't appropriate inside
AtomicRef.update
, it is a retry-loop around
compareAndSet
d
Indeed, I had looked at the source... Do you have any suggestion?
@ephemient OK, new try:
Copy code
private class JobMap<K> {
    private val jobs = atomic(mapOf<K, JobContainer>())

    fun add(key: K, factory: () -> Job) {
        val container = JobContainer(factory)
        val putContainer = jobs.updateAndGet { it + (key to container) }[key]
        if (container == putContainer) {
            container.start()
            container.invokeOnCompletion { cause ->
                if (cause == null || cause !is CancellationException) jobs.update { it - key }
            }
        }
    }

    fun remove(key: K) {
        jobs.getAndUpdate { it - key }[key]?.cancel()
    }
}

private class JobContainer(private val factory: () -> Job) {
    private lateinit var job: Job

    fun start() {
        job = factory()
    }

    fun cancel() {
        job.cancel()
    }

    fun invokeOnCompletion(handler: CompletionHandler) {
        job.invokeOnCompletion(handler)
    }
}
Can you tell me if you know of a more idiomatic way to do this?
e
if you're on the JVM, I'd just use
ConcurrentMap
.
Copy code
class JobMap<K : Any> {
    private val jobs = ConcurrentHashMap<K, Job>()

    fun add(key: K, factory: () -> Job) {
        val job = factory()
        jobs.put(key, job)?.cancel()
        job.invokeOnCompletion { cause ->
            if (cause == null || cause !is CancellationException) {
                jobs.remove(key, job)
            }
        }
    }

    fun remove(key: K) {
        jobs.remove(key)?.cancel()
    }
}
d
Unfortunately, I am not!