Melih Aksoy
06/05/2019, 8:44 AM
[...] flow in interactors, such as
flow {
    emit(Result.State.Loading())
    emit(
        dataSourceFactory.getGrid(
            params.path,
            params.page,
            params.displaySize
        )
    )
    emit(Result.State.Loaded())
}
Each interactor produces its own "state", but I also want to support multiple interactors sharing a single state. I came up with an idea to merge them asynchronously in this way:
fun CoroutineScope.merge(vararg results: Flow<SimpleResult<Any>>) = flow {
    emit(Result.State.Loading())
    val tasks = mutableListOf<Deferred<Unit>>()
    for (result in results) {
        tasks.add(
            async(Dispatchers.IO) {
                result
                    .filterNot { it is Result.State }
                    .collect {
                        emit(it)
                    }
            }
        )
    }
    tasks.awaitAll()
    emit(Result.State.Loaded())
}
I'm wondering if this is a robust solution. This CoroutineScope is handled properly for closing etc., but what do you think? Any suggestions for merging flows together while keeping the ability to ignore some emissions?
elizarov
06/05/2019, 2:46 PM
1. [...] merge as an extension on the CoroutineScope. You should do flow { coroutineScope { ... } }
2. You are trying to emit concurrently from different async coroutines. This is forbidden and will not work. Flows are sequential.
elizarov
06/05/2019, 2:47 PM
[...] channelFlow { ... } which gives you both a scope to launch your coroutines in and the ability to send(...) results concurrently.
elizarov
06/05/2019, 2:50 PM
fun merge(...) = flow {
    emit(Result.State.Loading())
    emitAll(flowOf(*results).flatMapMerge {
        it.flowOn(Dispatchers.IO)
            .filterNot { it is Result.State }
    })
    emit(Result.State.Loaded())
}
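The channelFlow suggestion from this thread could be sketched roughly as follows. This is only an illustration, not code posted by anyone in the conversation: the `Result` hierarchy is a hypothetical stand-in for the one used in the thread (its real definition is not shown), and the sketch assumes a recent kotlinx.coroutines version. Each source flow is collected in its own coroutine, and `send` is used instead of `emit` because it is safe to call concurrently.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the thread's Result type (assumption).
sealed class Result {
    sealed class State : Result() {
        class Loading : State()
        class Loaded : State()
    }
    data class Success(val data: String) : Result()
}

fun merge(vararg results: Flow<Result>): Flow<Result> = channelFlow {
    send(Result.State.Loading())
    // channelFlow provides a scope, so each source gets its own coroutine.
    val jobs = results.map { source ->
        launch(Dispatchers.IO) {
            source
                .filterNot { it is Result.State } // drop per-interactor states
                .collect { send(it) }             // send() may be called concurrently
        }
    }
    jobs.joinAll() // wait for every source before reporting Loaded
    send(Result.State.Loaded())
}

fun main() = runBlocking {
    val a = flowOf<Result>(Result.State.Loading(), Result.Success("a"), Result.State.Loaded())
    val b = flowOf<Result>(Result.State.Loading(), Result.Success("b"), Result.State.Loaded())
    val merged = merge(a, b).toList()
    // The relative order of "a" and "b" is not guaranteed.
    println(merged.map { (it as? Result.Success)?.data ?: it::class.simpleName })
}
```

The merged flow starts with a single Loading and ends with a single Loaded, with the values from the sources in between.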
Melih Aksoy
06/06/2019, 8:37 AM
[...] async cause in a flow?
PS: To test async I gave my 3 result flows a random delay between 1 and 10 seconds. They all started at the same time, and each one emitted once its own delay had elapsed.
Melih Aksoy
06/06/2019, 10:21 AM
emit(Result.State.Loading())
flowOf(*results).flatMapMerge {
    it.flowOn(Dispatchers.Default)
        .filterNot { it is Result.State }
}.collect {
    emit(it)
}
It's doing the same thing without the need for a scope or starting async - totally missed flowOf. Thanks a lot!
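For reference, a self-contained sketch of the flatMapMerge approach, including the delay experiment described in the thread. The `Result` hierarchy and the `interactor` helper are hypothetical stand-ins introduced only for illustration, and the `@OptIn` annotation is an assumption that covers the preview/experimental status of `flatMapMerge` in current kotlinx.coroutines releases.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the thread's Result type (assumption).
sealed class Result {
    sealed class State : Result() {
        class Loading : State()
        class Loaded : State()
    }
    data class Success(val data: String) : Result()
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun merge(vararg results: Flow<Result>): Flow<Result> = flow {
    emit(Result.State.Loading())
    // flatMapMerge collects the inner flows concurrently but hands their
    // values to this collector sequentially, so emitting here is safe.
    emitAll(
        flowOf(*results).flatMapMerge { source ->
            source
                .flowOn(Dispatchers.Default)
                .filterNot { it is Result.State }
        }
    )
    emit(Result.State.Loaded())
}

// Hypothetical interactor: reports its own state around one delayed value.
fun interactor(name: String, delayMs: Long): Flow<Result> = flow {
    emit(Result.State.Loading())
    delay(delayMs)
    emit(Result.Success(name))
    emit(Result.State.Loaded())
}

fun main() = runBlocking {
    val all = merge(interactor("slow", 300), interactor("fast", 100)).toList()
    all.forEach { result ->
        when (result) {
            is Result.Success -> println("value: ${result.data}")
            is Result.State -> println("state: ${result::class.simpleName}")
        }
    }
}
```

Both interactors start right away, so the value with the shorter delay will typically arrive first, while the per-interactor Loading and Loaded states are filtered out in favour of the single outer pair.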