Given a SharedFlow, what is an appropriate way to ...
# flow
Given a SharedFlow, what is an appropriate way to register as a subscriber to its events at a specific point in the control flow, so that nothing emitted after that point is missed? In other words, I need to buffer events from that point in time until somebody collects them.

Here's my use case: I'm implementing a GraphQL subscription to notify clients about changes to the data. Internally, change events are posted to the shared flow, but I also have a table of historical change events. To avoid race conditions, the GraphQL client provides the latest logical time that it knows about, and the API either immediately returns known subsequent events from the history table, or waits for the next SharedFlow event and returns that.

So basically I need to:
1. start listening on the flow,
2. check the history table,
3. if the history table has data, return that; otherwise return the flow.

My current solution looks something like this:
val deferred: Deferred<List<IntegerObjectId>> = GlobalScope.async {
    changedIdsFlow.first()
}

// Simulate an interval where simultaneous changes to data may or may not occur
Thread.sleep(10000)

val publisher = mono {
    deferred.await()
}
// Here we would return new history records if they exist, otherwise we return the flow wrapped as a publisher.
return DataFetcherResult.Builder(publisher as Publisher<List<IntegerObjectId>>).build()
This doesn't seem to me like the ideal solution though. Another way I found of doing it was to convert the flow to a reactive publisher and pass it through a dummy reactive subscriber, just to cause it to start collection (hence registering as a subscriber in the SharedFlow).
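For illustration, one possible way to do step 1 ("start listening") without a dummy subscriber is to launch the collector with `CoroutineStart.UNDISPATCHED` and forward events into an unlimited `Channel`. This is only a minimal sketch: `subscribeNow` is a hypothetical helper name, `Int` stands in for `IntegerObjectId`, and it assumes that `SharedFlow.collect` registers the subscriber before its first suspension point, which matches my understanding of the current kotlinx.coroutines behaviour but should be verified against the version in use.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): subscribes to the
// SharedFlow immediately and buffers every event emitted from now on.
fun <T> SharedFlow<T>.subscribeNow(scope: CoroutineScope): ReceiveChannel<T> {
    // Unlimited capacity, so forwarding an event never suspends the collector.
    val buffer = Channel<T>(Channel.UNLIMITED)
    // UNDISPATCHED runs the coroutine body on the calling thread until its
    // first suspension. Assumption: collect() has registered the subscriber
    // by then, so the subscription exists when launch() returns.
    scope.launch(start = CoroutineStart.UNDISPATCHED) {
        collect { buffer.send(it) }
    }
    // The collector keeps running until `scope` is cancelled, so pass a scope
    // tied to the lifetime of the request.
    return buffer
}

fun main() = runBlocking {
    // Stand-in for changedIdsFlow; List<Int> replaces List<IntegerObjectId>.
    val changedIdsFlow = MutableSharedFlow<List<Int>>()

    // 1. Start listening.
    val pending = changedIdsFlow.subscribeNow(this)

    // 2. The history table check would go here; meanwhile a change arrives.
    changedIdsFlow.emit(listOf(1, 2))

    // 3. No history in this sketch, so fall back to the buffered event.
    println(pending.receive())

    // Stop the collector so runBlocking can complete.
    coroutineContext.cancelChildren()
}
```

If the history table does have data, the channel can simply be cancelled and the scope's collector stopped. Otherwise, `pending.receive()` can be wrapped in `mono { ... }` in the same way as `deferred.await()` above.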