Is there something like a blocking (read: suspendi...
# coroutines
m
Is there something like a blocking (read: suspending)
BroadcastChannel
? I want an event flow where the sender is suspended until all collectors have processed an event. Even the smallest capacity
BroadcastChannel
is still non-blocking :(
Copy code
private val _events = BroadcastChannel<MyEvent>(1) // for sending
val events = _events.asFlow() // for collecting
a
are you sure you want this? It's not terribly difficult to write but from experience, it's a great way to end up with a system that has a lot of hard to debug latency for little to no gain in return.
you lose that guarantee as soon as a downstream collector uses a channel to buffer the event or move the event somewhere else
☝🏼 1
m
How else would you model that? I’m fine with downstream consumers doing buffering or moving the processing off the coroutine. What makes sense depends on the event and would probably be a contract when using the Flow (e.g. this event must happen synchronously). All other callback/flow/eventbus-based alternatives have the same issue. The only other alternative I can think of is reverting it: The event sender needs to know all consumers and call them directly. But even that wouldn’t make it much better. It would merely reverse dependencies.
b
You could have a property on your
MyEvent
that contains a
CompletableDeferred<Unit>
and your producer can
await()
until your consumer instructs that it has `complete`d
Copy code
// PRODUCER
val event = nextEvent()
channel.send(event)
event.deferred.await()

// CONSUMER
val event = channel.receive()
process(event)
event.deferred.complete(Unit)
m
Thank you. I have more than one consumer, so I would need a list of Channels, CompletableDeferreds and synchronization for all the subscribes/unsubscribes. Having a CompletableDeferred makes it a little more explicit that you have to respond to the event somehow. But now I'm wondering whether I should actually call it event. It's more like a command 🤔
Here’s what I did in the end – in case someone has the same issue. I wasn’t able to use a
Flow
for that. It would be significant work to make that possible. I’ve written a basic manual event dispatcher. It goes through all subscriptions and sends the event asynchronously. Then it waits for all subscribers to complete the processing. Unsubscribing is handled in two ways: • Subscribers provide a
CoroutineScope
upon subscription. When that scope is closed then the subscription is automatically removed. • Upon subscribing an interface with an
unsubscribe()
function is returned for manual unsubscribing. https://gist.github.com/fluidsonic/4bf5aca6d84117f371d84894ccb976f4