Hello, I would like to pass a flow to another flow...
# flow
j
Hello, I would like to pass a flow to another flow and collect the latest one:
var running: Flow<Pair<Event, String?>> = flow {}

suspend fun execute(node: NodeInterface) {
    running = node.run()
}
The first `execute()` works and I can collect from `running`, but later calls to `execute()` don't deliver anything. How can I make this work correctly? What do I need to use? I tried `SharedFlow`, and I tried collecting the `node.run()` flow and emitting into `running`, but nothing is collected 😕 Do I need to use a channel instead of a flow? Thanks for your input!
I guess I want to merge the different flows returned by the calls to `node.run()` and collect all the results in `running`, keeping the order.
I found a solution like this:
private val _running = MutableSharedFlow<Pair<Event, String?>>()
val running: SharedFlow<Pair<Event, String?>> = _running

fun execute(scope: CoroutineScope, node: NodeInterface) {
    scope.launch {
        // Forward every item from this node's flow into the shared flow.
        node.run().collect {
            _running.emit(it)
        }
    }
}
Please, tell me if there is a better way 🙂
n
Consider this code:
var myFlow = someFlow1
launch {
    myFlow.collect { ... } // collecting from myFlow, which at this point is someFlow1
}
delay(aBit)
myFlow = someFlow2 //This has no effect on previous invocations like the one above. That was called on the someFlow1 instance.
Essentially, in `execute`, you were changing `running` (the variable) to point at a new `Flow` instance. But `collect` was already called on the previous instance, so changing where `running` points doesn't do anything. If you called `collect` again, then it would use the new instance.
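To make the point above concrete, here is a minimal runnable sketch. The `labelled` helper and the `demoReassignment` function are hypothetical names made up for this illustration; they stand in for `node.run()` and for the original `execute`/collector pair.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a cold flow that emits three labelled items, 100 ms apart.
fun labelled(label: String): Flow<String> = flow {
    repeat(3) { i ->
        delay(100)
        emit("$label-$i")
    }
}

fun demoReassignment(): List<String> = runBlocking {
    val received = mutableListOf<String>()
    var myFlow = labelled("first")

    val job = launch {
        // collect is invoked on whichever instance myFlow refers to at this moment.
        myFlow.collect { received += it }
    }

    delay(150)                  // the collector is already running on the "first" flow
    myFlow = labelled("second") // rebinds the variable only; the running collect is unaffected
    job.join()
    received
}

fun main() {
    println(demoReassignment()) // [first-0, first-1, first-2]
}
```

Nothing from the "second" flow shows up, because no one ever called `collect` on it.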
Your second approach has a flaw: the lifetime of the `collect` call in `execute` is not tied at all to the actual consumer's `collect` call (the one collecting from `running`). So it runs even when nothing is listening, and you are not just getting items from the latest `node`; you are getting all items from all nodes, because nothing cancels the previous job when `execute` is called a second time. Fortunately, all this is handled by `flatMapLatest`, which does exactly what you are asking for.
private val latestNode = MutableSharedFlow<NodeInterface>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

// flatMapLatest currently requires opting in to ExperimentalCoroutinesApi.
@OptIn(ExperimentalCoroutinesApi::class)
val running: Flow<Pair<Event, String?>> = latestNode.flatMapLatest { it.run() }

fun execute(node: NodeInterface) {
    latestNode.tryEmit(node)
}
On each new item, it'll cancel what it was doing, call the lambda to get a new `Flow`, and emit items from it.
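A small self-contained sketch of that switching behaviour, under some assumptions: `FakeNode` is a hypothetical stand-in for `NodeInterface`, it emits plain strings rather than `Pair<Event, String?>`, and the delays are arbitrary values chosen so the switch happens between two emissions.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for NodeInterface: run() emits three items, 100 ms apart.
class FakeNode(private val name: String) {
    fun run(): Flow<String> = flow {
        repeat(3) { i ->
            delay(100)
            emit("$name-$i")
        }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun demoFlatMapLatest(): List<String> = runBlocking {
    val latestNode = MutableSharedFlow<FakeNode>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )
    val running: Flow<String> = latestNode.flatMapLatest { it.run() }

    val received = mutableListOf<String>()
    val consumer = launch { running.collect { received += it } }

    latestNode.tryEmit(FakeNode("a")) // replay = 1 keeps this for the consumer that is about to subscribe
    delay(250)                        // "a" has had time to emit two of its three items
    latestNode.tryEmit(FakeNode("b")) // flatMapLatest cancels the "a" flow and switches to "b"
    delay(400)                        // long enough for "b" to emit all three items
    consumer.cancel()                 // a shared flow never completes, so stop collecting explicitly
    received
}

fun main() {
    // With the timings above this should print [a-0, a-1, b-0, b-1, b-2]
    println(demoFlatMapLatest())
}
```

The third item of "a" is never delivered because its flow is cancelled as soon as "b" arrives. Unlike the `scope.launch` version, nothing runs here unless someone is collecting `running`.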
j
Thank you @Nick Allen for your explanations, it’s clear and it makes sense.