Would this work as a system that allows callers to either send a signal ("flush"), or await the next signal that arrives? The calls come from different
scope.launch
blocks that are running in parallell.
Copy code
@JvmInline
internal value class Queue(
private val trigger: MutableSharedFlow<Unit> = MutableSharedFlow(extraBufferCapacity = 8),
) {
suspend fun flush() {
trigger.emit(Unit)
}
suspend fun await() {
trigger.first()
}
}
j
Joffrey
10/05/2022, 6:14 AM
Why the extra buffer?
z
Zoltan Demant
10/05/2022, 6:20 AM
What would be the difference in not using one?
From my understanding, having one prevents trigger.emit from suspending (unless exceeding 8 elements). This seems ideal for my use-case, but Im not sure it makes any difference given the trigger.first call.
Zoltan Demant
10/05/2022, 6:32 AM
Sorry, I wrote this code a couple of months ago (just investigating a bug now). I think what I was going for with the buffer is that if one client uses flush, it shouldnt be blocked until another client uses await. I believe this is what happens?
Zoltan Demant
10/05/2022, 6:42 AM
I guess the buffer doesnt matter due to the collector just being
flow.first()
? Just to verify that Ive understood things correctly, that also means that if I replace
flow.first()
with
flow.collect{ delay(5000) }
then the emit will also take 5000 ms?
j
Joffrey
10/05/2022, 9:28 AM
Technically any
flush()
call will immediately complete (and the event immediately discarded) as long as no consumer is
await()
-ing. And given that consumers just use
first()
, it's also pretty quick when one is waiting. That said, without the buffer,
flush()
callers would indeed suspend if a consumer is waiting (albeit not for long).
I was just wondering whether this was intentional
z
Zoltan Demant
10/05/2022, 10:25 AM
Seems like my understanding was not 100% on point, so thank you for helping me learn something new today!