#coroutines

Melih Aksoy

06/05/2019, 8:44 AM
Hey all! A quality check question: I produce some data via `flow` in interactors, such as
flow {
        emit(Result.State.Loading())
        emit(
            dataSourceFactory.getGrid(
                params.path,
                params.page,
                params.displaySize
            )
        )
        emit(Result.State.Loaded())
}
An interactor produces its own “state”, but I also want to support multiple interactors with a single state. I came up with the idea of merging them asynchronously, like this:
fun CoroutineScope.merge(vararg results: Flow<SimpleResult<Any>>) = flow {
    emit(Result.State.Loading())

    val tasks = mutableListOf<Deferred<Unit>>()

    for (result in results) {
        tasks.add(
            async(Dispatchers.IO) {
                result
                    .filterNot { it is Result.State }
                    .collect {
                        emit(it)
                    }
            }
        )
    }

    tasks.awaitAll()
    emit(Result.State.Loaded())
}
I’m wondering if this is a robust solution. This `CoroutineScope` is handled properly for closing etc., but what do you think? Any suggestions for merging flows together while keeping the ability to ignore some emissions?

elizarov

06/05/2019, 2:46 PM
There are many reasons why it will not work.
1. You should not implement `merge` as an extension on the `CoroutineScope`. You should do `flow { coroutineScope { ... } }`.
2. You are trying to emit concurrently from different async coroutines. This is forbidden and will not work. Flows are sequential.
In the upcoming release you’ll be able to say `channelFlow { ... }`, which gives you both a scope to launch your coroutines in and the ability to `send(...)` results concurrently.
However, you don’t need any of that. Just do:
fun merge(...) = flow {
    emit(Result.State.Loading())
    emitAll(flowOf(*results).flatMapMerge {
        it.flowOn(Dispatchers.IO)
            .filterNot { it is Result.State }
    })
    emit(Result.State.Loaded())
}
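For reference, the `channelFlow { ... }` option mentioned above could look roughly like the sketch below. It is only an illustration, not code from the thread: `LoadResult` is a hypothetical stand-in for the thread's `Result` type (renamed to avoid clashing with `kotlin.Result`), and `mergeWithChannelFlow` is an assumed name. Each source is collected in its own child coroutine and forwarded with `send(...)`, which is allowed to be called concurrently.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the thread's Result type.
sealed class LoadResult {
    sealed class State : LoadResult() {
        object Loading : State()
        object Loaded : State()
    }
    data class Data(val value: String) : LoadResult()
}

// Assumed name; merges the sources concurrently and wraps them in a single Loading/Loaded pair.
fun mergeWithChannelFlow(vararg sources: Flow<LoadResult>): Flow<LoadResult> = channelFlow {
    send(LoadResult.State.Loading)
    // coroutineScope suspends until every launched child has finished collecting.
    coroutineScope {
        for (source in sources) {
            launch {
                source
                    .filterNot { it is LoadResult.State } // drop each source's own state events
                    .collect { send(it) }                 // send() is safe to call from several coroutines
            }
        }
    }
    send(LoadResult.State.Loaded)
}

fun main(): Unit = runBlocking {
    val slow = flow<LoadResult> { delay(50); emit(LoadResult.Data("a")) }
    val fast = flow<LoadResult> {
        emit(LoadResult.State.Loading)
        emit(LoadResult.Data("b"))
        emit(LoadResult.State.Loaded)
    }
    println(mergeWithChannelFlow(slow, fast).toList())
}
```

The operator-based version is shorter, so this form is mainly useful when the merging logic needs more control than an operator chain gives.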

Melih Aksoy

06/06/2019, 8:37 AM
@elizarov Thanks for the feedback! The upcoming release will add some nice operators indeed! Just to know more: when I run the code it works as expected, so what kind of problems would `async` cause in a `flow`? PS: To test async I gave my 3 result flows a random delay between 1 and 10 seconds; they all start at the same time, but each emits once its own delay is over.
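A small sketch of the problem being asked about, under the assumption of a recent kotlinx.coroutines release (the version used in the thread may have behaved differently, which would explain why the original code appeared to work). The `flow { ... }` builder checks that `emit` is called from the coroutine that collects the flow; an emission from a child coroutine is rejected with an `IllegalStateException`. The name `brokenFlow` is made up for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical example: emits from a launched child instead of from the flow body itself.
fun brokenFlow(): Flow<Int> = flow {
    coroutineScope {
        launch { emit(1) } // emission from another coroutine, which the flow builder forbids
    }
}

fun main(): Unit = runBlocking {
    try {
        brokenFlow().collect { println(it) }
    } catch (e: IllegalStateException) {
        // The failure of the child coroutine propagates out of coroutineScope and collect.
        println("Emission rejected: ${e.message?.lineSequence()?.firstOrNull()}")
    }
}
```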
Updated to
    emit(Result.State.Loading())

    flowOf(*results).flatMapMerge {
        it.flowOn(Dispatchers.Default)
            .filterNot { it is Result.State }
    }.collect {
        emit(it)
    }
It’s doing the same thing without the need for a `scope` or starting `async`. I totally missed `flowOf`. Thanks a lot!
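For anyone who wants to run the final approach end to end, here is a self-contained sketch. It assumes a hypothetical `LoadResult` type in place of the thread's `Result` and an assumed function name `mergeWithFlatMapMerge`; the opt-in annotation is there because `flatMapMerge` has been marked as preview or experimental in kotlinx.coroutines releases.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the thread's Result type.
sealed class LoadResult {
    sealed class State : LoadResult() {
        object Loading : State()
        object Loaded : State()
    }
    data class Data(val value: String) : LoadResult()
}

// Assumed name; all emissions happen sequentially in the collecting coroutine.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun mergeWithFlatMapMerge(vararg sources: Flow<LoadResult>): Flow<LoadResult> = flow {
    emit(LoadResult.State.Loading)
    emitAll(
        flowOf(*sources).flatMapMerge { source ->
            source
                .flowOn(Dispatchers.Default)          // run each source off the collector's context
                .filterNot { it is LoadResult.State } // ignore per-source state events
        }
    )
    emit(LoadResult.State.Loaded)
}

fun main(): Unit = runBlocking {
    val first = flow<LoadResult> { delay(30); emit(LoadResult.Data("first")) }
    val second = flow<LoadResult> { delay(10); emit(LoadResult.Data("second")) }
    mergeWithFlatMapMerge(first, second).collect { println(it) }
}
```

The sources are collected concurrently, so the order of the data items depends on their delays; only the leading `Loading` and trailing `Loaded` positions are fixed.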