Is there a simple way to have a channel / workerpo...
# coroutines
t
Is there a simple way to have a channel / workerpool that can only have 1 active coroutine? (Meaning that any message send while the coroutine is running would be dropped). From reading I understand that CONFLATED will have 1 in the queue + 1 running, and that RendezVous would suspend the send and not ignore it.
g
not sure that got your question correctly. Looks that you mix channel capacity and coroutines Conflated always has 0 or 1 values, each next value will replace old one.
RendezVous also doesn’t have buffer and suspend until current message is consumed
you can use
offer
instead of
send
if just want drop message if channel is full
t
Ok so let's talk about channel full definition and worker pool or channel usage.
Take
Copy code
repeat(maxConcurrentTasks) {
            launch {
                for (task in tasks) executeTask(task)
            }
        }
When the executeTask run the message is removed from the channel before running?
So at that point the message is being handled but the channel is empty?
(tasks being the channel)
g
worker pool is just a pattern, when multiple coroutines read messages from a single channel and do some work. So 1 coroutine in pool actually means there is no pool, just a coroutine consumes channel
yes, correct, this coroutine consumes message from tasks channel
but the channel is empty
it depends on channel implementation, it may be not empty and have buffered values
t
Yes that I know, let me try to rephrase.
I need a coroutine pattern that allows me have at least 1 running job, drop job queuing when a job is running, and when does nothing start the send job.
Channels seemed to be the solution but can't find how to handle the drop job while running.
g
why you cannot use offer for that?
or you really want to drop while coroutine is running?
t
I want to drop while running.
g
I don’t think that you can find channel for that, channel knows nothing about consumer, so cannot know when consumer is running
I believe you need special API for that
t
So what would be the pattern for that?
g
maybe just callback with suspend labmda?
tasks.onTask { task -> executeTask(task) }
so when
tasks
has a task, code inside of this suspend lambda invoking and
tasks
cannot emit more events
would be good to hear what do you want to achieve, maybe there is some better solution for you case
t
Well a few cases that would fit, but the main current use can be resumed as I have something that write tasks in a DB and send a message that there's something that needs to be handled. While the handler runs, it will handle new additions to the DB, so all further messages are ignored. Then stops when all is handled. When something is added later start again.
DB part is mandatory to handle resuming after crash / force stop / ....
Currently using a singlethread tread pool, with activeCount check and that works perfectly, but as I'm moving to more coroutines and all the DB access being suspended, I'd like to fully embrace coroutines for that part and avoid runBlocking part to merge the 2 worlds
g
little bit strange that you ignore messages and loose them, but anyway, maybe there are valid use cases for that. Anyway, I believe you cannot implement this using pure channels, you need some abstraction that receiving messages, runs executor and deceides, should new received message be dropped or not
actually, just an actor may be a good for that You send messages/tasks to this actor and inside you have own worker pool which executes them, but actor drop events if some of tasks now is running
like an actor + worker pool inside
t
Messages are ignored because there are not needed, there's 1 message per DB entry when running in nominal state, but when resuming after stop needs to handle all entries without a message, and since list can be huge entries are handled 1 by 1. So 1 messages manage all the DB, and while running new entries are handled, queuing 100 messages that will just check DB see it's empty then return is 100 useless DB call that can be avoided.
I've started to look at actor but they are labelled as deprecated 😞
Since they are channel based that why I looked into channels directly.
g
actor is just a channel + coroutine
actors deprecated just because will be replaced by typed version
t
Ok, leaves the how to drop, I suppose I need to use a conflated, be sure there's only one coroutine working so can avoid using atomicBoolean to ensure sync.
g
If you worry about deprecated API, just write simple class which has method to get SendChannel (to send messages to actor) and run coroutine inside which consumes this channel so you will get simple actor with internal state that possible to use in multi-threaded environment
I mean there are many ways to do that, atomicboolean is actually fine solution
t
AtomicBoolean works, but after watching and reading so many things about the fact coroutines are the way to get rid of those and shared mutable state, I want to try to get it done right. But thanks for all the info I have more stuff to try to look into.
g
get rid of those and shared mutable state
yes, this is what actor for!
actor has 1 coroutine that run on a single thread, so you don’t have shared mutable state
t
Ok so will use the deprecated thing and hope next version won't be too complex to update to 🙂
g
As I said, you can just write class that returns send channel
or maybe you can have method on this class
sendEvent
, and this sendEvent checks internal state using any technique, AtomicBoolean, or some sort of mutex or
offers
value to another channel
t
Ok will look into that too 🙂 Coroutines are great for many things, but some pattern require complete different approach 😞
So after a few tests, actor + RendezVous + Offer does exactly what I want 🙂 Thanks again.
👍 1
u
I think the conflated channel is what you need. Especially to avoid a race condition between enqueueing new changes and progressing pending ones. If you don't think conflated channel is right, did you check offer, as suggested?
t
Conflated channels is 1 in queue, but does not handle the running coroutine as it's not managed by the queue. Actor + Rendez vous does properly handle 1 active job, in that case offer works as the queue is full as long as the job is running.
u
But you might offer an event after the worker had picked up all changes but is still processing. Then you lose part of the change set
t
Not really for my use case and implementation, but sure for other case Conflated is a right choice.
u
If you really want to drop, what was wrong about rendezvous channel combined with offer?
t
Because from reading the doc it did not seems to be possible that why I asked here. RendezVous channel have no buffer and is based on the fact that send and receive must meet. Offer in that case sounds odd as there’s no buffer what happens if there’s no receive running at that point?
u
If there is no receiver, or in your case the receiver is busy processing your last message, the new message will be dropped, just like you stated to need for your case
t
In the end the actor in rendezvous mode seems to be a channel in rendezvous so I guess this is what I’m using 🙂 I should read more about those, but with the 1.0 release tons of example are outdated 😞