#coroutines

Tolriq

11/29/2018, 9:05 AM
Is there a simple way to have a channel / worker pool that can only have 1 active coroutine? (Meaning that any message sent while the coroutine is running would be dropped.) From reading the docs I understand that CONFLATED will have 1 in the queue + 1 running, and that Rendezvous would suspend the send rather than drop it.

gildor

11/29/2018, 9:12 AM
Not sure I got your question correctly. It looks like you're mixing up channel capacity and coroutines. Conflated always holds 0 or 1 values; each new value replaces the old one.
Rendezvous has no buffer at all, and send suspends until a receiver takes the message.
you can use offer instead of send if you just want to drop the message when the channel is full
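To illustrate the offer-versus-send distinction, here is a minimal sketch. It assumes a recent kotlinx.coroutines, where offer is deprecated (since 1.5) in favor of trySend; the function name trySendTwice and the capacity of 1 are made up for the example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: tries to put two elements into a channel with a
// one-slot buffer and no consumer, and reports whether each was accepted.
fun trySendTwice(): Pair<Boolean, Boolean> {
    val tasks = Channel<String>(capacity = 1)
    // trySend never suspends: it either stores the element or fails right away.
    val first = tasks.trySend("a").isSuccess   // buffer has room
    val second = tasks.trySend("b").isSuccess  // buffer is full, "b" is dropped
    return first to second
}

fun main() {
    val (first, second) = trySendTwice()
    println("first accepted: $first, second accepted: $second")
}
```

Calling send("b") at the same point would suspend the caller until a receiver frees the slot, instead of dropping the element.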

Tolriq

11/29/2018, 9:15 AM
OK, so let's talk about what "channel full" means, and about worker pool or channel usage.
Take
repeat(maxConcurrentTasks) {
    launch {
        for (task in tasks) executeTask(task)
    }
}
When executeTask runs, is the message removed from the channel before it runs?
So at that point the message is being handled but the channel is empty?
(tasks being the channel)

gildor

11/29/2018, 9:16 AM
A worker pool is just a pattern where multiple coroutines read messages from a single channel and do some work. So 1 coroutine in the pool actually means there is no pool, just one coroutine consuming the channel.
Yes, correct, this coroutine consumes the message from the tasks channel.
As for "but the channel is empty": it depends on the channel implementation; it may not be empty and may still hold buffered values.

Tolriq

11/29/2018, 9:18 AM
Yes that I know, let me try to rephrase.
I need a coroutine pattern that allows at most 1 running job, drops any job submitted while a job is running, and starts the submitted job when nothing is running.
Channels seemed to be the solution, but I can't find how to drop jobs while one is running.

gildor

11/29/2018, 9:20 AM
why can't you use offer for that?
or do you really want to drop while the coroutine is running?

Tolriq

11/29/2018, 9:20 AM
I want to drop while running.

gildor

11/29/2018, 9:21 AM
I don't think you can find a channel for that: a channel knows nothing about its consumer, so it cannot know when the consumer is running.
I believe you need a special API for that.

Tolriq

11/29/2018, 9:21 AM
So what would be the pattern for that?

gildor

11/29/2018, 9:21 AM
maybe just a callback with a suspend lambda?
tasks.onTask { task -> executeTask(task) }
so when tasks has a task, the code inside this suspend lambda is invoked, and tasks cannot emit more events until it returns
It would be good to hear what you want to achieve; maybe there is a better solution for your case.

Tolriq

11/29/2018, 9:25 AM
Well, a few cases would fit, but the main current use can be summarized as: I have something that writes tasks to a DB and sends a message that there's something to handle. While the handler runs, it also picks up new additions to the DB, so all further messages are ignored. It stops when everything is handled, and starts again when something is added later.
The DB part is mandatory to handle resuming after crash / force stop / ....
Currently I'm using a single-thread thread pool with an activeCount check, and that works perfectly. But as I'm moving to more coroutines and all the DB access becomes suspending, I'd like to fully embrace coroutines for that part and avoid the runBlocking needed to merge the 2 worlds.

gildor

11/29/2018, 9:30 AM
It's a little strange that you ignore messages and lose them, but maybe there are valid use cases for that. Anyway, I believe you cannot implement this using pure channels; you need some abstraction that receives messages, runs the executor, and decides whether a newly received message should be dropped or not.
Actually, an actor may be a good fit for that. You send messages/tasks to this actor, and inside it has its own worker pool which executes them, but the actor drops events if one of the tasks is currently running.
Like an actor + a worker pool inside.

Tolriq

11/29/2018, 9:32 AM
Messages are ignored because they are not needed. There's 1 message per DB entry when running in the nominal state, but when resuming after a stop it needs to handle all entries without a message, and since the list can be huge, entries are handled 1 by 1. So 1 message handles the whole DB, and new entries are handled while it runs. Queuing 100 messages that each just check the DB, see it's empty, and return means 100 useless DB calls that can be avoided.
I've started to look at actors, but they are labelled as deprecated 😞
Since they are channel based, that's why I looked into channels directly.

gildor

11/29/2018, 9:34 AM
an actor is just a channel + a coroutine
actors are deprecated only because they will be replaced by a typed version

Tolriq

11/29/2018, 9:37 AM
OK, that leaves how to drop. I suppose I need to use a conflated channel and make sure there's only one coroutine working, so I can avoid using AtomicBoolean for synchronization.

gildor

11/29/2018, 9:39 AM
If you worry about the deprecated API, just write a simple class which has a method to get a SendChannel (to send messages to the actor) and runs a coroutine inside which consumes this channel. That gives you a simple actor with internal state that can be used in a multi-threaded environment.
I mean, there are many ways to do that; AtomicBoolean is actually a fine solution.

Tolriq

11/29/2018, 9:41 AM
AtomicBoolean works, but after watching and reading so many things about the fact coroutines are the way to get rid of those and shared mutable state, I want to try to get it done right. But thanks for all the info I have more stuff to try to look into.

gildor

11/29/2018, 9:41 AM
Re "get rid of those and shared mutable state": yes, this is what an actor is for!
An actor has 1 coroutine that handles messages one at a time, so you don't have shared mutable state.

Tolriq

11/29/2018, 9:42 AM
Ok so will use the deprecated thing and hope next version won't be too complex to update to 🙂

gildor

11/29/2018, 9:43 AM
As I said, you can just write a class that returns a send channel.
Or maybe you can have a sendEvent method on this class, and this sendEvent checks internal state using any technique: AtomicBoolean, some sort of mutex, or it offers the value to another channel.
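A minimal sketch of the class described above, under some assumptions: the name SingleRunner and the handle callback are hypothetical, the internal channel is a rendezvous channel, and trySend stands in for offer (which newer kotlinx.coroutines versions deprecate). It is one possible shape, not the only one.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical class: `handle` stands in for "process everything pending".
class SingleRunner(scope: CoroutineScope, private val handle: suspend () -> Unit) {
    // Rendezvous channel: an element is accepted only while a receiver is waiting.
    private val signals = Channel<Unit>(Channel.RENDEZVOUS)

    // The single consumer. While handle() runs, nobody is waiting in receive.
    private val worker = scope.launch {
        for (signal in signals) handle()
    }

    /** Returns true if a run was triggered, false if the event was dropped. */
    fun sendEvent(): Boolean = signals.trySend(Unit).isSuccess

    /** Closes the channel, which ends the consumer loop. */
    fun close() {
        signals.close()
    }
}

fun main() = runBlocking {
    var runs = 0
    val runner = SingleRunner(this) { runs++; delay(100) }
    yield() // let the worker start and suspend in receive
    val burst = listOf(runner.sendEvent(), runner.sendEvent(), runner.sendEvent())
    delay(300) // the first run finishes; the worker waits in receive again
    val later = runner.sendEvent()
    delay(300)
    runner.close()
    println("burst=$burst later=$later runs=$runs")
}
```

Only the first event of the burst and the later one should be accepted, because the worker is not waiting in receive while a run is in progress. The same rule means an event sent before the worker has started is dropped as well, which is why the sketch yields once before sending.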

Tolriq

11/29/2018, 9:46 AM
OK, will look into that too 🙂 Coroutines are great for many things, but some patterns require a completely different approach 😞
So after a few tests, actor + rendezvous + offer does exactly what I want 🙂 Thanks again.

uli

11/29/2018, 7:36 PM
I think the conflated channel is what you need, especially to avoid a race condition between enqueueing new changes and processing pending ones. If you don't think a conflated channel is right, did you check offer, as suggested?

Tolriq

11/29/2018, 7:44 PM
A conflated channel keeps 1 in the queue, but it does not account for the running coroutine, as that's not managed by the queue. Actor + rendezvous does properly handle 1 active job; in that case offer works, because the channel counts as full for as long as the job is running.

uli

11/29/2018, 8:04 PM
But you might offer an event after the worker has picked up all changes but is still processing. Then you lose part of the change set.
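The difference behind this concern can be sketched by sending one extra signal while the worker is mid-pass and counting how many passes run. The helper countPasses and the timings are made up for the illustration, and trySend is used as the current replacement for offer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts worker passes for a given channel capacity
// when a second signal arrives while the first pass is still running.
suspend fun countPasses(capacity: Int): Int = coroutineScope {
    val signals = Channel<Unit>(capacity)
    var passes = 0
    val worker = launch {
        for (signal in signals) {
            passes++
            delay(100) // simulated processing of the pending changes
        }
    }
    yield()               // let the worker start and suspend in receive
    signals.trySend(Unit) // triggers the first pass
    delay(50)             // the worker is now in the middle of that pass
    signals.trySend(Unit) // rendezvous: dropped; conflated: kept for later
    delay(300)            // give a possible follow-up pass time to finish
    signals.close()
    worker.join()
    passes
}

fun main() = runBlocking {
    println("rendezvous passes: ${countPasses(Channel.RENDEZVOUS)}")
    println("conflated passes: ${countPasses(Channel.CONFLATED)}")
}
```

With the rendezvous channel the mid-pass signal is lost, so only one pass runs; with the conflated channel it is kept and the worker makes a second pass, which would pick up a change written after the first pass had already read its data.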

Tolriq

11/29/2018, 8:07 PM
Not really for my use case and implementation, but sure, for other cases conflated is the right choice.

uli

11/29/2018, 8:36 PM
If you really want to drop, what was wrong with a rendezvous channel combined with offer?

Tolriq

11/29/2018, 9:29 PM
Because from reading the docs it did not seem to be possible, that's why I asked here. A rendezvous channel has no buffer and is based on the fact that send and receive must meet. Offer sounds odd in that case: as there's no buffer, what happens if there's no receive running at that point?

uli

11/29/2018, 9:32 PM
If there is no receiver, or in your case the receiver is busy processing your last message, the new message will be dropped, just as you said you need for your case.

Tolriq

11/29/2018, 9:40 PM
In the end, an actor in rendezvous mode seems to be just a channel in rendezvous mode, so I guess this is what I'm using 🙂 I should read more about those, but with the 1.0 release tons of examples are outdated 😞