natario1
11/30/2021, 10:30 AMQueue<In, Out>(process: suspend (In) -> Out, onBufferOverflow, capacity)
where I can send input and suspend until output is ready: val out = queue.send(in)
, use trySend
and so on. I'm curious if it's just me, do you ever find yourself needing something similar and what do you use?Joffrey
11/30/2021, 10:32 AMout
do you want to get when you call send(in)
? Is it the one corresponding to the in
? Then why not just call the suspend function of your queue directly?natario1
11/30/2021, 10:34 AMJoffrey
11/30/2021, 10:34 AMcapacity
or onBufferOverflow
affect the result you get from queue.send(in)
here?natario1
11/30/2021, 11:09 AMsend(in)
suspends until processor is free, then suspends until result is ready.
Or: for trySend(in)
+ BufferOverflow.DROP_LATEST, fail if we're already processing something else.Joffrey
11/30/2021, 11:28 AMval out = mutex.withLock { process(in) }
You could also deal with it via the dispatcher directly:
val out = withContext(dispatcher) { process(in) }
with a single-threaded dispatcher or using limitedParallelism, which is experimental in 1.6.0-RC.
Regarding your second use case, I guess Mutex.tryLock
could emulate what you need (dealing with the "already locked" state the same way you would deal with the result of trySend
).ephemient
11/30/2021, 11:43 AMJoffrey
11/30/2021, 11:46 AMprocess
has side-effects, thanks for pointing it outnatario1
12/01/2021, 12:32 PMChannel
instead, especially because capacity / bufferOverflow semantics fit very well. Maybe a semaphore would have worked too.
Anyway, it's interesting to see that people do not feel the need for a <In, Out>
typed component. Might be specific to the nuances of what I'm doing.
Simple example since you asked:
interface Store {
val results: Flow<Out>
suspend fun refresh(payload: In)
}
Store wants to avoid too many refreshes, or wants to decide whether to refresh or not based on the In
payload, the number of enqueued refreshes, and so on. Might want to DROP_OLDEST, DROP_LATEST... On the other hand, refresh
is suspending because the caller wants to know when/if the payload is handled (not just enqueued).Joffrey
12/01/2021, 1:36 PMMaybe a semaphore would have worked too.It depends on what you're using the queue for. My understanding was that the processing queue was a way for you to limit concurrency to 1. A semaphore would limit concurrency to N tasks, but that would not be a bound on the queue size, it would be a bound on the concurrently executing tasks (the queue would still be infinite, limited by heap memory). So it's probably not what you wanted here.
Anyway, it's interesting to see that people do not feel the need for aWell that's quite a generalization from the initial use case. You were specifically mentioning calling a function with an input and returning a value after some time. This is what suspend functions are all about, and this is why I said that it seemed you were trying to reimplement coroutines dispatch yourself. I wasn't making a statement about havingtyped component<In, Out>
<In, Out>
generic types, this is way too general to rule out, of course.
Simple example since you askedThis
Store
interface doesn't fit the description from your initial message. refresh
doesn't return the value corresponding to the given payload
. You're instead getting the results via a separate Flow
, so my initial suggestion is no longer appropriate (modelling it with plain suspend functions returning a value and let the coroutine dispatch do its thing). You don't actually need in/out anymore per se.
That new use case doesn't seem very common, though, because usually you either send a payload and wait for a result, or you send a payload and don't wait at all (some other component will get the updates through the Flow). Having to wait for the processing of the item but not get the result seems strange to me - what's the purpose of a queue if callers don't just enqueue but also wait for the processing to happen anyway?
About DROP_LATEST
/ DROP_OLDEST
, what happens to the caller of refresh
when its own payload is discarded? Does it just stop waiting? Then when the function returns you don't really have a guarantee that the element was processed.
If you eliminate this need for waiting for the actual processing of the element by the queue, then you just need a SendChannel
for producers (the callers of refresh
), and then you can convert it to a flow with debounce
for the conflation, map
for the processing, and potentially other things you need before exposing that Flow
to the consumers.natario1
12/01/2021, 2:06 PMThisSure - all my messages are about the low level queue. Store was a real-life example of a class that can use such queue under the hood.interface doesn't fit the description from your initial message.Store
AboutIt should throw. But I still need to implement/DROP_LATEST
, what happens to the caller ofDROP_OLDEST
when its own payload is discarded?refresh
DROP*
behavior in the queue, it's not very straightforward.
usually you either send a payload and wait for a result, orThat's a good description of the use case 🙂 callers want to send a payload and wait for a result, callee wants to process payloads in its own scope in a queue, and make decisions about whether to enqueue, run or drop them.
Joffrey
12/01/2021, 2:25 PMIt should throw
(...)
callers want to send a payload and wait for a resultSo
refresh
would actually return Out
and not Unit
? But it's also not guaranteed to run at all and could just fail in a way that cannot be controlled/predicted/prevented by the caller, right?
callee wants to process payloads in its own scope in a queue, and make decisions about whether to enqueue, run or drop themSo the reason for the queue is to allow you to drop events?
Store was a real-life example of a class that can use such queue under the hood.Even if said
Queue
API was available, I'm not sure it would be very straighforward to implement this Store
anyway, especially regarding the `DROP_OLDEST`/`DROP_LATEST` behaviour.
I think I'm failing to see the bigger picture of how that store would be used in practice.
In any case, it would indeed be possible to implement what you're looking for (maybe by enqueuing a CompletableDeferred
along with the payload), but I don't see it as a common need.