Marc Knaup
04/04/2020, 10:16 AMBroadcastChannel
?
I want an event flow where the sender is suspended until all collectors have processed an event.
Even the smallest capacity BroadcastChannel
is still non-blocking :(
private val _events = BroadcastChannel<MyEvent>(1) // for sending
val events = _events.asFlow() // for collecting
Adam Powell
04/04/2020, 3:35 PMAdam Powell
04/04/2020, 3:36 PMMarc Knaup
04/04/2020, 4:35 PMbdawg.io
04/05/2020, 4:57 AMMyEvent
that contains a CompletableDeferred<Unit>
and your producer can await()
until your consumer instructs that it has `complete`dbdawg.io
04/05/2020, 5:01 AM// PRODUCER
val event = nextEvent()
channel.send(event)
event.deferred.await()
// CONSUMER
val event = channel.receive()
process(event)
event.deferred.complete(Unit)
Marc Knaup
04/05/2020, 6:39 AMMarc Knaup
02/16/2021, 7:47 PMFlow
for that. It would be significant work to make that possible.
I’ve written a basic manual event dispatcher. It goes through all subscriptions and sends the event asynchronously.
Then it waits for all subscribers to complete the processing.
Unsubscribing is handled in two ways:
• Subscribers provide a CoroutineScope
upon subscription. When that scope is closed then the subscription is automatically removed.
• Upon subscribing an interface with an unsubscribe()
function is returned for manual unsubscribing.
https://gist.github.com/fluidsonic/4bf5aca6d84117f371d84894ccb976f4