
natario1

11/30/2021, 10:30 AM
I often wish there was a channel with input and output types, something like
Queue<In, Out>(process: suspend (In) -> Out, onBufferOverflow, capacity)
where I can send input and suspend until output is ready: `val out = queue.send(in)`, use `trySend` and so on. I'm curious if it's just me: do you ever find yourself needing something similar, and what do you use?

Joffrey

11/30/2021, 10:32 AM
I'm not sure I understand your use case. Which `out` do you want to get when you call `send(in)`? Is it the one corresponding to the `in`? Then why not just call the suspend function of your queue directly?

natario1

11/30/2021, 10:34 AM
To use all the nice channel utilities - capacity, onBufferOverflow, trySend vs. send and so on.

Joffrey

11/30/2021, 10:34 AM
I'm missing the "queue" aspect here though. How would `capacity` or `onBufferOverflow` affect the result you get from `queue.send(in)` here?
If it's complicated to define in an abstract way, could you please share a use case that illustrates how you would use such a queue?

natario1

11/30/2021, 11:09 AM
Just like a normal channel, but taking processing into account. E.g. for a rendezvous queue with `BufferOverflow.SUSPEND`, if the queue is busy processing an item, `send(in)` suspends until the processor is free, then suspends until the result is ready. Or: for `trySend(in)` + `BufferOverflow.DROP_LATEST`, fail if we're already processing something else.

Joffrey

11/30/2021, 11:28 AM
Ah I see now, thanks for clarifying. It looks like you're trying to emulate what coroutines already give you. For instance in your first example it seems you just want to run a suspend function with limited concurrency. The queue aspect of it could actually just be the coroutines dispatch itself. You could achieve the same result using a Mutex around your suspending function:
val out = mutex.withLock { process(in) }
You could also deal with it via the dispatcher directly:
val out = withContext(dispatcher) { process(in) }
with a single-threaded dispatcher or using `limitedParallelism`, which is experimental in 1.6.0-RC. Regarding your second use case, I guess `Mutex.tryLock` could emulate what you need (dealing with the "already locked" state the same way you would deal with the result of `trySend`).
I almost never needed this so far though, apart from protecting a non-thread-safe piece of code, which is basically what mutex is for. Do you have real-life use cases where you needed to do this? I'd be interested in them
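(Editor's sketch, not from the thread.) The two Mutex-based suggestions above could look roughly like this. `SerialProcessor`, `send` and `trySend` are hypothetical names chosen to mirror the queue API from the first message; returning `null` for the "already locked" case is an assumption, and it would be ambiguous if `Out` is itself nullable.

```kotlin
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical wrapper: limits calls to `process` to one at a time using a Mutex.
class SerialProcessor<In, Out>(private val process: suspend (In) -> Out) {
    private val mutex = Mutex()

    // Comparable to send() on a rendezvous queue with SUSPEND:
    // waits until the processor is free, then suspends until the result is ready.
    suspend fun send(input: In): Out = mutex.withLock { process(input) }

    // Comparable to trySend() + DROP_LATEST:
    // returns null right away if another input is currently being processed.
    suspend fun trySend(input: In): Out? {
        if (!mutex.tryLock()) return null
        try {
            return process(input)
        } finally {
            mutex.unlock()
        }
    }
}
```

Callers would treat a `null` from `trySend` the same way they would treat a failed `trySend` on a channel.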

ephemient

11/30/2021, 11:43 AM
Mutex is documented to be "fair" as well, i.e. waiters will be woken in order
👍 1

Joffrey

11/30/2021, 11:46 AM
Yeah this might be important if `process` has side-effects, thanks for pointing it out

natario1

12/01/2021, 12:32 PM
Yeah mutex helps, but it does not offer enough flexibility for the queue holder (like: queue up to X tasks). I based my implementation on `Channel` instead, especially because capacity / bufferOverflow semantics fit very well. Maybe a semaphore would have worked too. Anyway, it's interesting to see that people do not feel the need for a `<In, Out>` typed component. Might be specific to the nuances of what I'm doing. Simple example since you asked:
interface Store {
    val results: Flow<Out>
    suspend fun refresh(payload: In)
}
Store wants to avoid too many refreshes, or wants to decide whether to refresh or not based on the `In` payload, the number of enqueued refreshes, and so on. Might want to DROP_OLDEST, DROP_LATEST... On the other hand, `refresh` is suspending because the caller wants to know when/if the payload is handled (not just enqueued).

Joffrey

12/01/2021, 1:36 PM
> Maybe a semaphore would have worked too.
It depends on what you're using the queue for. My understanding was that the processing queue was a way for you to limit concurrency to 1. A semaphore would limit concurrency to N tasks, but that would not be a bound on the queue size, it would be a bound on the concurrently executing tasks (the queue would still be infinite, limited by heap memory). So it's probably not what you wanted here.
> Anyway, it's interesting to see that people do not feel the need for a `<In, Out>` typed component
Well that's quite a generalization from the initial use case. You were specifically mentioning calling a function with an input and returning a value after some time. This is what suspend functions are all about, and this is why I said that it seemed you were trying to reimplement coroutines dispatch yourself. I wasn't making a statement about having `<In, Out>` generic types, this is way too general to rule out, of course.
> Simple example since you asked
This `Store` interface doesn't fit the description from your initial message. `refresh` doesn't return the value corresponding to the given `payload`. You're instead getting the results via a separate `Flow`, so my initial suggestion (modelling it with plain suspend functions returning a value and letting the coroutine dispatch do its thing) is no longer appropriate. You don't actually need in/out anymore per se. That new use case doesn't seem very common, though, because usually you either send a payload and wait for a result, or you send a payload and don't wait at all (some other component will get the updates through the Flow). Having to wait for the processing of the item but not get the result seems strange to me: what's the purpose of a queue if callers don't just enqueue but also wait for the processing to happen anyway?
About `DROP_LATEST` / `DROP_OLDEST`, what happens to the caller of `refresh` when its own payload is discarded? Does it just stop waiting? Then when the function returns you don't really have a guarantee that the element was processed. If you eliminate this need for waiting for the actual processing of the element by the queue, then you just need a `SendChannel` for producers (the callers of `refresh`), and then you can convert it to a flow with `debounce` for the conflation, `map` for the processing, and potentially other things you need before exposing that `Flow` to the consumers.
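(Editor's sketch, not from the thread.) A minimal version of the channel-to-flow pipeline described above might look like this. `ChannelStore`, the `debounceMillis` parameter and its default are hypothetical, and `refresh` here returns as soon as the payload is enqueued, not when it has been processed. Note that `debounce` is a preview API and that `receiveAsFlow` splits elements between collectors, so this assumes a single collector of `results`.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow

// Hypothetical store: producers enqueue payloads, consumers observe processed results.
class ChannelStore<In, Out>(
    process: suspend (In) -> Out,
    debounceMillis: Long = 300, // assumed quiet period, not from the thread
) {
    private val payloads = Channel<In>(Channel.BUFFERED)

    // debounce() drops payloads that are followed by a newer one within the
    // quiet period, map() runs the processing for each payload that survives.
    @OptIn(FlowPreview::class)
    val results: Flow<Out> = payloads.receiveAsFlow()
        .debounce(debounceMillis)
        .map { process(it) }

    // Suspends only while the channel buffer is full; does not wait for processing.
    suspend fun refresh(payload: In) = payloads.send(payload)
}
```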

natario1

12/01/2021, 2:06 PM
> This `Store` interface doesn't fit the description from your initial message.
Sure - all my messages are about the low level queue. Store was a real-life example of a class that can use such queue under the hood.
> About `DROP_LATEST` / `DROP_OLDEST`, what happens to the caller of `refresh` when its own payload is discarded?
It should throw. But I still need to implement `DROP*` behavior in the queue, it's not very straightforward.
> usually you either send a payload and wait for a result, or
That's a good description of the use case 🙂 callers want to send a payload and wait for a result, callee wants to process payloads in its own scope in a queue, and make decisions about whether to enqueue, run or drop them.
👌 1

Joffrey

12/01/2021, 2:25 PM
> It should throw
> (...)
> callers want to send a payload and wait for a result
So `refresh` would actually return `Out` and not `Unit`? But it's also not guaranteed to run at all and could just fail in a way that cannot be controlled/predicted/prevented by the caller, right?
> callee wants to process payloads in its own scope in a queue, and make decisions about whether to enqueue, run or drop them
So the reason for the queue is to allow you to drop events?
> Store was a real-life example of a class that can use such queue under the hood.
Even if said `Queue` API was available, I'm not sure it would be very straightforward to implement this `Store` anyway, especially regarding the `DROP_OLDEST`/`DROP_LATEST` behaviour. I think I'm failing to see the bigger picture of how that store would be used in practice. In any case, it would indeed be possible to implement what you're looking for (maybe by enqueuing a `CompletableDeferred` along with the payload), but I don't see it as a common need.