Alex Vasilkov
02/14/2021, 7:25 AM
Mutex, but it is very slow: if you run the examples from https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html you'll notice that Mutex is roughly 100 times slower than atomics.
I found a way to use MutableStateFlow<Boolean> for synchronizing suspending calls, and it seems to be 5-10 times faster than Mutex. I'm not sure if there are any hidden gotchas with this approach, though (see more in the thread).
```kotlin
private val lock = MutableStateFlow(false)

suspend fun criticalSection() = lock.withLock { … }

private suspend fun <R> MutableStateFlow<Boolean>.withLock(action: suspend () -> R): R {
    while (true) {
        if (compareAndSet(expect = false, update = true)) break // Acquiring the lock
        first { !it } // Suspending until lock is released
    }
    try {
        return action()
    } finally {
        value = false // Releasing the lock
    }
}
```
```kotlin
package example

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.system.measureTimeMillis
import kotlin.test.Test

class Test {
    private val dispatcher = Dispatchers.Default

    @Test
    fun testMutex() = runBlocking(dispatcher) {
        val mutex = Mutex()
        var counter = 0
        massiveRun {
            mutex.withLock { counter++ }
        }
        println("Counter (mutex) = $counter")
    }

    @Test
    fun testFlow() = runBlocking(dispatcher) {
        val lock = MutableStateFlow(false)
        var counter = 0
        massiveRun {
            lock.withLock { counter++ }
        }
        println("Counter (flow) = $counter")
    }

    @Test
    fun testSemaphore() = runBlocking(dispatcher) {
        val semaphore = Semaphore(1)
        var counter = 0
        massiveRun {
            semaphore.withPermit { counter++ }
        }
        println("Counter (semaphore) = $counter")
    }

    @Test
    fun testLock() = runBlocking(dispatcher) {
        val lock = ReentrantLock()
        var counter = 0
        massiveRun {
            lock.withLock { counter++ }
        }
        println("Counter (lock) = $counter")
    }

    @Test
    fun testAtomic() = runBlocking(dispatcher) {
        val counter = AtomicInteger(0)
        massiveRun {
            counter.incrementAndGet()
        }
        println("Counter (atomic) = ${counter.get()}")
    }

    private suspend fun massiveRun(n: Int = 100, k: Int = 1000, action: suspend () -> Unit) {
        val time = measureTimeMillis {
            coroutineScope { // scope for coroutines
                repeat(n) {
                    launch {
                        repeat(k) { action() }
                    }
                }
            }
        }
        println("Completed ${n * k} actions in $time ms")
    }

    private suspend fun <R> MutableStateFlow<Boolean>.withLock(action: suspend () -> R): R {
        while (true) {
            if (compareAndSet(expect = false, update = true)) break // Acquiring the lock
            first { !it } // Suspending until lock is released
        }
        try {
            return action()
        } finally {
            value = false // Releasing the lock
        }
    }
}
```
Completed 100000 actions in 816 ms
Counter (mutex) = 100000
Completed 100000 actions in 134 ms
Counter (flow) = 100000
Completed 100000 actions in 17 ms
Counter (lock) = 100000
Completed 100000 actions in 782 ms
Counter (semaphore) = 100000
Completed 100000 actions in 21 ms
Counter (atomic) = 100000
irus
02/14/2021, 7:50 AM
```kotlin
@Test
fun testMutex() = runBlocking(dispatcher) {
    val mutex = Mutex()
    var counter = 0
    massiveRun {
        // Spin on tryLock() instead of suspending inside withLock
        while (true) {
            if (mutex.tryLock()) {
                counter++
                mutex.unlock()
                break
            }
        }
    }
    println("Counter (mutex) = $counter")
}
```
Completed 100000 actions in 61 ms
Counter (mutex) = 100000
Completed 100000 actions in 512 ms
Counter (flow) = 100000
Completed 100000 actions in 43 ms
Counter (lock) = 100000
Completed 100000 actions in 27 ms
Counter (atomic) = 100000
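The tryLock loop above never suspends while it waits. That is cheap only because the critical section (counter++) is tiny. The following sketch shows what happens when the lock is instead held across a suspension point. It uses a hypothetical helper, countFailedTryLocks, that keeps a Mutex locked for holdMillis via delay() and counts how often a spinning coroutine fails to acquire it. The yield() in the spin is an addition: without it, on a single-threaded dispatcher the spinner would never let the delayed holder resume.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.yield

// Hypothetical helper: counts failed tryLock() attempts while another
// coroutine holds the mutex across a suspension point (delay).
suspend fun countFailedTryLocks(holdMillis: Long): Int {
    val mutex = Mutex(locked = true) // starts locked: the holder owns it from the beginning
    var failedAttempts = 0 // only touched by the spinner coroutine
    coroutineScope {
        // Holder: keeps the lock while suspended in delay(), then releases it.
        launch {
            delay(holdMillis)
            mutex.unlock()
        }
        // Spinner: the tryLock loop from the benchmark.
        launch {
            while (true) {
                if (mutex.tryLock()) {
                    mutex.unlock()
                    break
                }
                failedAttempts++
                yield() // added so a single-threaded dispatcher can still resume the holder
            }
        }
    }
    return failedAttempts
}
```

Called from runBlocking, countFailedTryLocks(10L) returns a positive count, typically a large one, because the spinner retries on every pass through the event loop until the holder releases the lock.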
Alex Vasilkov
02/14/2021, 7:58 AM
while loop until the lock is released, effectively wasting CPU cycles. If you use delay(10L) instead of counter++, you will get a lot of unnecessary loop iterations.
irus
02/14/2021, 8:03 AM
Alex Vasilkov
02/14/2021, 8:07 AM
irus
02/14/2021, 8:10 AM
```kotlin
// Additionally needs: java.time.Duration, kotlin.system.measureNanoTime, kotlinx.coroutines.delay
private suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureNanoTime {
        coroutineScope { // scope for coroutines
            repeat(n) { one ->
                launch(dispatcher) {
                    repeat(k) {
                        delay(one.toLong()) // coroutine number `one` waits `one` ms before every action
                        action()
                    }
                }
            }
        }
    }
    println("Completed ${n * k} actions in ${Duration.ofNanos(time).toMillis()} ms")
}
```
Now the mutex is on par with the others:
Completed 100000 actions in 99587 ms
Counter (mutex) = 100000
Completed 100000 actions in 99650 ms
Counter (flow) = 100000
Completed 100000 actions in 99611 ms
Counter (lock) = 100000
Completed 100000 actions in 99549 ms
Counter (atomic) = 100000
Alex Vasilkov
02/14/2021, 11:18 AM
while loop for a potentially long time.
I'm not sure CPU graph measurements are reliable at all. Also, which is better: running for 900ms at 20% or for 200ms at 30%? I can hardly see how this can be used for benchmarking, sorry.
The last example also looks wrong: basically, at one=99 you are scheduling a very simple computation (counter++) to run after 99ms, 198ms, … 99_000 ms. So you are measuring the delays here, not the real logic.
irus
02/14/2021, 12:55 PM
> I can hardly agree that Mutex.tryLock should be used in real life

It depends on the actual task you're trying to solve; I can see how it can be useful in some scenarios. Also, a mutex shouldn't be used for accessing something as simple as a counter anyway. I would probably even use a channel (I actually use a lot of them) for processing some shared state.
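irus's alternative is to keep shared state behind a channel. It can be sketched as a single consumer coroutine that owns the counter while producers only send increments. This is an illustration under assumptions: channelCounter is a hypothetical name, and the code of the "channel" variant irus benchmarks later in the thread is not shown.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Hypothetical sketch: only the consumer coroutine touches `counter`,
// so no lock is needed; producers communicate by sending deltas.
suspend fun channelCounter(producers: Int, incrementsEach: Int): Int {
    val deltas = Channel<Int>(Channel.UNLIMITED)
    var counter = 0
    coroutineScope {
        // Single consumer: applies updates one at a time, in arrival order.
        val consumer = launch {
            for (delta in deltas) counter += delta
        }
        // Producers: this inner scope returns once every producer has finished sending.
        coroutineScope {
            repeat(producers) {
                launch { repeat(incrementsEach) { deltas.send(1) } }
            }
        }
        deltas.close() // the consumer's for-loop ends after draining the queued updates
        consumer.join()
    }
    return counter
}
```

The design trades the lock for a queue. Contention moves into the channel, and the state itself is only ever mutated by one coroutine.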
> I'm not sure CPU graph measurements are reliable at all.

They are.
> Also, which is better: running for 900ms at 20% or for 200ms at 30%? I can hardly see how this can be used for benchmarking, sorry.

tryLock runs faster and consumes less CPU for the benchmark use case.
> The last example also looks wrong: basically, at one=99 you are scheduling a very simple computation (counter++) to run after 99ms, 198ms, … 99_000 ms. So you are measuring the delays here, not the real logic.

Of course, but the original benchmark doesn't answer how a real application would behave either: the problem you see is high contention of many coroutines on a single suspension point, and you may not see that in a real application.
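Alex's point about the delay-based benchmark can be checked with simple arithmetic. In irus's massiveRun, the coroutine with index one waits one ms before each of its k actions. The slowest coroutine (one = n - 1 = 99) therefore needs at least 99 × 1000 = 99,000 ms on its own. That matches the roughly 99.5-99.7 s reported for every variant, which leaves little room for the lock's own cost to show up. A tiny sketch of that lower bound follows; both helper names are hypothetical.

```kotlin
// Hypothetical helper: lower bound on wall-clock time for the delay-based massiveRun.
// Coroutine `one` (0 until n) sleeps `one` ms before each of its k actions, so the
// slowest coroutine, one = n - 1, spends at least (n - 1) * k ms in delay().
fun delayLowerBoundMillis(n: Int, k: Int): Long = (n - 1).toLong() * k

// Whatever remains of a measured run once the unavoidable delay time is subtracted.
fun overheadMillis(measuredMillis: Long, n: Int, k: Int): Long =
    measuredMillis - delayLowerBoundMillis(n, k)
```

For the mutex run above, overheadMillis(99_587L, 100, 1000) leaves 587 ms for everything that is not delay().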
Dominaezzz
02/14/2021, 12:55 PM
Semaphore?
Alex Vasilkov
02/14/2021, 1:03 PM
Semaphore suggestion; I didn't know it could be used with suspend functions. But it turns out its performance is on par with Mutex (just a bit faster), and still noticeably slower than MutableStateFlow (I updated the test class and results above).
louiscad
02/14/2021, 1:10 PM
withLock for the Mutex case?
Alex Vasilkov
02/14/2021, 1:10 PM
Semaphore with permits = 1 is essentially a Mutex.)
Dominaezzz
02/14/2021, 1:10 PM
> I didn't know it could be used with suspend functions.

Just to make sure, I'm talking about the kotlinx.coroutines one, not the Java one.
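As Alex notes, the kotlinx.coroutines Semaphore (kotlinx.coroutines.sync.Semaphore, not java.util.concurrent.Semaphore) can stand in for a Mutex. With a single permit, withPermit admits one coroutine at a time and suspends the others. The following is a minimal sketch: semaphoreCounter is a hypothetical name, and its body mirrors testSemaphore from the test class.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// One permit means at most one coroutine is inside withPermit at any time,
// so it guards `counter` the way Mutex.withLock would (but without an owner argument).
suspend fun semaphoreCounter(coroutines: Int, incrementsEach: Int): Int {
    val semaphore = Semaphore(permits = 1)
    var counter = 0
    coroutineScope {
        repeat(coroutines) {
            launch {
                repeat(incrementsEach) {
                    semaphore.withPermit { counter++ } // suspends, does not block, while waiting
                }
            }
        }
    }
    return counter
}
```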
irus
02/14/2021, 1:50 PM
louiscad
02/14/2021, 2:11 PM
tryLock instead of withLock, right? Or maybe the latter uses the former and it's the same (I'm on mobile, hard to check right now).
irus
02/14/2021, 2:22 PM
Zach Klippenstein (he/him) [MOD]
02/14/2021, 4:14 PM
Alex Vasilkov
02/14/2021, 5:17 PM
irus
02/15/2021, 9:27 AM
withLock goes through the slow path, where a lock-free linked list is created (), with callbacks to resume and cancel the coroutine. The extra objects and the linked list make the whole thing slower, but not slow for a real application.
This can be proved with the same test, using a dispatcher that works on a single thread: Executors.newFixedThreadPool(1).asCoroutineDispatcher()
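The following is a minimal sketch of that single-thread setup, reusing the mutex counter from the test class; mutexCounterOnSingleThread is a hypothetical name. The message shows only the dispatcher one-liner. Closing the dispatcher afterwards (via use, which shuts down the underlying executor) is an addition here.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.system.measureTimeMillis

fun mutexCounterOnSingleThread(n: Int = 100, k: Int = 1000): Int =
    // Exactly one worker thread; use {} closes the dispatcher when done.
    Executors.newFixedThreadPool(1).asCoroutineDispatcher().use { dispatcher ->
        runBlocking(dispatcher) {
            val mutex = Mutex()
            var counter = 0
            val time = measureTimeMillis {
                coroutineScope { // wait for all n coroutines before reading counter
                    repeat(n) {
                        launch { repeat(k) { mutex.withLock { counter++ } } }
                    }
                }
            }
            println("Completed ${n * k} actions in $time ms (single thread)")
            counter
        }
    }
```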
Results (best of 10 runs):
Completed 100000 actions in 4 ms
Counter (mutex) = 1000000
Completed 100000 actions in 16 ms
Counter (flow) = 1000000
Completed 100000 actions in 4 ms
Counter (lock) = 1000000
Completed 100000 actions in 10 ms
Counter (channel) = 1000000
Completed 100000 actions in 2 ms
Counter (atomic) = 1000000
I'm not that familiar with Flow, so I can't say why it's faster in the high-contention case, but I'll definitely look inside the implementation to check this out.
Alex Vasilkov
02/15/2021, 9:49 AM
irus
02/15/2021, 9:50 AM
> and they are competing for the lock again

No, of course not: in this case the first one from the linked list is taken.
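The StateFlow lock, by contrast, has no queue. Any waiter suspended in first { !it } may be resumed when the value becomes false, and it then has to win compareAndSet again. The sketch below instruments that lock to count every pass through while(true), similar to the count Alex reports later in the thread. CountingFlowLock and countLockLoops are hypothetical names. Loops minus acquisitions gives the number of retries.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield

// Instrumented copy of the StateFlow lock from the thread.
class CountingFlowLock {
    private val locked = MutableStateFlow(false)
    val loops = AtomicLong(0L) // every pass through while(true), from all coroutines

    suspend fun <R> withLock(action: suspend () -> R): R {
        while (true) {
            loops.incrementAndGet()
            if (locked.compareAndSet(expect = false, update = true)) break // acquired
            locked.first { !it } // wait until the lock looks free, then race again
        }
        try {
            return action()
        } finally {
            locked.value = false // release: suspended waiters may now be resumed
        }
    }
}

// Returns the final counter and the total number of loop passes.
suspend fun countLockLoops(n: Int, k: Int): Pair<Int, Long> {
    val lock = CountingFlowLock()
    var counter = 0
    coroutineScope {
        repeat(n) {
            launch { repeat(k) { lock.withLock { yield(); counter++ } } }
        }
    }
    return counter to lock.loops.get()
}
```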
Alex Vasilkov
02/15/2021, 10:02 AM
irus
02/15/2021, 10:09 AM
```kotlin
// yield() here is kotlinx.coroutines.yield
@Test
fun testMutex() = runBlocking(dispatcher) {
    val mutex = Mutex()
    var counter = 0
    massiveRun {
        mutex.withLock {
            yield()
            counter++
        }
    }
    println("Counter (mutex) = $counter")
}

@Test
fun testFlow() = runBlocking(dispatcher) {
    val lock = MutableStateFlow(false)
    var counter = 0
    massiveRun {
        lock.withLock {
            yield()
            counter++
        }
    }
    println("Counter (flow) = $counter")
}

@Test
fun testLock() = runBlocking(dispatcher) {
    val lock = ReentrantLock()
    var counter = 0
    massiveRun {
        lock.lock()
        try {
            // Caution: suspending while holding a ReentrantLock is unsafe. The coroutine may
            // resume on another thread (unlock() then throws IllegalMonitorStateException),
            // and other coroutines running on the owning thread can re-enter the lock.
            yield()
            counter++
        } finally {
            lock.unlock()
        }
    }
    println("Counter (lock) = $counter")
}
```
Completed 100000 actions in 1449 ms
Counter (mutex) = 1000000
Completed 100000 actions in 5166 ms
Counter (flow) = 1000000
Completed 100000 actions in 172 ms
Counter (atomic) = 1000000
Alex Vasilkov
02/15/2021, 10:25 AM
yield() drives the StateFlow lock crazy, that's for sure. Another observation: if you use delay(1L) instead of yield(), the difference between Mutex and StateFlow becomes insignificant.
irus
02/15/2021, 11:09 AM
wasyl
02/15/2021, 11:11 AM
irus
02/15/2021, 11:12 AM
Alex Vasilkov
02/15/2021, 12:24 PM
Thread.currentThread().name) and got strange results.
When using `Dispatchers.Default`:
Completed 100000 actions in 966 ms
Counter (mutex) = 100000 / used 8 threads and 582 coroutines
Completed 100000 actions in 235 ms
Counter (flow) = 100000 / used 16 threads and 1557 coroutines
When using `newFixedThreadPoolContext(16, "test")`:
Completed 100000 actions in 947 ms
Counter (mutex) = 100000 / used 16 threads and 1585 coroutines
Completed 100000 actions in 100 ms
Counter (flow) = 100000 / used 16 threads and 1260 coroutines
In the first case Mutex used significantly fewer threads/coroutines, but in the second case it used even more resources than StateFlow.
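Alex's exact counting code isn't shown. The sketch below shows one way to record which threads ran the critical section; threadsUsedByMutex is a hypothetical name, and only threads are counted. One detail worth knowing: kotlinx.coroutines debug mode is also switched on when the JVM runs with -ea (as test runners commonly do). In that mode, thread names carry a " @coroutine#N" suffix, so the sketch strips it before counting.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Records the (debug-suffix-free) name of every thread that executed the critical section.
suspend fun threadsUsedByMutex(dispatcher: CoroutineDispatcher, n: Int, k: Int): Set<String> {
    val threads = ConcurrentHashMap.newKeySet<String>() // thread-safe set
    val mutex = Mutex()
    withContext(dispatcher) { // returns only after all launched children complete
        repeat(n) {
            launch {
                repeat(k) {
                    mutex.withLock {
                        threads += Thread.currentThread().name.substringBefore(" @")
                    }
                }
            }
        }
    }
    return threads
}
```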
Also, I tracked how many times the while(true) loop ran in the StateFlow.withLock method (across all 100_000 executions) and got about 110_000 for the fixed thread pool and about 180_000 for the Default dispatcher. In my understanding, this means there weren't too many unnecessary coroutine resumes.
irus
02/15/2021, 12:26 PM