joadar
02/05/2022, 4:54 PMvar running: Flow<Pair<Event, String?>> = flow {}
suspend fun execute(node: NodeInterface) {
running = node.run()
}
The first execute()
is working, I collect it from running, but others call to execute()
are not giving anything. How can I make it working correctly? What do I need to use? I tried with shareFlow, tried to collect the node.run()
flow and emit to the running
but nothing is collected ๐ Do I need to use channel instead of flow?
Thanks for your input!joadar
02/05/2022, 5:04 PMnode.run()
and collect all the results to running
keeping the orderjoadar
02/05/2022, 7:08 PMprivate var _running = MutableSharedFlow<Pair<Event, String?>>()
val running: SharedFlow<Pair<Event, String?>> = _running
fun execute(scope: CoroutineScope, node: NodeInterface) {
scope.launch {
node.run().collect {
_running.emit(it)
}
}
}
Please, tell me if there is a better way ๐Nick Allen
02/06/2022, 4:32 AMvar myFlow = someFlow1
launch {
myFlow.collect { ... } // collecting from myFlow, which at this point is someFlow1
}
delay(aBit)
myFlow = someFlow2 //This has no effect on previous invocations like the one above. That was called on the someFlow1 instance.
Essentially, in execute
, you were changing running
(the variable) to point at a new Flow
instance. But collect
was already called on the previous instance, so changing where running
points doesn't do anything. If you called collect
again, then it would use the new instance.Nick Allen
02/06/2022, 4:52 AMcollect
call in execute is not tied at all to the actual consumer's collect
call (the one collecting from running
). So it runs even when nothing is listening, and you are not just getting items from the latest node
, you are getting all items from all nodes because nothing cancels the previous job when execute
is called a second time.
Fortunately, all this is handled by flatMapLatest
which does exactly what you are asking for.
private var latestNode = MutableSharedFlow<NodeInterface>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val running: Flow<Pair<Event, String?>> = latestNode.flatMapLatest { it.run() }
fun execute(node: NodeInterface) {
latestNode.tryEmit(node)
}
On each new item, it'll cancel what it was doing, call the lambda to get a new Flow
, and emit items from it.joadar
02/06/2022, 8:13 AM