# coroutines
a
Another question. It is probably somewhere in the documentation, but is the `Flow` `map` operation parallelizable?
map is sequential
a
For a 1->1 map I will need to create single-element flows, which is not very convenient, but thanks, I will look into the source. I was thinking about using a regular `map` to generate `Deferred` values and then await them in the next `map`.
Something like this:
fun <T,R> Flow<T>.mapParallel(scope: CoroutineScope, transform: suspend (T)->R) = map{
    scope.async { transform(it) }
}.map{
    it.await()
}
Probably won't work since intermediate operations are lazy...
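A note on why this version ends up sequential: with two chained `map` calls, each element passes through both stages before the next one is requested, so every `Deferred` is awaited right after it is created. Below is a minimal sketch of one possible fix, assuming a later kotlinx.coroutines release that provides the `buffer` operator. A buffer between the two stages lets upstream keep starting `async` jobs while downstream awaits earlier ones. The `capacity` parameter and `demoMapParallel` are hypothetical names added for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Sketch only: `capacity` is an illustrative parameter, not part of the original snippet.
fun <T, R> Flow<T>.mapParallel(
    scope: CoroutineScope,
    capacity: Int = 16,
    transform: suspend (T) -> R
): Flow<R> =
    map { value -> scope.async { transform(value) } } // start the work without waiting for it
        .buffer(capacity)                              // lets upstream run ahead of downstream
        .map { it.await() }                            // await in source order

// Hypothetical demo: doubles five numbers, each with a simulated delay.
fun demoMapParallel(): List<Int> = runBlocking {
    (1..5).asFlow()
        .mapParallel(this) { delay(100); it * 2 }
        .toList()
}
```

Results keep the source order because the `Deferred` values travel through the buffer in FIFO order. The number of jobs in flight is only loosely bounded by `capacity`.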
g
I will need to create single-element flows
There are extension functions on function types (including suspend ones) that create a flow from a lambda or function.
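As a small illustration of that suggestion, here is a sketch using the `asFlow()` extension on a `suspend () -> T` value, which kotlinx.coroutines provides as a preview API. `slowSquare` and `demoAsFlow` are hypothetical names standing in for a real computation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a slow suspending computation.
suspend fun slowSquare(x: Int): Int {
    delay(50)
    return x * x
}

@OptIn(FlowPreview::class)
fun demoAsFlow(): List<Int> = runBlocking {
    val op: suspend () -> Int = { slowSquare(7) }
    val single: Flow<Int> = op.asFlow() // cold: the lambda has not run yet
    single.toList()                     // collecting runs the lambda once
}
```

The resulting flow is cold, so the lambda runs only when the flow is collected.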
a
There is still the problem of structured concurrency and the dispatcher. `flatMapMerge` does not provide control over the scope and dispatcher, so it is a workaround at best. I think this is a rather common task, so it should have a dedicated solution.
e
If all you need is to map, you can do:
.flowWith(Dispatchers.Default) { // BG!
    flatMapMerge {
        flowOf(slowOperation(it))
    }
}
a
OK, I think I understand. But it is not obvious. I think it still needs a separate function.
@elizarov Your solution does not work. I've just put it in my test, and it evaluates sequentially, not in parallel.
I think it is because the dispatcher does not actually change, and there is a small clause in `flowWith` that skips channel creation when the context is the same.
Nope, even if I start with IO or Unconfined, it still does not work. To get parallel execution with coroutines, one first needs to create a list of `async` jobs, store them, and then `await` them. In my solution the channel plays the role of intermediate storage, but I do not think any solution with `flowOn` would create this intermediate state, because the channel there is consumed immediately.
e
Yes, you are right. Transforms are computed sequentially.
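One reading of why the snippet stays sequential: `flowOf(slowOperation(it))` calls `slowOperation` inside the `flatMapMerge` transform lambda, and that lambda runs for one element at a time. Only the collection of the resulting inner flows is concurrent. Below is a sketch of a variant that moves the call into the inner flow, assuming a later kotlinx.coroutines release in which `flowOn` is used instead of `flowWith`. The body of `slowOperation`, the concurrency value, and `demoFlatMapMerge` are assumptions made for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Assumed body for the `slowOperation` mentioned in the thread.
suspend fun slowOperation(x: Int): Int {
    delay(100)
    return x * 10
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun demoFlatMapMerge(): List<Int> = runBlocking {
    (1..4).asFlow()
        .flatMapMerge(concurrency = 4) { value ->
            // The slow call now runs when the inner flow is collected,
            // and up to `concurrency` inner flows are collected at the same time.
            flow { emit(slowOperation(value)) }
        }
        .flowOn(Dispatchers.Default) // run the upstream work on a background dispatcher
        .toList()
}
```

Unlike a plain `map`, this variant may deliver results out of source order.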
s
Also, discouraging people from using `flatMap` in the docs seems misleading to me. I think most people want parallel execution when calling a suspending function to transform the elements of a `Flow`.
Or maybe `map` could do parallel transformation when used with suspending functions?
The idea is first to transform the flow into a flow of lazy values (I can't use a lazy `Deferred` because of the structured concurrency leak). Then I start the required number of parallel lazy computations when I call collect (the new collect method requires an explicit number of tasks to start).
g
I agree with Sebastien. `map` is sequential in Rx only because Rx doesn't support async operations there by definition, but with coroutines it's possible to make `map` parallel, or even make this optional (add a concurrency parameter with a default value of 1), so that it uses the fast path by default (the current implementation) and offers easy-to-use concurrency as an option.
s
I like the idea of an optional `map` parameter, and I think I would go as far as using the same concurrency value as `flatMap`. Coroutines are cheap, and `Flow` is designed to provide a powerful `map` operator where people will expect parallel execution for suspending functions.
I guess the related issue where we should continue the discussion is https://github.com/Kotlin/kotlinx.coroutines/issues/1147