#coroutines

Marc Knaup

10/08/2020, 10:06 PM
Has anybody written something like a concurrent FIFO processing queue with a dynamic concurrency limit? I want to process operations in parallel, but the desired concurrency changes over time. Example: I throw 50,000 elements into the queue at once. The concurrency starts at 100, so up to 100 elements are processed in parallel at any given time. After a while the error count increases, so I want to dynamically reduce the concurrency to 20. The number of elements processed in parallel then drops from 100 to 20 as in-flight elements complete. Once the error count has been back at zero for a little while, I change the concurrency to 100 again, and the number of elements processed in parallel jumps from 20 straight back to 100. The sender that puts something into the queue should be blocked until its element has been processed, and then get the result back or have the exception thrown.

Zach Klippenstein (he/him) [MOD]

10/08/2020, 10:20 PM
I don't know if any prefab implementations of this exist, but it certainly seems like it wouldn't be too hard to implement with the standard channel + worker coroutines pattern. You'd just need your own poison pill value to send to the channel n times to kill n workers without cancelling active tasks.

Marc Knaup

10/08/2020, 10:38 PM
A “poison pill” sounds good in theory 🤔 I just have to make sure that it’s delivered before other pending elements in the channel. I’ll try it out using select 🙂

Dominaezzz

10/08/2020, 10:42 PM
Could also use a `Semaphore` and acquire permits to limit concurrency.
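
For reference, a minimal sketch of the fixed-limit version of this suggestion, assuming `kotlinx.coroutines` is on the classpath. `processAll` is a hypothetical helper name, not something from the thread or the library. Note that it starts one coroutine per element and lets the semaphore decide how many run at once.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs `block` for every input, at most `limit` at a time.
suspend fun <T, R> processAll(
    inputs: List<T>,
    limit: Int,
    block: suspend (T) -> R
): List<R> = coroutineScope {
    val semaphore = Semaphore(limit)
    inputs.map { input ->
        // Every element gets its own coroutine; only `limit` of them hold a permit.
        async { semaphore.withPermit { block(input) } }
    }.awaitAll()
}

fun main() = runBlocking {
    val active = AtomicInteger()
    val peak = AtomicInteger()
    val results = processAll((1..50).toList(), limit = 5) { n ->
        val now = active.incrementAndGet()
        peak.updateAndGet { maxOf(it, now) } // remember the highest concurrency seen
        delay(10)
        active.decrementAndGet()
        n * 2
    }
    println("results=${results.size}, peak concurrency=${peak.get()}")
}
```

Exceptions thrown by `block` propagate to the caller of `processAll` through `awaitAll`, which matches the "result returned or an exception thrown" requirement for the simple case.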

Marc Knaup

10/08/2020, 11:01 PM
So I would have to “hot-swap” them if I want to change the permit count?

Zach Klippenstein (he/him) [MOD]

10/08/2020, 11:04 PM
You could use a secondary channel to deliver poison pills, and put it before the primary channel in a select so that it always gets read from first.
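
A rough sketch of how the two-channel idea could look, assuming kotlinx.coroutines 1.5 or later (for `trySend`). `Task`, `WorkerPool`, `setConcurrency` and `submit` are hypothetical names chosen for illustration; this is not the code from the gist mentioned later in the thread. It relies on `select` being biased toward its first clause, so a pending kill signal is picked up before a pending task.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select

// Hypothetical task wrapper: the caller suspends on `result` until a worker completes it.
class Task<R>(val block: suspend () -> R) {
    val result = CompletableDeferred<R>()
}

class WorkerPool<R>(private val scope: CoroutineScope) {
    private val tasks = Channel<Task<R>>(Channel.UNLIMITED)
    private val kills = Channel<Unit>(Channel.UNLIMITED) // the "poison pill" channel
    private var workers = 0 // assumption: setConcurrency is never called concurrently

    fun setConcurrency(target: Int) {
        while (workers < target) { workers++; scope.launch { workerLoop() } }
        while (workers > target) { workers--; kills.trySend(Unit) }
    }

    private suspend fun workerLoop() {
        while (true) {
            // First clause wins when both channels have elements, so kills beat tasks.
            val task = select<Task<R>?> {
                kills.onReceive { null }
                tasks.onReceive { it }
            } ?: return
            try {
                task.result.complete(task.block())
            } catch (e: Throwable) {
                task.result.completeExceptionally(e)
                if (e is CancellationException) throw e
            }
        }
    }

    // Suspends the sender until its task has run, then returns the result or rethrows.
    suspend fun submit(block: suspend () -> R): R {
        val task = Task(block)
        tasks.send(task)
        return task.result.await()
    }
}

fun main() = runBlocking {
    val workerScope = CoroutineScope(coroutineContext + Job())
    val pool = WorkerPool<Int>(workerScope)
    pool.setConcurrency(4)
    val results = (1..20).map { n -> async { pool.submit { delay(10); n * n } } }
    pool.setConcurrency(2) // two workers exit the next time they look for work
    println(results.awaitAll().sum())
    workerScope.cancel() // stop the remaining idle workers
}
```

A worker only sees a kill signal between tasks, so lowering the limit never cancels work that is already running. Raising the limit launches one coroutine per additional slot.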

Marc Knaup

10/08/2020, 11:05 PM
Yeah that’s what I’m doing now. Will share when I have something useful 🙂
it works 🙂 https://gist.github.com/fluidsonic/ba32de21c156bbe8424c8d5fc20dcd8e Only downside with the kill approach is that if I set the limit to 100,000 it’ll instantly start 100,000 coroutines.
It’s probably a bit messy with the coroutine contexts/scopes. I still haven’t figured that out 🙄
E.g. it feels weird that I pass a parent context and make it `Closeable`.

spand

10/09/2020, 7:30 AM
Sounds like a circuit breaker. I think resilience4j has a Kotlin implementation/wrapper that a coworker is implementing for us as we speak.

Dominaezzz

10/09/2020, 8:24 AM
> So I would have to “hot-swap” them if I want to change the permit count?
No, you would just have a high initial permit count like `Semaphore(100)`. Then if you want to limit concurrency to 15, call `acquire` 85 times, and adjust concurrency by releasing and acquiring permits. (Unless I misunderstand what you mean by "hot-swap".)
Since `Semaphore` is fair, you might want to acquire permits concurrently.
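
One way to sketch this permit-holding idea, assuming `kotlinx.coroutines`. `AdjustableLimiter`, `setLimit` and `withLimit` are hypothetical names for illustration. The wrapper keeps `max - limit` permits held back and acquires or releases permits when the limit changes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit

// Hypothetical wrapper: `max` is the hard ceiling, `limit` the current soft limit.
class AdjustableLimiter(private val max: Int, initialLimit: Int = max) {
    // Start with (max - initialLimit) permits already counted as acquired, i.e. held back.
    private val semaphore = Semaphore(max, acquiredPermits = max - initialLimit)
    private val resize = Mutex() // serializes concurrent setLimit calls
    private var limit = initialLimit

    suspend fun setLimit(newLimit: Int) {
        require(newLimit in 1..max) { "limit must be in 1..$max" }
        resize.withLock {
            // Lowering: take permits out of circulation; suspends until running work releases them.
            while (limit > newLimit) { semaphore.acquire(); limit-- }
            // Raising: hand held-back permits out again.
            while (limit < newLimit) { semaphore.release(); limit++ }
        }
    }

    suspend fun <R> withLimit(block: suspend () -> R): R =
        semaphore.withPermit { block() }
}

fun main() = runBlocking {
    val limiter = AdjustableLimiter(max = 100)
    limiter.setLimit(15) // holds back 85 permits
    val jobs = List(40) { launch { limiter.withLimit { delay(10) } } }
    jobs.joinAll()
    limiter.setLimit(100) // releases the 85 permits again
}
```

Because the semaphore is fair, the `acquire` calls made while lowering the limit queue behind callers that are already waiting for a permit, so with a long backlog a reduction may take effect late. The ceiling `max` is only a number here, so it can be set well above 100.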

Marc Knaup

10/09/2020, 9:19 AM
Makes sense. However, I consider 100 quite low and prefer to have a solution that can easily go several orders of magnitude higher.