https://kotlinlang.org logo
Title
v

v0ldem0rt

08/13/2018, 5:13 PM
why don't we already have a Semaphore implementation in coroutines?
m

Martin Devillers

08/13/2018, 5:16 PM
Couldn’t you essentially get a “suspending semaphore” behavior with a channel? Receiving on the channel will suspend until an item is offered, i.e. a “permit” is available. If the channel has unlimited capacity, then it can accumulate “permits” indefinitely.
1
👍 1
v

v0ldem0rt

08/13/2018, 5:18 PM
True that
e

elizarov

08/13/2018, 5:18 PM
^^ That is not the only reason.
v

v0ldem0rt

08/13/2018, 5:18 PM
👍
e

elizarov

08/13/2018, 5:18 PM
The true reason in why we don’t have a semaphore you’ll get if you answer the question on why you need a semaphore in the first place.
v

v0ldem0rt

08/13/2018, 5:20 PM
Absolutely agree
Do channels have any additional overheads?
e

elizarov

08/13/2018, 5:20 PM
Usually it is to to limit concurrency to some resource. But with coroutines there is a much better pattern, called “worker pool”. You just create
N
coroutines reading tasks from the same channel and performing them. Thus you’ve limited concurrent to
N
concurrent coroutines without having to use a semaphore.
There is an additional overhead. But when you have concurrency and the corresponding scheduling you already have so much overhead that any incremental overhead from channels is negligible.
👍 2
If you ditch shared mutable stable with semaphores and mutexes alike and switch to sharing immutable data via channels, you get so much benefits in clarity and tractability of your architecture, that will outweigh any performance you might loose.
v

v0ldem0rt

08/13/2018, 5:49 PM
So for fun I made an implementation anyway since it's few lines of code
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.channels.count
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.timeunit.TimeUnit
import kotlinx.coroutines.experimental.withTimeoutOrNull

class SuspendSemaphore(initialCount: Int) {
    private val channel = Channel<Unit>()

    init {
        runBlocking {
            repeat(initialCount) { channel.send(Unit) }
        }
    }

    val onAcquire get() = channel.onReceive
    suspend fun getCurrentCounter() = channel.count()
    suspend fun acquire() = channel.receive()
    suspend fun release() = channel.send(Unit)
    suspend fun tryAcquire(timeout: Long = 0, timeUnit: TimeUnit = TimeUnit.MILLISECONDS): Boolean {
        return withTimeoutOrNull(timeout, timeUnit) {
            channel.receive()
        } == Unit
    }
}
Let me know if anything sounds suspecious
e

elizarov

08/13/2018, 5:54 PM
LGTM
👍 1
:trollface: 1
l

louiscad

08/13/2018, 8:36 PM
@v0ldem0rt Shouldn't you pass
initialCount
into
Channel<Unit>(…)
call, and use
offer(Unit)
without
runBlocking { … }
? Your code seems deadlocking to me.
v

v0ldem0rt

08/13/2018, 8:47 PM
Yep already ran into it
and switched to offer