Alex Vasilkov

02/14/2021, 7:25 AM
Mutex vs StateFlow (sounds strange, but please read on). Suppose you need to synchronize a few suspending calls (e.g. DB reads and DB writes) so that only one action runs at a time. You cannot use Java synchronization or a reentrant lock, because they do not work correctly with suspension (https://blog.danlew.net/2020/01/28/coroutines-and-java-synchronization-dont-mix/). It seems the only suitable option is `Mutex`, but it is very slow: if you run the examples from https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html, you'll notice that `Mutex` is roughly 100 times slower than atomics. I found a way to use `MutableStateFlow<Boolean>` to synchronize suspending calls, and it seems to be 5-10 times faster than `Mutex`. I'm not sure whether there are any hidden gotchas with this approach, though (see more in the thread).
The code is quite simple:
private val lock = MutableStateFlow(false)

suspend fun criticalSection() = lock.withLock { … }

private suspend fun <R> MutableStateFlow<Boolean>.withLock(action: suspend () -> R): R {
    while (true) {
        if (compareAndSet(expect = false, update = true)) break // Acquiring the lock
        first { !it } // Suspending until lock is released
    }

    try {
        return action()
    } finally {
        value = false // Releasing the lock
    }
}
Here is the full test class comparing `Mutex`, `StateFlow`, `Semaphore`, `ReentrantLock` and atomics:
package example

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.system.measureTimeMillis
import kotlin.test.Test

class Test {

    private val dispatcher = Dispatchers.Default

    @Test
    fun testMutex() = runBlocking(dispatcher) {
        val mutex = Mutex()
        var counter = 0

        massiveRun {
            mutex.withLock { counter++ }
        }
        println("Counter (mutex) = $counter")
    }

    @Test
    fun testFlow() = runBlocking(dispatcher) {
        val lock = MutableStateFlow(false)
        var counter = 0

        massiveRun {
            lock.withLock { counter++ }
        }
        println("Counter (flow) = $counter")
    }

    @Test
    fun testSemaphore() = runBlocking(dispatcher) {
        val semaphore = Semaphore(1)
        var counter = 0

        massiveRun {
            semaphore.withPermit { counter++ }
        }
        println("Counter (semaphore) = $counter")
    }

    @Test
    fun testLock() = runBlocking(dispatcher) {
        val lock = ReentrantLock()
        var counter = 0

        massiveRun {
            lock.withLock { counter++ }
        }
        println("Counter (lock) = $counter")
    }

    @Test
    fun testAtomic() = runBlocking(dispatcher) {
        val counter = AtomicInteger(0)

        massiveRun {
            counter.incrementAndGet()
        }
        println("Counter (atomic) = ${counter.get()}")
    }


    private suspend fun massiveRun(n: Int = 100, k: Int = 1000, action: suspend () -> Unit) {
        val time = measureTimeMillis {
            coroutineScope { // scope for coroutines
                repeat(n) {
                    launch {
                        repeat(k) { action() }
                    }
                }
            }
        }
        println("Completed ${n * k} actions in $time ms")
    }


    private suspend fun <R> MutableStateFlow<Boolean>.withLock(action: suspend () -> R): R {
        while (true) {
            if (compareAndSet(expect = false, update = true)) break // Acquiring the lock
            first { !it } // Suspending until lock is released
        }

        try {
            return action()
        } finally {
            value = false // Releasing the lock
        }
    }

}
Sample result:
Completed 100000 actions in 816 ms
Counter (mutex) = 100000

Completed 100000 actions in 134 ms
Counter (flow) = 100000

Completed 100000 actions in 17 ms
Counter (lock) = 100000

Completed 100000 actions in 782 ms
Counter (semaphore) = 100000

Completed 100000 actions in 21 ms
Counter (atomic) = 100000
Can anybody explain why `Mutex` (designed for exactly this use case) is so much slower than the `StateFlow` approach above? Can I use this `StateFlow` hack, or should I prefer `Mutex` anyway?

irus

02/14/2021, 7:50 AM
@Test
fun testMutex() = runBlocking(dispatcher) {
    val mutex = Mutex()
    var counter = 0
    massiveRun {
        // Busy-wait: keep retrying tryLock() without ever suspending
        while (true) {
            if (mutex.tryLock()) {
                counter++
                mutex.unlock()
                break
            }
        }
    }
    println("Counter (mutex) = $counter")
}
Completed 100000 actions in 61 ms
Counter (mutex) = 100000
Completed 100000 actions in 512 ms
Counter (flow) = 100000
Completed 100000 actions in 43 ms
Counter (lock) = 100000
Completed 100000 actions in 27 ms
Counter (atomic) = 100000

Alex Vasilkov

02/14/2021, 7:58 AM
@irus I see your point, but your code does not suspend while it waits for the lock; instead it just spins in a `while` loop until the lock is released, effectively wasting CPU cycles. If you use `delay(10L)` instead of `counter++`, you will get a lot of unnecessary loop iterations.
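A minimal sketch of the alternative being argued about here (not code from the thread): a `tryLock()` loop that at least calls `yield()` between attempts. The names `withSpinLock` and `spinCounter` are hypothetical. Without the `yield()`, a waiter never suspends, so on a bounded dispatcher such as `Dispatchers.Default` the spinning coroutines can occupy every worker thread while the lock holder sits suspended in `delay`, unable to get a thread to resume on.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex

// Hypothetical helper: acquire via tryLock() in a loop, but yield() between
// attempts so the waiting coroutine hands its thread back to the dispatcher.
// Without the yield() this would be a hot spin that never suspends.
suspend fun <T> Mutex.withSpinLock(action: suspend () -> T): T {
    while (!tryLock()) {
        yield() // let other coroutines (including the lock holder) run
    }
    try {
        return action()
    } finally {
        unlock()
    }
}

// Hypothetical workload: the critical section suspends (delay), which is
// exactly the case where a hot spin would starve the lock holder.
fun spinCounter(coroutines: Int = 10, iterations: Int = 10): Int =
    runBlocking(Dispatchers.Default) {
        val mutex = Mutex()
        var counter = 0
        coroutineScope {
            repeat(coroutines) {
                launch {
                    repeat(iterations) {
                        mutex.withSpinLock {
                            delay(1L) // suspension inside the critical section
                            counter++
                        }
                    }
                }
            }
        }
        counter
    }
```

Even with `yield()`, each waiter keeps waking up just to retry; a suspending `Mutex.lock()` / `withLock` parks the waiter until the lock is handed to it.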

irus

02/14/2021, 8:03 AM
Yes, but you wanted better numbers, and I gave them 🙂
Try doing at least several runs (add a `repeat` to `massiveRun`) and you'll see that the performance of the mutex changes.

Alex Vasilkov

02/14/2021, 8:07 AM
I don't just need better numbers in synthetic tests, I need correctly working code as well 🙂 You have different numbers, but they are still much bigger than for StateFlow, aren't they?

irus

02/14/2021, 8:10 AM
Right, because the flow version eats much more CPU.
[CPU graph: mutex, flow, lock, atomic]
[CPU graph: tryLock mutex, flow, lock, atomic]
In the synthetic test, CPU consumption for the `while (true)` + `tryLock` variant is lower than for the flow variant.
Another one: I added a delay to the hottest part of the loop:
private suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureNanoTime {
        coroutineScope { // scope for coroutines
            repeat(n) { one ->
                launch(dispatcher) {
                    repeat(k) {
                        delay(one.toLong())
                        action()
                    }
                }
            }
        }
    }
    println("Completed ${n * k} actions in ${Duration.ofNanos(time).toMillis()} ms")
}
Now the mutex is on par with the others:
Completed 100000 actions in 99587 ms
Counter (mutex) = 100000

Completed 100000 actions in 99650 ms
Counter (flow) = 100000

Completed 100000 actions in 99611 ms
Counter (lock) = 100000

Completed 100000 actions in 99549 ms
Counter (atomic) = 100000
atomic/lock work faster because in these cases you just run a bunch of tasks on top of a thread pool.
mutex/flow are slower because coroutines have to suspend and resume to get the work done.

Alex Vasilkov

02/14/2021, 11:18 AM
Well, I can hardly agree that `Mutex.tryLock` should be used in real life; it's just plain wrong to waste an entire thread in a `while` loop for a potentially long time. I'm not sure the CPU graph measurements are reliable at all. Also, which is better: running for 900 ms at 20% or for 200 ms at 30%? I can hardly see how this can be used for benchmarking, sorry. The last example also looks wrong: at `one=99` you are basically scheduling a very simple computation (`counter++`) to run after 99 ms, 198 ms, … 99_000 ms. So you are measuring the delays here, not the real logic.
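The arithmetic behind this objection, assuming the modified `massiveRun` above (`n = 100` coroutines, `k = 1000` iterations each, and the coroutine with index `one` delaying `one` ms before every action): the slowest coroutine (`one = 99`) alone needs at least

```latex
\[
T_{\text{slowest}} \;\ge\; k \cdot (n - 1)\ \text{ms} \;=\; 1000 \times 99\ \text{ms} \;=\; 99\,000\ \text{ms} \;\approx\; 99\ \text{s}
\]
```

This lines up with the roughly 99 600 ms reported for every primitive, so those totals are dominated by the delays rather than by the locking.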

irus

02/14/2021, 12:55 PM
I can hardly agree that Mutex.tryLock should be used in real life
It depends on the actual task you are trying to solve; I can see how it can be useful in some scenarios. Likewise, a mutex shouldn't be used for accessing something as simple as a counter. I would probably even use a channel (I actually use a lot of them) for processing shared state.
I’m not sure if CPU graph measurements are any reliable.
They are
Also what is better: run for 900ms at 20% or for 200ms at 30%? I can hardly see how this can be used for benchmarking, sorry.
`tryLock` runs faster and consumes less CPU for the benchmark use case.
The last example also looks wrong, basically at `one=99` you are scheduling a very simple computation (`counter++`) to run after 99ms, 198ms, … 99_000 ms. So you are measuring the delays here, not the real logic.
Of course, but the original benchmark doesn't show how a real application would behave either: the problem you see is high contention of many coroutines on a single suspension point, and in a real application you may never see it.
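irus's channel suggestion, sketched under the usual "one coroutine owns the state" assumption: producers send increments through a `Channel`, and a single owner coroutine is the only code that touches `counter`, so no lock is involved. The function `channelCounter` is hypothetical and only mirrors the `massiveRun` workload.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical example: a single "owner" coroutine holds the counter and
// applies increments received over a channel, so the state is never shared.
fun channelCounter(n: Int = 100, k: Int = 1000): Int = runBlocking(Dispatchers.Default) {
    val increments = Channel<Int>(Channel.UNLIMITED)

    // Owner: the only code that reads or writes `counter`.
    val owner = async {
        var counter = 0
        for (delta in increments) counter += delta // loop ends once the channel is closed
        counter
    }

    // Producers: n coroutines, each sending k increments.
    coroutineScope {
        repeat(n) {
            launch { repeat(k) { increments.send(1) } }
        }
    }
    increments.close() // all producers are done; lets the owner's loop finish
    owner.await()
}
```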

Dominaezzz

02/14/2021, 12:55 PM
Have you tried `Semaphore`?

Alex Vasilkov

02/14/2021, 1:03 PM
@Dominaezzz Thanks for the `Semaphore` suggestion, I didn't know it could be used with suspend functions. But it turns out its performance is on par with `Mutex` (just a bit faster) and still noticeably slower than `MutableStateFlow` (I updated the test class and results above).

louiscad

02/14/2021, 1:10 PM
@irus Can you do your benchmark using exclusively `withLock` for the `Mutex` case?

Alex Vasilkov

02/14/2021, 1:10 PM
(BTW, from the Kotlin docs: "Semaphore with permits = 1 is essentially a Mutex.")

Dominaezzz

02/14/2021, 1:10 PM
I didn’t know it can be used with suspend functions.
Just to make sure, I'm talking about the kotlinx.coroutines one, not the Java one.
Ah, nvm

irus

02/14/2021, 1:50 PM
@louiscad what do you mean?

louiscad

02/14/2021, 2:11 PM
@irus You're using `tryLock` instead of `withLock`, right? Or maybe the latter uses the former and it's the same (on mobile, hard to check right now).
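For reference, `Mutex.withLock` in kotlinx.coroutines is, to my knowledge, essentially `lock()`, run the action, `unlock()` in `finally`; `lock()` suspends under contention rather than spinning on `tryLock()`. Below is a hand-written equivalent (the names `withLockManually` and `mutexCounter` are hypothetical; the real extension is `inline` and also accepts an optional `owner`).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex

// Hand-written equivalent of Mutex.withLock (hypothetical name): suspend until
// the lock is granted, run the action, and always release it in `finally`.
suspend fun <T> Mutex.withLockManually(action: suspend () -> T): T {
    lock() // suspends (does not spin) while another coroutine holds the mutex
    try {
        return action()
    } finally {
        unlock()
    }
}

// Hypothetical workload mirroring massiveRun: n coroutines x k increments.
fun mutexCounter(n: Int = 100, k: Int = 1000): Int = runBlocking(Dispatchers.Default) {
    val mutex = Mutex()
    var counter = 0
    coroutineScope {
        repeat(n) {
            launch { repeat(k) { mutex.withLockManually { counter++ } } }
        }
    }
    counter
}
```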

Zach Klippenstein (he/him) [MOD]

02/14/2021, 4:14 PM
I’m impressed that your database is so fast that your choice of synchronization primitive even matters.

Alex Vasilkov

02/14/2021, 5:17 PM
Haha, that's totally true 🙂 I'm not really planning to call the DB 1000 times per second from 100 coroutines. In my case it is closer to 1-2 potential calls from a few coroutines, but they have a high chance of being made concurrently.
I guess there can still be other use cases where `Mutex` performance matters, though. And it still looks strange that a simple `StateFlow`-based solution significantly outperforms the native `Mutex` implementation when measured at scale. It would be interesting to understand why this happens; probably there is something more to `Mutex`.

irus

02/15/2021, 9:27 AM
Because of the high contention you introduced in the test, `withLock` goes through the slow path, where a lock-free linked list is created (https://youtu.be/W2dOOBN1OQI?t=882) with callbacks to resume and cancel the coroutine. The extra objects and the linked list make the entire thing slower, but not slow for a real application. This can be proved by the same test, but with a dispatcher that works on a single thread:
Executors.newFixedThreadPool(1).asCoroutineDispatcher()
Results (best of 10 runs):
Completed 100000 actions in 4 ms
Counter (mutex) = 1000000

Completed 100000 actions in 16 ms
Counter (flow) = 1000000

Completed 100000 actions in 4 ms
Counter (lock) = 1000000

Completed 100000 actions in 10 ms
Counter (channel) = 1000000

Completed 100000 actions in 2 ms
Counter (atomic) = 1000000
I'm not that familiar with Flow internals, so I can't say why it's faster in the high-contention case, but I'll definitely look inside the implementation to check this out.
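The single-thread comparison can be reproduced by swapping the dispatcher. A minimal sketch, assuming the same n x k workload as `massiveRun` (the helper `runOnSingleThread` is hypothetical). `ExecutorCoroutineDispatcher` is `Closeable`, so `use { }` shuts the executor down afterwards. On one thread, coroutines only interleave at suspension points, so a critical section like `counter++` with no suspension inside is never contended, and every primitive stays on its fast path.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.Executors

// Hypothetical helper: run n coroutines x k Mutex-protected increments on a
// dedicated single-thread dispatcher, then shut that dispatcher down.
fun runOnSingleThread(n: Int = 100, k: Int = 1000): Int =
    Executors.newFixedThreadPool(1).asCoroutineDispatcher().use { dispatcher ->
        runBlocking(dispatcher) {
            val mutex = Mutex()
            var counter = 0
            coroutineScope {
                repeat(n) {
                    // tryLock-style fast path always succeeds here: nothing
                    // else can run between lock and unlock on this thread
                    launch { repeat(k) { mutex.withLock { counter++ } } }
                }
            }
            counter
        }
    }
```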

Alex Vasilkov

02/15/2021, 9:49 AM
Thanks. The reason is definitely the slow path: with a single thread you eliminate the need for synchronization, and all tests become equally fast. The test is built to check the actual synchronization logic, not the fast paths. I tried to look through the code (`Mutex` and `StateFlow`), but it is not trivial at all 🙂 My best guess so far is that `Mutex` has to resume all waiting coroutines each time it is unlocked, and they compete for the lock again. With `StateFlow`, not all collectors are guaranteed to receive intermediate values (because of `StateFlow`'s DROP_OLDEST / CONFLATED behaviour), so not all of them are resumed. I have no idea how close this is to the real reason, though.

irus

02/15/2021, 9:50 AM
and they are competing for the lock again
No, of course not: in this case only the first waiter from the linked list is taken.

Alex Vasilkov

02/15/2021, 10:02 AM
Hm, indeed, you are right, it should only resume one waiting coroutine at a time.
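The KDoc of the `Mutex(locked)` factory in kotlinx.coroutines states that the mutex is fair: the lock is granted in first come, first served order. A small sketch that makes the hand-off order visible (the function `acquisitionOrder` is hypothetical): waiters queue up in a known order on a single thread, and each `unlock()` passes the lock to the next one in line rather than waking them all.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical demo: queue waiters in a known order, then release the lock and
// record the order in which they actually acquire it.
fun acquisitionOrder(waiters: Int = 5): List<Int> = runBlocking {
    val mutex = Mutex(locked = true) // start locked so every waiter has to queue
    val order = mutableListOf<Int>() // only touched on runBlocking's single thread
    val jobs = List(waiters) { id ->
        // UNDISPATCHED runs the coroutine up to its first suspension right away,
        // so waiter `id` is queued before waiter `id + 1` is launched.
        launch(start = CoroutineStart.UNDISPATCHED) {
            mutex.withLock { order += id }
        }
    }
    mutex.unlock() // hands the lock to the first waiter only
    jobs.joinAll()
    order
}
```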

irus

02/15/2021, 10:09 AM
One interesting observation: if you add a suspending operation to the critical section, the mutex becomes much faster than the flow, and `ReentrantLock` deadlocks (this is obvious, but kinda funny anyway 🙂)
@Test
fun testMutex() = runBlocking(dispatcher) {
    val mutex = Mutex()
    var counter = 0
    massiveRun {
        mutex.withLock {
            yield()
            counter++
        }
    }
    println("Counter (mutex) = $counter")
}

@Test
fun testFlow() = runBlocking(dispatcher) {
    val lock = MutableStateFlow(false)
    var counter = 0
    massiveRun {
        lock.withLock {
            yield()
            counter++
        }
    }
    println("Counter (flow) = $counter")
}

@Test
fun testLock() = runBlocking(dispatcher) {
    val lock = ReentrantLock()
    var counter = 0
    massiveRun {
        lock.lock()
        try {
            yield()
            counter++
        } finally {
            lock.unlock()
        }
    }
    println("Counter (lock) = $counter")
}
Completed 100000 actions in 1449 ms
Counter (mutex) = 1000000

Completed 100000 actions in 5166 ms
Counter (flow) = 1000000

Completed 100000 actions in 172 ms
Counter (atomic) = 1000000

Alex Vasilkov

02/15/2021, 10:25 AM
Yeah, `yield()` drives the `StateFlow` lock crazy, that's for sure. Another observation: if you use `delay(1L)` instead of `yield()`, the difference between `Mutex` and `StateFlow` becomes insignificant. There is still quite a big synchronization penalty, but `StateFlow` does not behave any better.

irus

02/15/2021, 11:09 AM
I checked how `MutableStateFlow` + the custom lock works, and my observation is that it works "faster" because it occupies all available threads. Each emit resumes all coroutines waiting in `collect`, and to some extent this adds "parallelism" to the process, whereas the mutex works "sequentially", so it takes more time to process the entire queue. I added CPU graphs (left: mutex, right: flow). You can see that the flow fills up the pool (all threads running, instead of waiting as with the mutex), and because `synchronized` is used for the CAS, it even blocks some threads at times. Because of this inefficiency, I'd suggest using `Mutex` or `Channel`.
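The point that each emission resumes every waiting collector can be checked in isolation. In this hypothetical sketch (`wokenByOneRelease` is an illustrative name), several coroutines wait with `first { !it }`, exactly as the `StateFlow`-based `withLock` does, and a single `value = false` resumes all of them. In the real lock, all but one of those woken coroutines then lose the `compareAndSet` and go back to waiting.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical demo: count how many waiters a single "release" wakes up.
fun wokenByOneRelease(waiters: Int = 5): Int = runBlocking {
    val locked = MutableStateFlow(true) // the "lock" is currently held
    val woken = AtomicInteger(0)
    val jobs = List(waiters) {
        // UNDISPATCHED: each waiter starts collecting (and suspends) immediately
        launch(start = CoroutineStart.UNDISPATCHED) {
            locked.first { !it } // same wait as in the StateFlow-based withLock
            woken.incrementAndGet()
        }
    }
    locked.value = false // one release...
    jobs.joinAll()
    woken.get()          // ...has resumed every waiter
}
```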

wasyl

02/15/2021, 11:11 AM
Nice investigation, Ruslan 🙂 Btw, what software did you use to observe the threads here?

irus

02/15/2021, 11:12 AM
Thanks, it's YourKit.

Alex Vasilkov

02/15/2021, 12:24 PM
@irus Thanks for the interesting research! Can you explain the graph: what do the yellow / green flames mean? I added coroutine and thread tracking to my test (based on `Thread.currentThread().name`) and got strange results. When using `Dispatchers.Default`:
Completed 100000 actions in 966 ms
Counter (mutex) = 100000 / used 8 threads and 582 coroutines

Completed 100000 actions in 235 ms
Counter (flow) = 100000 / used 16 threads and 1557 coroutines
When using `newFixedThreadPoolContext(16, "test")`:
Completed 100000 actions in 947 ms
Counter (mutex) = 100000 / used 16 threads and 1585 coroutines

Completed 100000 actions in 100 ms
Counter (flow) = 100000 / used 16 threads and 1260 coroutines
In the first case `Mutex` used significantly fewer threads / coroutines, but in the second case it used even more resources than `StateFlow`. Also, I tracked how many times the `while (true)` loop ran in the `StateFlow.withLock` method (across all 100_000 executions) and got about 110_000 for the fixed thread pool and about 180_000 for the Default dispatcher. As I understand it, this means there weren't too many unnecessary coroutine resumes.
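The tracking code itself isn't shown in the thread. One plausible way to count threads, assuming thread names are collected into a concurrent set from inside the critical section (the helper `distinctThreads` is hypothetical, and it does not attempt to count coroutines):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.ConcurrentHashMap

// Hypothetical instrumentation: record the name of every thread that runs the
// critical section; returns (final counter, number of distinct threads seen).
fun distinctThreads(n: Int = 100, k: Int = 1000): Pair<Int, Int> =
    runBlocking(Dispatchers.Default) {
        val threads = ConcurrentHashMap.newKeySet<String>()
        val mutex = Mutex()
        var counter = 0
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) {
                        mutex.withLock {
                            threads.add(Thread.currentThread().name)
                            counter++
                        }
                    }
                }
            }
        }
        counter to threads.size
    }
```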

irus

02/15/2021, 12:26 PM
green: thread in running state; yellow: waiting (most probably `unsafe.park`, waiting for work to be assigned)
Because so many coroutines are trying to access a single point (flow/mutex), it's probably not like "we just registered 100000 subscribers and started resuming them"; it's more likely a continuous process, and new coroutines keep subscribing to the flow until the very last cycles. That's why adding an extra `yield`/`delay` makes the flow very slow: the more simultaneous subscriptions are registered, the more unnecessary work is done.