What is the right way to synchronize threads in co...
# announcements
n
What is the right way to synchronize threads in common code? I'm currently using
val mutex = Mutex()
    
runBlocking { 
    mutex.withLock {
        ... 
    }
}
as a workaround, but not sure if it's a good practice.
d
That's ... I guess it works but.. Do you have to use threads?
a
Yeah, assuming the Mutex is from coroutines, it synchronizes coroutines and not just threads.
n
I'm working on a suspendable rate limiter (https://pastebin.com/se22mmEx). It seems to work as expected, but I haven't tested it properly yet. Thread synchronization is required in tryAcquire; it simply won't work without it, for obvious reasons.
a
It should work I guess. But AFAIK it is a matter of terminology. Mutex synchronizes coroutines, not just threads. Which means if a coroutine suspends while the Mutex is acquired, another coroutine will wait for the Mutex, even if it works on the same thread.
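A minimal sketch of that point, assuming the Mutex is kotlinx.coroutines.sync.Mutex. The helper name maxConcurrentInsideLock is made up for illustration. Every coroutine runs on the single runBlocking thread, yet only one of them is ever inside withLock, even while the lock holder is suspended in delay:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical helper: launches `workers` coroutines on the single
// runBlocking thread and returns the largest number of coroutines that
// were inside the critical section at the same time.
fun maxConcurrentInsideLock(workers: Int): Int = runBlocking {
    val mutex = Mutex()
    var inside = 0      // only modified while holding the mutex
    var maxInside = 0

    val jobs = List(workers) {
        launch {
            mutex.withLock {
                inside++
                maxInside = maxOf(maxInside, inside)
                delay(10) // suspends while the lock is still held
                inside--
            }
        }
    }
    jobs.joinAll() // wait for every worker before reading the result
    maxInside
}

fun main() {
    println(maxConcurrentInsideLock(5)) // prints 1
}
```

Removing the withLock wrapper would let every coroutine enter before the first one leaves, so the same function would return the number of workers instead.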
d
That recursive tryAcquire looks very dangerous.
I think you could implement this component without the Mutex. You already have atomics.
n
I can't imagine how to make it work without Mutex
d
All you need is a single atomic that is "time since last permit was acquired".
If enough time has passed, you can acquire; otherwise delay or return false.
n
It just breaks if I leave it like this
override fun tryAcquire(): Boolean {
        val isAcquired = permitsLeft.getAndDecrement() >= 1

        if (!isAcquired && isTimeToReset()) {
            if (isTimeToReset()) {
                resetAndAcquire(); return true
            } else {
                tryAcquire()
            }
        }

        if (isAcquired) {
            timeFirstPermitAcquired.compareAndSet(null, timeSource.markNow())
        }

        return isAcquired
    }
if that's what you mean
What, somehow it doesn't break
Thanks!
d
I meant you could make the whole class much simpler, but if that works then cool.
n
Never mind, I only tested it in a single thread. Anyway, we got off topic.
d
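import java.util.concurrent.atomic.AtomicReference
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.TimeMark
import kotlin.time.TimeSource
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay

// Hands out one permit per timeBetweenPermits interval using a single atomic, no Mutex.
@ExperimentalTime
@ExperimentalCoroutinesApi
private class RateLimiterImpl(val permitsPerSecond: Int, val timeSource: TimeSource) {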
    init {
        require(permitsPerSecond > 0) { "Rate must be > 0" }
    }

    private val lastPermitTime = AtomicReference<TimeMark?>(null)
    private val timeBetweenPermits = Duration.seconds(1.0 / permitsPerSecond)

    suspend fun acquire() {
        while (true) {
            val lastMark = lastPermitTime.get()
            if (lastMark == null || lastMark.elapsedNow() > timeBetweenPermits) {
                if (lastPermitTime.weakCompareAndSetVolatile(lastMark, timeSource.markNow())) {
                    break
                }
            } else {
                delay(timeBetweenPermits - lastMark.elapsedNow())
            }
        }
    }

    fun tryAcquire(): Boolean {
        while (true) {
            val lastMark = lastPermitTime.get()
            if (lastMark == null || lastMark.elapsedNow() > timeBetweenPermits) {
                if (lastPermitTime.weakCompareAndSetVolatile(lastMark, timeSource.markNow())) {
                    return true
                }
            } else {
                return false
            }
        }
    }
}
n
It works, but if I understood correctly you can't "burst" without a timeBetweenPermits delay between acquire calls.
So not really what I need.
d
"burst"?
Ah you want to acquire multiple permits without delay.
n
Yep
d
Still possible with atomics but I can't be arsed to write that haha.
n
🙂
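For reference, one way the burst variant could look with atomics only. This is a hypothetical sketch, not code from the thread or from the pastebin: the class name BurstRateLimiter, its parameters, and the fixed-window policy are all assumptions. The idea is to keep the window start and the number of used permits in one immutable object and swap it with compareAndSet, so both values always change together:

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch: a fixed-window limiter that allows up to
// `permitsPerWindow` back-to-back acquisitions per window, without a Mutex.
class BurstRateLimiter(
    private val permitsPerWindow: Int,
    private val windowNanos: Long,
    private val nanoClock: () -> Long = System::nanoTime, // injectable for tests
) {
    init {
        require(permitsPerWindow > 0) { "permitsPerWindow must be > 0" }
        require(windowNanos > 0) { "windowNanos must be > 0" }
    }

    // Immutable snapshot; always replaced as a whole by compareAndSet.
    private class Window(val startNanos: Long, val used: Int)

    private val state = AtomicReference<Window?>(null)

    fun tryAcquire(): Boolean {
        while (true) {
            val current = state.get()
            val now = nanoClock()
            val next = when {
                // First call, or the window has expired: start a new window.
                current == null || now - current.startNanos >= windowNanos ->
                    Window(now, 1)
                // Window still open with permits left: take one.
                current.used < permitsPerWindow ->
                    Window(current.startNanos, current.used + 1)
                // Window open but exhausted.
                else -> return false
            }
            if (state.compareAndSet(current, next)) return true
            // Another thread changed the state first; re-read and retry.
        }
    }
}

fun main() {
    var now = 0L
    val limiter = BurstRateLimiter(permitsPerWindow = 3, windowNanos = 1_000L) { now }
    println(List(4) { limiter.tryAcquire() }) // prints [true, true, true, false]
    now = 1_000L                              // the window has elapsed
    println(limiter.tryAcquire())             // prints true
}
```

A suspending acquire could presumably be layered on top by looping over tryAcquire and calling delay until the current window ends, in the same style as the acquire loop above.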