domfox
10/25/2023, 11:52 AM
ConcurrentHashMap? The idea is that an update to the map via compute might be a suspending function, and the map should lock out the hash bucket being updated until that function has completed. I have this naive version, which locks out the entire map:
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class CoroutineMap<K : Any, V : Any> {
    private val data: MutableMap<K, V> = mutableMapOf()
    private val dataLock = Mutex()

    // One mutex guards the whole map, so every update is serialized.
    suspend fun compute(key: K, update: suspend (V?) -> V?): V? = dataLock.withLock {
        val updated = update(data[key])
        if (updated == null) data.remove(key) else data[key] = updated
        updated
    }
}
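To make the "locks out the entire map" point concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. `GlobalLockMap` and `globalLockDemo` are hypothetical names used only for this demo; the class has the same shape as the naive version above. A slow update on key "a" is held open with a `CompletableDeferred`, and an update on the unrelated key "b" has to wait for it.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.yield

// Hypothetical demo class: same shape as the naive map with a single mutex.
class GlobalLockMap<K : Any, V : Any> {
    private val data: MutableMap<K, V> = mutableMapOf()
    private val dataLock = Mutex()

    suspend fun compute(key: K, transform: suspend (V?) -> V?): V? = dataLock.withLock {
        val updated = transform(data[key])
        if (updated == null) data.remove(key) else data[key] = updated
        updated
    }
}

// Records the order in which two updates on different keys run.
fun globalLockDemo(): List<String> = runBlocking {
    val map = GlobalLockMap<String, Int>()
    val events = mutableListOf<String>()
    val gate = CompletableDeferred<Unit>()

    val slow = launch {
        map.compute("a") { events += "a-start"; gate.await(); events += "a-end"; 1 }
    }
    val fast = launch {
        map.compute("b") { events += "b"; 2 }
    }
    yield()              // let both coroutines run until they suspend
    gate.complete(Unit)  // release the slow update
    joinAll(slow, fast)
    events
}

fun main() {
    println(globalLockDemo()) // [a-start, a-end, b]
}
```

The update for "b" only runs after the update for "a" has finished, even though the two keys are unrelated.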
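but a version with per-bucket mutexes would be better, I believe?
bezrukov
10/25/2023, 12:23 PM
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class Cell<V : Any>(var value: V? = null) {
    val lock = Mutex()

    // Holds this cell's lock while the (possibly suspending) transform runs.
    suspend fun update(transform: suspend (V?) -> V?): V? =
        lock.withLock { transform(value).also { value = it } }
}

class CoroutineMap<K : Any, V : Any> {
    private val map = ConcurrentHashMap<K, Cell<V>>()

    suspend fun compute(key: K, update: suspend (V?) -> V?): V? =
        map.getOrPut(key) { Cell() }.update(update)
}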
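A quick way to check the behaviour, as a sketch that assumes kotlinx.coroutines is available: `PerKeyCell`, `PerKeyLockMap`, and `perKeyLockDemo` are hypothetical names for this demo only, mirroring the `Cell` / `CoroutineMap` pair. A slow update on key "a" is held open, and an update on key "b" is started meanwhile.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.yield

// Hypothetical demo classes: one mutex per key instead of one per map.
class PerKeyCell<V : Any>(var value: V? = null) {
    private val lock = Mutex()

    suspend fun update(transform: suspend (V?) -> V?): V? =
        lock.withLock { transform(value).also { value = it } }
}

class PerKeyLockMap<K : Any, V : Any> {
    private val map = ConcurrentHashMap<K, PerKeyCell<V>>()

    suspend fun compute(key: K, transform: suspend (V?) -> V?): V? =
        map.getOrPut(key) { PerKeyCell() }.update(transform)
}

// Records the order in which two updates on different keys run.
fun perKeyLockDemo(): List<String> = runBlocking {
    val map = PerKeyLockMap<String, Int>()
    val events = mutableListOf<String>()
    val gate = CompletableDeferred<Unit>()

    val slow = launch {
        map.compute("a") { events += "a-start"; gate.await(); events += "a-end"; 1 }
    }
    val fast = launch {
        map.compute("b") { events += "b"; 2 }
    }
    yield()              // let both coroutines run until they suspend or finish
    gate.complete(Unit)  // release the slow update
    joinAll(slow, fast)
    events
}

fun main() {
    println(perKeyLockDemo()) // [a-start, b, a-end]
}
```

The update for "b" completes while the update for "a" is still suspended. Note that this sketch never removes a cell once it has been created, so a key whose value becomes null still occupies an entry.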
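In this case it is a lock per key.
bezrukov
10/25/2023, 12:26 PM
val map = ConcurrentHashMap<K, MutableStateFlow<V?>>()

suspend fun compute(key: K, update: suspend (V?) -> V?): V? {
    val stateFlow = map.getOrPut(key) { MutableStateFlow<V?>(null) }
    // updateAndGet is a compare-and-set loop rather than a lock, so `update` may run more than once under contention.
    return stateFlow.updateAndGet { update(it) }
}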
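One caveat with the StateFlow variant: `MutableStateFlow.update` and `updateAndGet` are optimistic compare-and-set loops, so a suspending update is not protected from concurrent writers and may simply be re-run. A minimal sketch of that behaviour, assuming kotlinx.coroutines 1.5.1 or later (`stateFlowRetryDemo` is a hypothetical name):

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.updateAndGet
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns (final value, number of times the update lambda ran).
fun stateFlowRetryDemo(): Pair<Int?, Int> = runBlocking {
    val cell = MutableStateFlow<Int?>(null)
    val started = CompletableDeferred<Unit>()
    val written = CompletableDeferred<Unit>()
    var calls = 0

    // A concurrent writer that changes the value while the update is suspended.
    launch {
        started.await()
        cell.value = 100
        written.complete(Unit)
    }

    val result = cell.updateAndGet { old ->
        calls++
        started.complete(Unit) // no-op on the second attempt
        written.await()        // suspends on the first attempt only
        (old ?: 0) + 1
    }
    result to calls
}

fun main() {
    println(stateFlowRetryDemo()) // (101, 2)
}
```

The first attempt computes 1 from the stale null value, the compare-and-set fails because the writer stored 100 in the meantime, and the lambda runs a second time. This is fine for cheap, side-effect-free updates, but if the update must run exactly once per call, the Mutex-based version is the safer choice.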
domfox
10/25/2023, 12:28 PM
Klitos Kyriacou
10/25/2023, 12:29 PM
domfox
10/25/2023, 12:29 PM
domfox
10/25/2023, 12:29 PM