https://kotlinlang.org logo
Title
q

Quantum64

04/07/2022, 2:38 AM
Hi everyone. I'm trying to create a key-based mutex for the JVM. Can anyone tell me if this looks like a correct and safe implementation?
class KeyedMutex<T> {
    @PublishedApi
    internal val mutexes = ConcurrentHashMap<T, Container>()

    data class Container(
        val mutex: Mutex,
        val holders: Int
    )

    suspend inline fun <U> use(key: T, closure: () -> U): U {
        val result = mutexes
            .compute(key) { _, container ->
                container?.copy(holders = container.holders + 1)
                    ?: Container(Mutex(), holders = 1)
            }!! // always associates the key
            .mutex
            .withLock(action = closure)

        mutexes.computeIfPresent(key) { _, container ->
            if (container.holders > 1) container.copy(holders = container.holders - 1)
            else null
        }

        return result
    }
}
:thread-please: 2
i

ildar.i [Android]

04/07/2022, 3:54 AM
maybe Roman Elizarov’s article will help you: https://elizarov.medium.com/phantom-of-the-coroutine-afc63b03a131
q

Quantum64

04/07/2022, 5:02 AM
I'm not sure how that helps or mitigates the need for a striped mutex. I'm asking if my above implementation is correct. What are you referring to exactly?
e

ephemient

04/07/2022, 10:43 AM
how much concurrency do you really need? e.g. Java's ConcurrentHashMap uses a concurrency level of 16 by default, and you could likewise, along the lines of
class KeyedMutex<in T>(concurrencyLevel: Int = 16) {
    init {
        require(concurrencyLevel > 0)
    }

    private val mutexes = Array(concurrencyLevel) { Mutex() }

    @PublishedApi
    internal fun get(key: T): Mutex = mutexes[key.hashCode().mod(mutexes.size)]
}

suspend fun <T, R> KeyedMutex<T>.use(key: T, owner: Any? = null, block: suspend () -> R): R =
    get(key).withLock(owner) { block() }
q

Quantum64

04/08/2022, 1:40 AM
Just because the concurrenthashmap only expects 16 threads to access it at once doesn't mean this keyed mutex can only give out 16 locks. However, with that array implementation, that would be the case. The use case here could be reading and caching from a key/value store. If multiple coroutines try to read the same key at the same time, we wouldn't want to waste time querying the database twice for the same key when the result is going to be cached anyway. Therefore, the first coroutine acquires a mutex for that key, does the database operation, caches the result, then releases the mutex. The second coroutine is then able to acquire the same mutex but it will see the fetched value in the cache, preventing an unnecessary database query. Additionally, my approach only stores mutexes that are actually being used and drops them immediately after rather than constantly storing 16 mutexes.
e

ephemient

04/09/2022, 12:40 AM
1. I suggested that because it's easier to prove correct, not because it's more concurrent. 2. in general I would expect that the contention introduced by the extra locking required to keep track of the waiter counts to be worse as the system scales. of course you want to avoid allocating an unbounded number of mutexes, but there's no need to reason to drop every single one immediately
if you really need the performance, there are high-quality libraries such as https://github.com/ben-manes/caffeine (while it doesn't natively integrate with kotlinx.coroutines, it does support async cache loaders so can be bridged reasonably well)