https://kotlinlang.org logo
#flow
Title
# flow
m

Marc Knaup

01/11/2021, 11:49 AM
Is there a way to suspend an
emit()
invocation of a
MutableSharedFlow
until all subscribers have completed collecting the element? There’s no replay.
e

elizarov

01/12/2021, 2:14 PM
No. In general, Flow collection is asynchronous (which becomes especially obvious if use
buffer
operator or other similar ones) and there is no support for "processing completed" signal in the flow infrastructure.
m

Marc Knaup

01/12/2021, 2:20 PM
Thanks, makes sense. Looks like I cannot use it for my event bus then 😕
b

bezrukov

01/12/2021, 10:34 PM
It depends on your use case, but probably it will work for you: You can send special confirmation token after each real
value
. (Note that you will need some extra synchronization to always send value+confirmation paired) On collector side you will need to filter such tokens. If it's ok to receive "processing complete" with few extra redispatches it might work for you.
m

Marc Knaup

01/13/2021, 9:22 AM
That only works well in cases where you have only a single collector. In an event system you may have any number of collectors which makes it much more tricky to synchronize everything. You'll need
n
confirmations where
n
is determined the moment you emit.
b

bezrukov

01/13/2021, 9:32 AM
No, I don't think so - with properly configured shared flow (with no buffer) fast collectors won't produce early confirmation
Sample demonstrates how it works for two collectors https://pl.kotl.in/EQOzegiQ_ output:
Copy code
fast collected 1
slow collected 1
confirmed 1
fast collected 2
slow collected 2
confirmed 2
fast collected 3
slow collected 3
confirmed 3
fast collected 4
slow collected 4
confirmed 4
fast collected 5
slow collected 5
confirmed 5
m

Marc Knaup

01/13/2021, 9:35 AM
No, I mean whoever emits upstream needs to know how many confirmations they have to wait for.
b

bezrukov

01/13/2021, 9:36 AM
just make a function
emitWithConfirmation
and use only it. Then every new emitter will wait for previous confirmation
emitWithConfirmation will be something like
Copy code
mutex.withLock {
   flow.emit(value)
   flow.emit(specialToken)
}
m

Marc Knaup

01/13/2021, 9:38 AM
I still don't know how many confirmations to wait for 🤔
e

elizarov

01/13/2021, 9:40 AM
@Marc Knaup Why do you care to wait until it is processed then? Are you sure you really care?
That is, if unknown number of subscribers come and go and thus may arbitrarily miss messages, then what's the point in waiting until all the subscribers that just random happen to be there at the moment of emission finish processing the event?
m

Marc Knaup

01/13/2021, 9:55 AM
For now I care because the event handlers cause side effects in the backend that the API client (frontend) has to somehow wait for before making the next API call. E.g. API client: "register new user" Backend: creates user, payment module receives event and sets up a stripe account API client: save credit card <- race condition if backend doesn't wait for payment system in previous call Backend: saves credit card API client: make payment Or: API client: accept insurance quote Backend: create insurance, documentation system receives event and creates necessary documentation files and puts them on Amazon S3 API client: fetch documentation Frontend: present all to user I'll rarely if ever have subscribers come and go while the system is already running. The event bus is not used for asynchronous execution but for modularization of code. The user registration logic doesn't need to know that the payment or other systems have to set up something. It only sends a message "new user registered" and other parts of the application make sure that they update their state as needed. And don't complete the user creation until they're done. It's decoupled synchronous code. The API client (frontend) doesn't (need to) know about that either. It only cares about "after a new user was registered I can use payment-related API". Flow is great for events, but I think you're right that they're not a good fit for such a use case.
If one event processor is truly asynchrous and there's no need to wait for the completion or can simply decide to launch a new coroutine and return immediately in the collector. The processing side decides if it's working synchronously (with completion feedback) or asynchronously (without feedback).
e

elizarov

01/13/2021, 10:05 AM
But why the need for event bus then? It looks like a typical case for dependency injection. You can define
interface XyzEventHandler
with a
suspend fun processEvent(...)
and then inject its implementation into the place which generates those events. Synchronous and simple.
m

Marc Knaup

01/13/2021, 10:09 AM
Yeah I had something similar. In the end someone emits events and many subscribe to event streams, filter the events they care about and process them. That was so similar to a
MutableSharedFlow
that I've used one instead and can benefit from all Flow API and guarantees (esp. cancelation/unsubscribe). It works great except that it doesn't wait for collectors.
e

elizarov

01/13/2021, 10:10 AM
If you subscibe and filter... It all looks super-fragile to me. How do you actually make sure the event is processed at all (if you really care for it to be processed)?
(E.g., if I injected some kind of
InsuranceQuoteAddedHandler
, then I'm 100% sure that there is a handler that did process added insurance quote, or my dependencies would fail to wire).
m

Marc Knaup

01/13/2021, 10:11 AM
If I filter an event the collection of that one would complete. Event is considered processed. If something has to happen for the event, the downstream wouldn't filter it.
I have multiple handlers per event that don't know about each other. Nor does the emitting code has to know. The emitting side basically says "This is what happened. Whatever you anyone has to do immediately in response, please do now."
e

elizarov

01/13/2021, 10:14 AM
Got it.
Not sure Flow is a good fit for this case at all, since Flow is asynhcronous in nature. And here you basically just need synchronous callbacks.
m

Marc Knaup

01/13/2021, 10:17 AM
Yeah I just hoped that I don't have to reinvent the wheel, esp. managing event stream subscriptions and dispatching in the subscriber's context.
In a regular Flow (e.g. flow{}) emit() waits just fine. Just the Shared one doesn't.
e

elizarov

01/13/2021, 10:19 AM
In a synchronous context "dispatching in the susbciber's context" is moot. Why would you care if it is synchronous with the event publisher anyway?
m

Marc Knaup

01/13/2021, 10:21 AM
The subscribers are typically services with their own scope/lifecycle. I could manually switch to that context on each event. Flows just do that automatically when collecting, if I recall correctly.
e

elizarov

01/13/2021, 10:26 AM
They do, but they asynchronously switch context, loosing the "processing synchrony".
(unless you implement your own Flow operators and avoid using standard context-switching implementation)
m

Marc Knaup

01/13/2021, 10:27 AM
Does that mean that
emit
in
flow {}
would return if a context switch happens but before the collection of the element has completed?
e

elizarov

01/13/2021, 10:27 AM
Yes.
m

Marc Knaup

01/13/2021, 10:28 AM
Interesting. I didn't know that.
e

elizarov

01/13/2021, 10:29 AM
https://kotlinlang.org/docs/reference/coroutines/flow.html#flowon-operator
Another thing to observe here is that the flowOn operator has changed the default sequential nature of the flow.
m

Marc Knaup

01/13/2021, 10:30 AM
Why would I need flowOn here? My collection callback is automatically run in the context where I've started the collection, not in the context of the emitter. I mean that context switch.
Anyway, if
Flow
isn’t meant to be used in such cases, there’s no point in making it work. I’ll work on my own synchronous stream solution then.
e

elizarov

01/13/2021, 12:01 PM
Having thought about it more, with this kind of a mindset (that a subscriber decided what and using async operators means it is Ok to signal that the even is processed) it is indeed possible to implement some kind of
SynchronousSharedFlow
implementation (I don't have a better name) that actually suspends
emit
until all active collectors have "finished processing it".
m

Marc Knaup

01/13/2021, 12:04 PM
Is there any way to build that on top of a
MutableSharedFlow
(as a workaround)? How much would a clean implementation differ from a
MutableSharedFlow
? And does it make sense to even support
replay
for such Flows?
e

elizarov

01/13/2021, 12:26 PM
I cannot think of a simple way to be built on top of a MutableSharedFlow.
🆗 1
It seems easier to have a clean-room implementation of such a primitive
m

Marc Knaup

01/13/2021, 12:35 PM
Alright. I’ll try. Thank you for your thoughts and questions.