https://kotlinlang.org logo
Title
f

frankelot

12/16/2021, 6:29 PM
👋 , I’d like to have a
suspending fun
that puts something in a queue and suspends until that “something” gets its turn and is finished being processed
seems trivial.. but I’m having trouble getting it to work
the “processor” deals with shared mutable data so I want things to be processed sequentially one by one
Tried using channels and actors but
.send()
suspends until the channel has buffer capacity and starts processing… not when its done processing
n

Nick Allen

12/16/2021, 6:54 PM
Sounds like you just want a coroutine
Mutex
f

frankelot

12/16/2021, 6:57 PM
I think you are right, taking a look
c

Casey Brooks

12/16/2021, 6:57 PM
The purpose of channels is to make a "pipeline" of async communication. Putting an item into a channel and then waiting for processing to complete isn't how they're intended to be used, which is why it's not working for you. A better pattern would be to have something put an item into Channel A, and when that item is done have it put an item into Channel B to be processed later. And ultimately when the whole pipeline is finished, data is posted back to the original caller through a callback or something like that. This would loosely be considered an "actor pattern" Alternatively, a
Flow
might be a better mechanism than raw channels, since by design the emitter will suspend until the collector has finished processing the item
f

frankelot

12/16/2021, 6:57 PM
somehow the idea of “mutex are no longer needed with coroutines” got into my head
n

Nick Allen

12/16/2021, 6:57 PM
The queue thing is possible with a
CompleteableJob
in the item to signal when done but it’s easy to mess up error situations and you lose natural structured concurrency
f

frankelot

12/16/2021, 7:04 PM
I’ll explore both approaches
mutex
and
flows
and report back, thanks! for the help
The purpose of channels is to make a “pipeline” of async communication. Putting an item into a channel and then waiting for processing to complete isn’t how they’re intended to be used, which is why it’s not working for you.
this makes sense.. I don’t want async communication
I was just using an actor to “group” mutliple coroutines into one and avoid concurrent modifications to state (which should be done using a mutex)
d

darkmoon_uk

12/16/2021, 11:38 PM
One approach to this would be to use the
suspendCoroutine
function e.g. in pseudocode:
fun queuedProcess(someParam: Param) = suspendCoroutine { continuation -> 
    addToQueue(param, continuation)
    ensureQueueIsRunning()
}
...and elsewhere, the queue processing calls
queueItem.continuation(result
)
To add some description to that;
suspendCoroutine
suspends the execution at that point and gives you a
Continuation
object that you can invoke with a result when you want that point of execution to continue. You can then store this object as part of your queued operation, and invoke it when the queued process completes.
k

K Merle

12/17/2021, 5:35 AM
You could possibly have a simple for loop of suspend functions and run them sequentially by your need.
c

Casey Brooks

12/17/2021, 3:38 PM
The
suspendCoroutine
route works, but it might be easier and make the intent clearer to use a CompletableDeferred, which would effectively do the same thing