https://kotlinlang.org logo
#coroutines
Title
# coroutines
a

Alex Vasilkov

02/14/2021, 7:25 AM
Mutex vs StateFlow (sounds strange, but please read on) Suppose you need to synchronize few suspending calls (e.g. DB reads and DB writes) so that only one action is running at a time. You cannot use java synchronization or reentrant lock as it does not correctly work with suspension (https://blog.danlew.net/2020/01/28/coroutines-and-java-synchronization-dont-mix/). It seems like the only suitable option is to use
Mutex
, but it is very slow, if you’ll run the examples from https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html you’ll notice that
Mutex
is like 100 times slower than atomics. I found a way to use
MutableStateFlow<Boolean>
for suspending calls synchronization and it seems to be 5-10 times faster than
Mutex
, I’m not sure if there are any hidden gotchas with this approach though (see more in the thread).
1
👀 1
The code is quite simple:
Copy code
private val lock = MutableStateFlow(false)

suspend fun criticalSection() = lock.withLock { … }

private suspend fun <R> MutableStateFlow<Boolean>.withLock(action: suspend () -> R): R {
    while (true) {
        if (compareAndSet(expect = false, update = true)) break // Acquiring the lock
        first { !it } // Suspending until lock is released
    }

    try {
        return action()
    } finally {
        value = false // Releasing the lock
    }
}
Here is full test class comparing Mutex, StateFlow, Semaphore, reentrant lock and atomics:
Copy code
package example

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.system.measureTimeMillis
import kotlin.test.Test

class Test {

    private val dispatcher = Dispatchers.Default

    @Test
    fun testMutex() = runBlocking(dispatcher) {
        val mutex = Mutex()
        var counter = 0

        massiveRun {
            mutex.withLock { counter++ }
        }
        println("Counter (mutex) = $counter")
    }

    @Test
    fun testFlow() = runBlocking(dispatcher) {
        val lock = MutableStateFlow(false)
        var counter = 0

        massiveRun {
            lock.withLock { counter++ }
        }
        println("Counter (flow) = $counter")
    }

    @Test
    fun testSemaphore() = runBlocking(dispatcher) {
        val semaphore = Semaphore(1)
        var counter = 0

        massiveRun {
            semaphore.withPermit { counter++ }
        }
        println("Counter (semaphore) = $counter")
    }

    @Test
    fun testLock() = runBlocking(dispatcher) {
        val lock = ReentrantLock()
        var counter = 0

        massiveRun {
            lock.withLock { counter++ }
        }
        println("Counter (lock) = $counter")
    }

    @Test
    fun testAtomic() = runBlocking(dispatcher) {
        val counter = AtomicInteger(0)

        massiveRun {
            counter.incrementAndGet()
        }
        println("Counter (atomic) = ${counter.get()}")
    }


    private suspend fun massiveRun(n: Int = 100, k: Int = 1000, action: suspend () -> Unit) {
        val time = measureTimeMillis {
            coroutineScope { // scope for coroutines
                repeat(n) {
                    launch {
                        repeat(k) { action() }
                    }
                }
            }
        }
        println("Completed ${n * k} actions in $time ms")
    }


    private suspend fun <R> MutableStateFlow<Boolean>.withLock(action: suspend () -> R): R {
        while (true) {
            if (compareAndSet(expect = false, update = true)) break // Acquiring the lock
            first { !it } // Suspending until lock is released
        }

        try {
            return action()
        } finally {
            value = false // Releasing the lock
        }
    }

}
Sample result:
Copy code
Completed 100000 actions in 816 ms
Counter (mutex) = 100000

Completed 100000 actions in 134 ms
Counter (flow) = 100000

Completed 100000 actions in 17 ms
Counter (lock) = 100000

Completed 100000 actions in 782 ms
Counter (semaphore) = 100000

Completed 100000 actions in 21 ms
Counter (atomic) = 100000
Can anybody explain why Mutex (designed for that exact usage) is much slower than StateFlow approach above? Can I use this StateFlow hack or should I prefer Mutex anyway?
i

irus

02/14/2021, 7:50 AM
Copy code
@Test
    fun testMutex() = runBlocking(dispatcher) {
        val mutex = Mutex()
        var counter = 0
        massiveRun {
            while(true) {
                if (mutex.tryLock()) {
                    counter++
                    mutex.unlock()
                    break
                }
            }
        }
        println("Counter (mutex) = $counter")
    }
Copy code
Completed 100000 actions in 61 ms
Counter (mutex) = 100000
Completed 100000 actions in 512 ms
Counter (flow) = 100000
Completed 100000 actions in 43 ms
Counter (lock) = 100000
Completed 100000 actions in 27 ms
Counter (atomic) = 100000
a

Alex Vasilkov

02/14/2021, 7:58 AM
@irus I see your point, but your code does not suspend until the action is finished, instead it just runs
while
loop until lock is released, effectively wasting CPU cycles. If you’ll use
delay(10L)
instead of
counter++
you will have a lot of unnecessary loop cycles.
i

irus

02/14/2021, 8:03 AM
Yes, but you want better number, I gave them 🙂
Try to do at least multiple runs (add repeat to massiveRun) and you'll see that performance of mutex is changes
message has been deleted
a

Alex Vasilkov

02/14/2021, 8:07 AM
I don't just need better numbers in synthetic tests, I need a correctly working code as well 🙂 You have different numbers, but they are still much bigger than for StateFlow, aren't they?
i

irus

02/14/2021, 8:10 AM
message has been deleted
Right, because flow eat much more CPU
mutex, flow, lock, atomic
tryLock mutex, flow, lock, atomic
In synthetic test CPU consumption for while(true) tryLock is less than for flow variant
Another one, I added delay into hottest part of the loop:
Copy code
private suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureNanoTime {
        coroutineScope { // scope for coroutines
            repeat(n) { one ->
                launch(dispatcher) {
                    repeat(k) {
                        delay(one.toLong())
                        action()
                    }
                }
            }
        }
    }
    println("Completed ${n * k} actions in ${Duration.ofNanos(time).toMillis()} ms")
}
Now mutex on par with others: Completed 100000 actions in 99587 ms Counter (mutex) = 100000 Completed 100000 actions in 99650 ms Counter (flow) = 100000 Completed 100000 actions in 99611 ms Counter (lock) = 100000 Completed 100000 actions in 99549 ms Counter (atomic) = 100000
atomic/lock works faster because in this cases you just run a bunch of task on top of thread pool.
mutex/flow slower because coroutines have to suspend/unsuspend to achieve work
a

Alex Vasilkov

02/14/2021, 11:18 AM
Well, I can hardly agree that Mutex.tryLock should be used in real life, it’s just plain wrong to waste entire thread using
while
loop for potentially long time. I’m not sure if CPU graph measurements are any reliable. Also what is better: run for 900ms at 20% or for 200ms at 30%? I can hardly see how this can be used for benchmarking, sorry. The last example also looks wrong, basically at
one=99
you are scheduling a very simple computation (
counter++
) to run after 99ms, 198ms, … 99_000 ms. So you are measuring the delays here, not the real logic.
👀 1
i

irus

02/14/2021, 12:55 PM
I can hardly agree that Mutex.tryLock should be used in real life
It depends on actual task you trying to solve, I can see how it can be useful in some scenarios. And ss well as mutex shouldn't be used for accessing something simple like counter. I even probably will use channel (I actually use a lot of them) for processing some shared state.
I’m not sure if CPU graph measurements are any reliable.
They are
Also what is better: run for 900ms at 20% or for 200ms at 30%? I can hardly see how this can be used for benchmarking, sorry.
tryLock running faster and less consume cpu for benchmark use-case
The last example also looks wrong, basically at
one=99
you are scheduling a very simple computation (
counter++
) to run after 99ms, 198ms, … 99_000 ms. So you are measuring the delays here, not the real logic. (edited)
Of course, but original benchmark also doesn't answer how real application would behave, because problem you see – high contention of multiple coroutines on single suspension point, and in real application you may not see it
d

Dominaezzz

02/14/2021, 12:55 PM
Have you tried
Semaphore
?
a

Alex Vasilkov

02/14/2021, 1:03 PM
@Dominaezzz Thanks for
Semaphore
suggestion, I didn’t know it can be used with suspend functions. But it turns out it’s performance is on par with
Mutex
(just a bit faster), but still noticeably slower that
MutableStateFlow
(I updated the test class and results above)
l

louiscad

02/14/2021, 1:10 PM
@irus Can you do your benchmark using exclusively
withLock
for the
Mutex
case?
a

Alex Vasilkov

02/14/2021, 1:10 PM
(BTW, from Kotlin docs:
Semaphore with permits = 1 is essentially a Mutex.
)
d

Dominaezzz

02/14/2021, 1:10 PM
I didn’t know it can be used with suspend functions.
Just to make sure, I'm talking about the kotlinx.coroutines one, not the java one.
Ah, nvm
i

irus

02/14/2021, 1:50 PM
@louiscad what do you mean?
l

louiscad

02/14/2021, 2:11 PM
@irus You're using
tryLock
instead of
withLock
, right? Or maybe the latter uses the former and it's the same (on mobile, hard to check right now)
z

Zach Klippenstein (he/him) [MOD]

02/14/2021, 4:14 PM
I’m impressed that your database is so fast that your choice of synchronization primitive even matters.
2
💯 3
☝🏼 2
a

Alex Vasilkov

02/14/2021, 5:17 PM
Haha, that’s totally true 🙂 I’m not really planing to call the DB 1000 times per second from 100 coroutines. In my case it is closer to 1-2 potential calls from a few coroutines, but it has a high chance of being called concurrently.
I guess there can still be other use-cases where Mutex performance can be important though. And it still looks strange that simple StateFlow-based solution significantly outperforms native Mutex implementation when measured at scale. It’s interesting to understand why it happens, probably there is something more about the Mutex.
👍 1
i

irus

02/15/2021, 9:27 AM
Because of high contention you introduced in test,
withLock
goes through slow path where lock-free linked list created (

https://youtu.be/W2dOOBN1OQI?t=882

), with callback to resume and cancel coroutine. Extra objects and linked list makes entire thing slower, but not slow for real application. This can be prove by the same test, but dispatcher that works on single thread:
Executors.newFixedThreadPool(1).asCoroutineDispatcher()
Results (best of 10 runs):
Copy code
Completed 100000 actions in 4 ms
Counter (mutex) = 1000000

Completed 100000 actions in 16 ms
Counter (flow) = 1000000

Completed 100000 actions in 4 ms
Counter (lock) = 1000000

Completed 100000 actions in 10 ms
Counter (channel) = 1000000


Completed 100000 actions in 2 ms
Counter (atomic) = 1000000
I'm not so aware of Flow, so can't say why it faster for high contention case, but definitely will look inside implementation to check this out
a

Alex Vasilkov

02/15/2021, 9:49 AM
Thanks. The reason is definitely with slow path, if using single thread then you eliminate the need of the synchronisation and all tests become equally fast. The test is built to check the actual synchronizatoin logic, not the fast paths. I was trying to look through the code (Mutex and StateFlow) but it is not trivial at all 🙂 My best guess so far is that Mutex has to resume all waiting coroutines each time the Mutex is unlocked, and they are competing for the lock again. With StateFlow not all collectors are guaranteed to receive intermediate values (because StateFlow’s DROP_OLDEST / CONFLATED behaviour) thus not all of them are resumed. I have no idea how close is it to the real reason though.
i

irus

02/15/2021, 9:50 AM
and they are competing for the lock again
No of course, first from linked list is taken for this case.
a

Alex Vasilkov

02/15/2021, 10:02 AM
Hm, indeed, your are right, it should only resume one waiting coroutine at a time
i

irus

02/15/2021, 10:09 AM
One interesting observation: if you add some suspend operation to critical section, mutex becomes much faster than flow, and ReentrantLock deadlocks (this obvious, but kinda funny anyway 🙂)
Copy code
@Test
fun testMutex() = runBlocking(dispatcher) {
    val mutex = Mutex()
    var counter = 0
    massiveRun {
        mutex.withLock {
            yield()
            counter++
        }
    }
    println("Counter (mutex) = $counter")
}

@Test
fun testFlow() = runBlocking(dispatcher) {
    val lock = MutableStateFlow(false)
    var counter = 0
    massiveRun {
        lock.withLock {
            yield()
            counter++
        }
    }
    println("Counter (flow) = $counter")
}

@Test
fun testLock() = runBlocking(dispatcher) {
    val lock = ReentrantLock()
    var counter = 0
    massiveRun {
        lock.lock()
        try {
            yield()
            counter++
        } finally {
            lock.unlock()
        }
    }
    println("Counter (lock) = $counter")
}
Copy code
Completed 100000 actions in 1449 ms
Counter (mutex) = 1000000

Completed 100000 actions in 5166 ms
Counter (flow) = 1000000

Completed 100000 actions in 172 ms
Counter (atomic) = 1000000
a

Alex Vasilkov

02/15/2021, 10:25 AM
Yeah,
yield()
drives StateFlow lock crazy, that’s for sure. Another observation is that if you’ll use
delay(1L)
instead of
yield()
the difference between Mutex and StateFlow becomes very insignificant. There is still quite a big penalty of synchronization, but StateFlow does not behave any better.
i

irus

02/15/2021, 11:09 AM
I checked how MutableStateFlow + custom lock works, and my observation is - it works "faster" because it occupies all available threads. Each emit unsuspend all coroutines waiting for collect, and for some extend it adds "parallelism" to process, but mutex works "sequentially" so it takes more time to process entire queue. I added CPU, left - mutex, right - flow. You can see that flow fill up pool (all running, instead of waiting as for mutex), and because for CAS syncronized is used, it even sometimes blocks some threads. So because of this inefficiency, I'll suggest to use Mutex or Channel
👌 1
w

wasyl

02/15/2021, 11:11 AM
Nice investigation Ruslan 🙂 Btw what’s the software you used to observe the threads here?
i

irus

02/15/2021, 11:12 AM
Thanks, it's yourkit
🙏 1
a

Alex Vasilkov

02/15/2021, 12:24 PM
@irus Thanks for the interesting research! Can you explain the graph, what does yellow / green flames mean? I added coroutines and threads tracking to my test (based on
Thread.currentThread().name
) and got strange results. When using `Dispatchers.Default`:
Copy code
Completed 100000 actions in 966 ms
Counter (mutex) = 100000 / used 8 threads and 582 coroutines

Completed 100000 actions in 235 ms
Counter (flow) = 100000 / used 16 threads and 1557 coroutines
When using `newFixedThreadPoolContext(16, "test")`:
Copy code
Completed 100000 actions in 947 ms
Counter (mutex) = 100000 / used 16 threads and 1585 coroutines

Completed 100000 actions in 100 ms
Counter (flow) = 100000 / used 16 threads and 1260 coroutines
In first case Mutex used significantly less threads / coroutines but in the second case it used even more resources than StateFlow. Also, I tracked how many times
while(true)
loop were running in
StateFlow.withLock
method (across all 100_000 executions) and got about 110_000 for fixed thread pool and about 180_000 for Default dispatcher. In my understanding it means that there wasn’t too many unnecessary coroutine resumes.
i

irus

02/15/2021, 12:26 PM
green - thread in running state yellow - waiting (most probably unsafe.park, waiting for work to be assigned)
Because of such number of coroutines trying to access single point (flow/mutex) it's probably not like "we just registered 100000 subscribes, and start to unsuspend them", it's more likely continuous process, and till very last cycles new coroutines will subscribe on flow. That's why adding extra yield/delay makes flow very slow – more simultaneous subscribes registered – more unnecessary work done.
6 Views