altavir

04/26/2019, 11:25 AM
Another question. It is probably somewhere in the documentation, but is the Flow map operation parallelizable?
map is sequential

altavir

04/26/2019, 11:40 AM
For a 1->1 map I will need to create single-element flows, which is not quite convenient, but thanks, I will look into the source. I was thinking about using a regular map to generate Deferred values and then awaiting them in the next map.
Something like this:
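import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map

fun <T, R> Flow<T>.mapParallel(scope: CoroutineScope, transform: suspend (T) -> R) = map {
    scope.async { transform(it) } // start each transform as a Deferred in the given scope
}.map {
    it.await() // await each Deferred in turn
}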
Probably won't work since intermediate operations are lazy...
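Here is a minimal sketch of how the same two-map idea can be made concurrent in later kotlinx.coroutines releases (1.3 and up, not the preview discussed in this thread). It inserts buffer() between the maps. The buffer lets the first map run ahead and start several async calls before the second map awaits the oldest one, so results still come out in input order.

The name mapParallelBuffered and its concurrency parameter are hypothetical. The caller still has to supply the scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the two-map shape above, plus a buffer between the maps.
fun <T, R> Flow<T>.mapParallelBuffered(
    scope: CoroutineScope,
    concurrency: Int,
    transform: suspend (T) -> R
): Flow<R> {
    require(concurrency >= 1) { "concurrency must be at least 1" }
    return map { value -> scope.async { transform(value) } } // start each transform
        .buffer(concurrency)  // upstream can run ahead, so several Deferreds are in flight
        .map { deferred -> deferred.await() } // await in the original order
}
```

Without the buffer, each emitted Deferred is awaited downstream before the first map receives the next element, which is why the original version runs one transform at a time.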

gildor

04/26/2019, 3:02 PM
> I will need to create single-element flows
There are extension functions for functional types (including suspend ones) that create a flow from a lambda or function.
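For example, current kotlinx.coroutines releases provide asFlow() on () -> T and on suspend () -> T, which turns a lambda into a single-element flow. Combined with flatMapMerge, each element's work runs inside its own inner flow.

This is a minimal sketch assuming a recent release where flatMapMerge and asFlow() on functions still require an opt-in. slowSquare and squaresConcurrently are hypothetical names.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical slow operation standing in for a remote call.
suspend fun slowSquare(x: Int): Int {
    delay(100)
    return x * x
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun squaresConcurrently(input: Flow<Int>): Flow<Int> =
    input.flatMapMerge { x ->
        // A suspend lambda wrapped as a single-element flow via asFlow(). The lambda
        // only runs when flatMapMerge collects this inner flow, and inner flows are
        // collected concurrently, so results can arrive in any order.
        val work: suspend () -> Int = { slowSquare(x) }
        work.asFlow()
    }
```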

altavir

04/26/2019, 3:04 PM
There is still a problem with structured concurrency and the dispatcher. flatMapMerge does not provide control over the scope and dispatcher, so it is a workaround at best. I think this is a rather common task, so it should have a dedicated solution.

elizarov

04/26/2019, 6:10 PM
If all you need is to map, you can do:
.flowWith(Dispatchers.Default) { // BG!
    flatMapMerge {
        flowOf(slowOperation(it))
    }
}

altavir

04/26/2019, 6:13 PM
OK, I think I understand. But it is not obvious. I think it still needs a separate function.
@elizarov Your solution does not work. I've just put it in my test, and it evaluates sequentially, not in parallel.
I think it is because the dispatcher does not actually change, and there is a small check in flowWith that skips creating a channel when the context is the same.
Nope, even if I start with IO or Unconfined, it still does not work. To get parallel execution with coroutines, one needs to first create a list of async calls, store them, and then await them. In my solution a channel plays the role of intermediate storage, but I do not think any solution with flowOn would create this intermediate state, because the channel there is consumed immediately.
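The "channel as intermediate storage" idea can be sketched as an order-preserving operator that also addresses the scope and dispatcher concerns. It makes the following assumptions:

- It uses kotlinx.coroutines 1.6 or later, where Flow.collect accepts a lambda directly.
- Every transform is started inside a coroutineScope owned by the collecting flow, so cancelling the collector, or a failing transform, cancels the rest.
- The caller passes the context for the transforms.
- mapParallelOrdered and its parameters are hypothetical, not a kotlinx.coroutines API.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical operator: an order-preserving parallel map. A bounded channel of
// Deferred values is the intermediate storage between starting and awaiting transforms.
fun <T, R> Flow<T>.mapParallelOrdered(
    concurrency: Int,
    context: CoroutineContext = Dispatchers.Default,
    transform: suspend (T) -> R
): Flow<R> {
    require(concurrency >= 1) { "concurrency must be at least 1" }
    val upstream = this
    return flow {
        coroutineScope {
            val pending = Channel<Deferred<R>>(capacity = concurrency)
            // Producer: start one async per element; send() suspends while the channel
            // is full, which bounds how far it runs ahead of the consumer.
            launch {
                upstream.collect { value -> pending.send(async(context) { transform(value) }) }
                pending.close()
            }
            // Consumer: await results in input order and emit them downstream.
            for (deferred in pending) emit(deferred.await())
        }
    }
}
```

Emitting from the coroutineScope body rather than from inside launch keeps emissions in the collector's coroutine, as Flow requires.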

elizarov

04/26/2019, 6:57 PM
Yes, you are right. Transforms are computed sequentially.
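The flatMapMerge documentation in current kotlinx.coroutines says the operator calls its transform sequentially and only collects the resulting inner flows concurrently. That likely explains why the snippet above ran one element at a time: flowOf(slowOperation(it)) evaluates slowOperation inside the sequential transform. Moving the call into flow { emit(...) } defers it to the concurrent inner collection.

This sketch compares the two forms. ConcurrencyProbe is a hypothetical helper that counts in-flight calls and is only safe on a single thread.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the peak number of overlapping slow calls.
class ConcurrencyProbe {
    var active = 0
    var maxActive = 0

    suspend fun slowOperation(x: Int): Int {
        active++
        maxActive = maxOf(maxActive, active)
        delay(50)
        active--
        return x * 2
    }
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun eager(input: Flow<Int>, probe: ConcurrencyProbe): Flow<Int> =
    // slowOperation runs inside the transform, which is called one element at a time.
    input.flatMapMerge { flowOf(probe.slowOperation(it)) }

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun insideInnerFlow(input: Flow<Int>, probe: ConcurrencyProbe): Flow<Int> =
    // slowOperation runs when the inner flow is collected; inner flows run concurrently.
    input.flatMapMerge { flow { emit(probe.slowOperation(it)) } }
```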

sdeleuze

04/28/2019, 9:03 AM
Don’t you think there is a missing operator between the sequential map and flatMapMerge/flatMapConcat? Right now we are forced to artificially use a single-element Flow to achieve parallel execution. Processing each element of a Flow with a slow operation (a remote HTTP call for each element) is very common.
Also, discouraging people from using flatMap in the docs seems misleading to me. I think most people want parallel execution when calling a suspending function to transform the elements of a Flow.
Or maybe map could do the transformation in parallel when used with suspending functions?
The idea is to first transform the flow into a flow of lazy values (I can't use a lazy Deferred because it leaks out of structured concurrency). Then, when I collect, I start the required number of lazy computations in parallel (the new collect method takes an explicit number of started tasks).
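The lazy-values approach can be sketched in two steps. First, map each element to a not-yet-started suspend () -> R. Second, use a terminal operator that starts at most a fixed number of those computations at a time, gated by a Semaphore.

This assumes kotlinx.coroutines 1.6 or later. mapLazily and collectConcurrently are hypothetical names, not the operators from the message above. In this sketch, results reach the action in completion order, not input order.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore

// Step 1 (hypothetical): turn each element into a computation that has not started yet.
fun <T, R> Flow<T>.mapLazily(transform: suspend (T) -> R): Flow<suspend () -> R> =
    map { value ->
        val computation: suspend () -> R = { transform(value) }
        computation
    }

// Step 2 (hypothetical): run at most `concurrency` computations at once. `action` may
// be called concurrently, so it must be thread-safe on a multi-threaded dispatcher.
suspend fun <R> Flow<suspend () -> R>.collectConcurrently(
    concurrency: Int,
    action: suspend (R) -> Unit
) {
    require(concurrency >= 1) { "concurrency must be at least 1" }
    val permits = Semaphore(concurrency)
    // coroutineScope returns only after every launched computation has finished.
    coroutineScope {
        this@collectConcurrently.collect { computation ->
            permits.acquire() // wait for a free slot before starting the next one
            launch {
                try {
                    action(computation())
                } finally {
                    permits.release()
                }
            }
        }
    }
}
```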

gildor

04/28/2019, 3:19 PM
I agree with Sebastien. map is sequential in Rx simply because, by definition, it doesn't support async operations there, but with coroutines it's possible to make map parallel, and even make this optional (add a concurrency parameter with a default value of 1). That way it uses the fast path by default (the current implementation) and offers easy-to-use concurrency as an option.
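Here is a minimal sketch of the optional-concurrency proposal, written as a separate extension so it does not clash with the real map. The name mapConcurrently is hypothetical and not part of kotlinx.coroutines. It assumes a recent release where flatMapMerge needs an opt-in.

With the default of 1 it delegates to the ordinary sequential map. With a higher value it runs each transform in its own single-element inner flow, and results may then be reordered.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: map with an optional concurrency parameter whose default
// keeps the existing sequential fast path.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T, R> Flow<T>.mapConcurrently(
    concurrency: Int = 1,
    transform: suspend (T) -> R
): Flow<R> {
    require(concurrency >= 1) { "concurrency must be at least 1" }
    return if (concurrency == 1) {
        map(transform) // fast path: plain sequential map, order preserved
    } else {
        // Up to `concurrency` inner flows are collected at once, so order is not preserved.
        flatMapMerge(concurrency) { value -> flow { emit(transform(value)) } }
    }
}
```

Preserving input order for concurrency > 1 would need extra buffering, for example a channel of Deferred values.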

sdeleuze

04/28/2019, 3:25 PM
I like the idea of an optional map parameter, and I think I would go as far as using the same concurrency value as flatMap. Coroutines are cheap, and Flow is designed to provide a powerful map operator, where people will expect parallel execution for suspending functions.
I guess the related issue where we should continue the discussion is https://github.com/Kotlin/kotlinx.coroutines/issues/1147