joadar
02/05/2022, 4:54 PM
var running: Flow<Pair<Event, String?>> = flow {}
suspend fun execute(node: NodeInterface) {
    running = node.run()
}
The first execute() call works: I collect its results from running. But later calls to execute() don't deliver anything. How can I make this work correctly? What do I need to use? I tried a SharedFlow, and I tried collecting the node.run() flow and emitting into running, but nothing is collected. Do I need to use a channel instead of a flow?
Thanks for your input!
joadar
02/05/2022, 5:04 PM
node.run() and collect all the results into running, keeping the order
joadar
02/05/2022, 7:08 PM
private var _running = MutableSharedFlow<Pair<Event, String?>>()
val running: SharedFlow<Pair<Event, String?>> = _running
fun execute(scope: CoroutineScope, node: NodeInterface) {
    scope.launch {
        node.run().collect {
            _running.emit(it)
        }
    }
}
Please tell me if there is a better way.
Nick Allen
02/06/2022, 4:32 AM
var myFlow = someFlow1
launch {
    myFlow.collect { ... } // collecting from myFlow, which at this point is someFlow1
}
delay(aBit)
myFlow = someFlow2 // This has no effect on earlier invocations like the one above. That collect was called on the someFlow1 instance.
Essentially, in execute, you were changing running (the variable) to point at a new Flow instance. But collect had already been called on the previous instance, so changing where running points doesn't do anything. If you called collect again, it would use the new instance.
Nick Allen
02/06/2022, 4:52 AM
collect call in execute is not tied at all to the actual consumer's collect call (the one collecting from running). So it runs even when nothing is listening, and you are not just getting items from the latest node: you are getting all items from all nodes, because nothing cancels the previous job when execute is called a second time.
Fortunately, all this is handled by flatMapLatest which does exactly what you are asking for.
private var latestNode = MutableSharedFlow<NodeInterface>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val running: Flow<Pair<Event, String?>> = latestNode.flatMapLatest { it.run() }
fun execute(node: NodeInterface) {
    latestNode.tryEmit(node)
}
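To see the switching behaviour of the snippet above in isolation, here is a minimal sketch. It makes several assumptions: FakeNode is a hypothetical stand-in for NodeInterface, it emits plain String values instead of Pair<Event, String?>, Runner is an illustrative wrapper class, and the delays are arbitrary numbers chosen only to make the ordering visible. The @OptIn is there because flatMapLatest has been marked experimental in kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for NodeInterface: emits three labelled values, 200 ms apart.
class FakeNode(private val name: String) {
    fun run(): Flow<String> = flow {
        repeat(3) { i ->
            emit("$name-$i")
            delay(200)
        }
    }
}

// Illustrative wrapper holding the same three members as the snippet above.
@OptIn(ExperimentalCoroutinesApi::class)
class Runner {
    private val latestNode = MutableSharedFlow<FakeNode>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )

    // Each newly emitted node cancels collection of the previous node's flow.
    val running: Flow<String> = latestNode.flatMapLatest { it.run() }

    fun execute(node: FakeNode) {
        latestNode.tryEmit(node)
    }
}

fun demo(): List<String> = runBlocking {
    val runner = Runner()
    val seen = mutableListOf<String>()
    val collector = launch { runner.running.collect { seen += it } }

    runner.execute(FakeNode("a")) // kept in the replay cache until the collector subscribes
    delay(300)                    // "a" has had time to emit a-0 and a-1
    runner.execute(FakeNode("b")) // switches to "b"; "a" is cancelled before a-2
    delay(1000)                   // long enough for "b" to emit all three values

    collector.cancel()            // a SharedFlow never completes, so stop collecting explicitly
    seen
}

fun main() {
    println(demo())
}
```

With these timings the collected list should contain a-0 and a-1 followed by b-0, b-1 and b-2, with a-2 never arriving. The replay = 1 setting is what lets the first execute() call land before anything has started collecting.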
On each new item, flatMapLatest cancels what it was doing, calls the lambda to get a new Flow, and emits items from it.
joadar
02/06/2022, 8:13 AM