https://kotlinlang.org logo
#coroutines
Title
# coroutines
m

mp

02/12/2019, 2:02 PM
Is there a coroutine-flavored equivalent of a j.u.c.semaphore? I have a known amount of work to send over the network, and it can only be handled of batches of at most 10, so I’d like to simply spawn a zillion coroutines, each of which will handle 10 items, but to avoid overwhelming the other end I’d like to limit the concurrency ala a semaphore.
a

antonis

02/12/2019, 2:10 PM
I guess you can use Channels and/or Actors for that
I don’t think there is something as low level as semaphores
m

mp

02/12/2019, 2:12 PM
That’s unfortunate.
m

marstran

02/12/2019, 2:22 PM
Fanning in to a channel with a set amount of workers reading from it, is a better abstraction than a semaphore imho.
💯 4
m

mp

02/12/2019, 2:29 PM
Fan-in via a channel makes it harder to keep track of the result of each unit of work, though.
I can’t simply keep a collection of Deferreds or whatever
a

antonis

02/12/2019, 2:32 PM
Semaphores are hard imho or at least for most of us 😛
m

mp

02/12/2019, 2:34 PM
😥
a

antonis

02/12/2019, 2:34 PM
You can still use java semaphore. That’s the beauty of Kotlin
m

mp

02/12/2019, 2:35 PM
I can, but that will tie up the whole thread, no?
a

antonis

02/12/2019, 2:35 PM
yes
m

mp

02/12/2019, 2:35 PM
That defeats the purpose…
I want to spawn a few thousand coroutines, not a few thousand threads
a

antonis

02/12/2019, 2:40 PM
probably you can imitate semaphore behavior by having a block locked with Mutex https://kotlinlang.org/docs/reference/coroutines/shared-mutable-state-and-concurrency.html#mutual-exclusion
I feel that channels/actors is the way to go in a kotlin world though
m

mp

02/12/2019, 2:47 PM
OK but you see that they’re not equivalent, right…
b

bdawg.io

02/12/2019, 2:54 PM
Immediately inside of your batch worker, you could use an object pool of Mutex and try to obtain the a suspending lock which will create a suspending queue of workers waiting their turn
👍 1
m

marstran

02/12/2019, 2:58 PM
Answering: "Fan-in via a channel makes it harder to keep track of the result of each unit of work, though." You could send a response channel with the request. The caller could then just suspend by calling
receive
on response channel.
b

bdawg.io

02/12/2019, 2:59 PM
If you will only have a single response that you need to know of, a
CompletableDeferred
is cheaper than a channel is.
m

marstran

02/12/2019, 3:00 PM
Yeah, right, that is better 👍
m

mp

02/12/2019, 3:09 PM
I have thousands of responses to keep track of
I agree that response channel per request, mutex pool, etc, can work. However, I don’t want a coworker who opens this project to come away scarred for life thinking that concurrency has to be that complicated 😕
b

bdawg.io

02/12/2019, 3:21 PM
It’s going to be complicated if you’re trying to do things in a difficult way. IMO the better thing would be to only launch the number of coroutines that you want to happen concurrently and then fan out the work using a channel. https://kotlinlang.org/docs/reference/coroutines/channels.html#fan-out
Copy code
data class WorkItem(val value: String, val response: CompletableDeferred<Response> = CompletableDeferred())

val workValues: ReceiveChannel<WorkValue> = getValues()
coroutineScope {
    repeat(workerCount) {
        launch {
            doSomething(workValues.receiveNextTen())
        }
    }
}
1
m

mp

02/12/2019, 3:23 PM
“a difficult way” — the way that semaphores make really simple 😕
b

bdawg.io

02/12/2019, 3:29 PM
How is a semaphore simpler than a MutexPool? Also, semaphores are also blocking 🤷. Semaphore tracks access by the thread. MutexPool tracks access by suspension.
m

mp

02/12/2019, 3:35 PM
See the start of this thread. I’m looking for a suspending equivalent to a semaphore
b

bdawg.io

02/12/2019, 3:36 PM
Right, which is why I suggested a MutexPool (and even gave an implementation) 🙂
m

mp

02/12/2019, 3:38 PM
Merely lamenting the lack of a simpler choice.
b

bdawg.io

02/12/2019, 3:47 PM
Copy code
val semaphore = Semaphore(10)

repeat(20) {
    thread {
        // blocking
        semaphore.aquire()
        try {
            doSomething()
        } finally {
            semaphore.release()
        }
    }
}
vs
Copy code
val mutexPool = MutexPool(10)

repeat(20) {
    launch {
        // suspending
        mutexPool.withLock {
            doSomething()
        }
    }
}
I’m just trying to understand what “simpler” means
m

mp

02/12/2019, 4:22 PM
Well, one of them requires having a roll-your-own concurrency building block, and one does not.
That means that a less experienced developer can’t look at this code and see a pattern they can use easily somewhere else.
m

marstran

02/12/2019, 4:35 PM
Using fan-in on a channel is a common pattern and really straight forward to implement. I really don't get what you think is complicated with it.
m

mp

02/12/2019, 5:07 PM
I didn’t say fan-in was complicated. I said that fan-in plus enough other machinery to do what semaphores do is complicated. If you disagree, okay. 🤷‍♂️
b

bdawg.io

02/12/2019, 7:12 PM
So the issue is because Semaphore is part of the JDK whereas a MutexPool is not part of coroutines core? (event though it’s a 20 line wrapper, including whitespace, around
Mutex
and
select
from the core library). I would still argue that it sounds like the design causing the need for a semaphore has an alternative that’s simpler and cleaner in coroutines, but I’m not that close to the original problem to say more. The nice part is coroutines being open source makes it nice to open (or implement) feature requests. (Most of which either get implemented fairly quick or get rejected with a real reason)
👏 1
2
4 Views