# coroutines
u
Hello there! I’m looking for a right operator to implement the following logic: I need to make a request to load an object. Basically, it’s a class with some fields. Then I need to subscribe to the changes relative to this object. These are granular changes — transformations applied to some of the fields of this object.
fun loadObject(id: Id): Flow<Object>
fun subscribe(id: Id): Flow<List<SubscriptionEvent>>
First, I tried to use the combine operator in the following manner:
loadObject(id).combine(subscribe(id)) {  obj, events ->
        var result = obj
        events.forEach { event ->
            result = when (event) {
                is Init -> {
                    obj
                }
                is Amend -> {
                    result.amend(event.diff)
                }
                is Set -> {
                    event.data
                }
                is SubscriptionEvent.Unset -> {
                    result.unset(event.keys)
                }
            }
        }
        result
    }
But this obviously doesn’t work: the flow always holds the initial object (emitted only once, after loading), which is combined with only the latest events / transformations. What I need is something like the scan operator, where I could first initialize (load) my object and then start consuming events, each of which transforms this object. Observers would receive something like: “object in initial state, after loading”, “object with the first set of transformations applied”, “object with the first and second sets of transformations applied”. Is there something like:
transformations.scan(initialObjectFlow) { latestVersionOfTheObject, events -> 
// apply new events / transformations
// emit new version of the object 
 }
Thank you!
b
Something like this should work:
// `object` is a hard keyword in Kotlin, so the lambda parameter is named `obj`
loadObject(id).flatMapLatest { obj ->
    subscribe(id).scan(obj) { prev, transform ->
        prev.applyTransform(transform)
    }
}
But technically you can miss some transforms when a new object is emitted from loadObject.
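For reference, here is a minimal, self-contained sketch of the flatMapLatest + scan idea. Everything in it is an assumption made for illustration: `Obj`, `Event`, `applyAll`, `observe`, and the fake `loadObject()` / `subscribe()` sources (which take no id) are hypothetical stand-ins for the real types in the question. Because each emission of the subscription is a list of events, the scan step folds the whole batch into the previous version of the object.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the loaded object: just a bag of fields.
data class Obj(val fields: Map<String, String>)

// Hypothetical stand-in for the subscription events.
sealed interface Event {
    data class Amend(val diff: Map<String, String>) : Event
    data class Unset(val keys: Set<String>) : Event
}

// Applies one batch of events, in order, to the previous version of the object.
fun Obj.applyAll(events: List<Event>): Obj = events.fold(this) { acc, event ->
    when (event) {
        is Event.Amend -> Obj(acc.fields + event.diff)
        is Event.Unset -> Obj(acc.fields - event.keys)
    }
}

// Fake sources for the sketch; real ones would talk to a backend.
fun loadObject(): Flow<Obj> = flowOf(Obj(mapOf("name" to "a")))

fun subscribe(): Flow<List<Event>> = flowOf(
    listOf(Event.Amend(mapOf("name" to "b"))),
    listOf(Event.Amend(mapOf("color" to "red")), Event.Unset(setOf("name"))),
)

// scan emits the initial object first, then one new version per batch of events.
@OptIn(ExperimentalCoroutinesApi::class)
fun observe(): Flow<Obj> = loadObject().flatMapLatest { obj ->
    subscribe().scan(obj) { prev, events -> prev.applyAll(events) }
}

fun main() = runBlocking {
    observe().collect { println(it) }
}
```

With these fake sources the collector sees three versions: the loaded object, the object after the first batch, and the object after the second batch.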
u
Thanks. In my case, loadObject is basically a one-shot async operation,
but it is important to process every transform.
e
flatMapLatest doesn't skip anything inside the transform; it just cancels the current inner flow and starts a new one whenever upstream emits, which it sounds like doesn't happen in your case. But you might use flatMapConcat, which won't collect another upstream emission until the inner flow completes, or flatMapMerge, which collects multiple inner flows concurrently when upstream emits before the previous inner flow has completed.
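The difference between the three operators can be seen in a small timing sketch. The `upstream()` and `inner()` flows and the three wrapper functions below are hypothetical and exist only for this illustration; the delays are chosen so that the second upstream value arrives while the first inner flow is still running.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Upstream emits 1 immediately and 2 about 50 ms later.
fun upstream(): Flow<Int> = flow {
    emit(1)
    delay(50)
    emit(2)
}

// Each inner flow emits "n-a", waits 100 ms, then emits "n-b".
fun inner(n: Int): Flow<String> = flow {
    emit("$n-a")
    delay(100)
    emit("$n-b")
}

// Cancels the running inner flow as soon as upstream emits again.
@OptIn(ExperimentalCoroutinesApi::class)
fun latest(): Flow<String> = upstream().flatMapLatest { inner(it) }

// Runs inner flows one after another; upstream waits for each to finish.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun concat(): Flow<String> = upstream().flatMapConcat { inner(it) }

// Runs inner flows concurrently and interleaves their emissions.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun merge(): Flow<String> = upstream().flatMapMerge { inner(it) }

fun main() = runBlocking {
    println(latest().toList())
    println(concat().toList())
    println(merge().toList())
}
```

With these delays, flatMapLatest drops "1-b" because the first inner flow is cancelled when 2 arrives, flatMapConcat yields both inner flows in full and in order, and flatMapMerge interleaves them. When upstream emits only once, as with a one-shot load, all three behave the same.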
As an aside, one-shot async operations are usually better modeled as a suspend fun than as a Flow, and then none of these concerns would arise.
u
Thanks, @ephemient. But what if the client code wants to consume a flow? In this case, we wait for the suspend function to execute, then we subscribe. It seems a bit weird to me, as if mixing cold and hot streams, so to speak. But maybe I am wrong.
e
flow {
    // loadObject is a suspend fun here; it runs only once the flow is collected
    val obj = loadObject(id)
    emitAll(subscribe(id)...)
}
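A runnable sketch of this pattern, under stated assumptions: the object is reduced to an Int counter and each subscription batch to a list of deltas, so `loadCounter`, `deltas`, and `observeCounter` are hypothetical names, and the scan step is one plausible way to fill in the elided part above rather than what was necessarily meant. The resulting flow is still cold: neither the load nor the subscription starts until a collector appears, so the client can consume a plain Flow.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical one-shot load, modeled as a suspend fun instead of a Flow.
suspend fun loadCounter(): Int {
    delay(10) // stands in for a network round trip
    return 100
}

// Hypothetical subscription: each emission is a batch of deltas to apply.
fun deltas(): Flow<List<Int>> = flowOf(listOf(1, 2), listOf(-5))

// Cold flow: loads the initial value on collection, then folds every batch into it.
fun observeCounter(): Flow<Int> = flow {
    val initial = loadCounter()
    // scan emits `initial` first, then one updated value per batch
    emitAll(deltas().scan(initial) { prev, batch -> prev + batch.sum() })
}

fun main() = runBlocking {
    println(observeCounter().toList()) // [100, 103, 98]
}
```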
u
thanks so much!
j
"what if the client code wants to consume a flow?"
@ubu Why would they want that if only one value is expected anyway? Do you have an example of a use case where the client knows there is a single value to get asynchronously, and yet still wants to consume it as a flow? IMO those flow APIs with a single item are mistakes and misdocument the methods.