#coroutines

adeln

07/31/2019, 7:50 AM
hey folks! I have a list of parallel computations. How would I write a function like this? Can’t really wrap my head around it
fun <T> List<Deferred<T>>.toFlow(): Flow<T>
I want the flow to emit values as soon as they become available

uli

07/31/2019, 7:52 AM
do you want to process in order and/or preserve order?

adeln

07/31/2019, 7:53 AM
I want the flow to emit values as soon as they become available

marstran

07/31/2019, 8:01 AM
It's easy if you await them in the order of the list:
fun <T> List<Deferred<T>>.toFlow(): Flow<T> = flow {
    forEach { emit(it.await()) }
}
Emitting items in order of completion is harder. You need to keep track of the deferreds that have already completed, so that you can complete the flow at the right time.

gildor

07/31/2019, 8:04 AM
You can use select
@marstran This is not “I want the flow to emit values as soon as they become available”

marstran

07/31/2019, 8:05 AM
Yes, I know. That's what I wrote after the example.

uli

07/31/2019, 8:05 AM
Maybe `flatMapMerge` can help? But select might be more efficient depending on the size of the `List`

gildor

07/31/2019, 8:06 AM
Just use select to get values when they are ready

marstran

07/31/2019, 8:15 AM
Maybe this (after @uli's suggestion):
fun <T> List<Deferred<T>>.toFlow(): Flow<T> =
    asFlow()
        .flatMapMerge { flow { emit(it.await()) } }
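For anyone trying the `flatMapMerge` approach, here is a minimal runnable sketch. Two things in it are not from the thread: the opt-in annotation, which recent kotlinx.coroutines versions require for `flatMapMerge`, and the explicit `concurrency` argument. By default `flatMapMerge` collects at most 16 inner flows at once, so with a longer list a deferred beyond the first 16 would not be awaited until a slot frees up. The delays in `main` are placeholders for real work.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same idea as the snippet above, with the concurrency limit raised to the list size
// so that every deferred is awaited at the same time.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> List<Deferred<T>>.toFlow(): Flow<T> =
    asFlow().flatMapMerge(concurrency = maxOf(size, 1)) { deferred ->
        flow { emit(deferred.await()) }
    }

fun main() = runBlocking {
    // Hypothetical workload: each async just sleeps and returns its delay.
    val jobs = listOf(300L, 100L, 200L).map { ms ->
        async { delay(ms); ms }
    }
    // Values should arrive in completion order, not list order.
    jobs.toFlow().collect { println(it) }
}
```

Each inner flow here costs one extra coroutine per deferred, which is probably what the "more efficient" question is about.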

adeln

07/31/2019, 8:16 AM
it works, thanks! I’m wondering if there are more efficient ways of doing this
@gildor yeah, I can’t really understand how to use select to skip already emitted values. It just keeps emitting the same value

gildor

07/31/2019, 8:30 AM
@adeln
fun <T> List<Deferred<T>>.toFlow(): Flow<T> = flow {
    forEach { deferred ->
        select {
            deferred.onAwait { value ->
                emit(value)
            }
        }
    }
}
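A note on the snippet above: each `select` there has a single clause, so it suspends on one deferred at a time and the flow emits in list order, the same as calling `await()` in a loop. To emit in completion order with `select`, every pending deferred needs its own `onAwait` clause, and the winner has to be removed before the next round. A minimal sketch of that idea follows; the function name `toCompletionOrderFlow` is made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.select

// Hypothetical name; emits each value as soon as its deferred completes.
fun <T> List<Deferred<T>>.toCompletionOrderFlow(): Flow<T> = flow {
    val pending = toMutableList()
    while (pending.isNotEmpty()) {
        // One onAwait clause per pending deferred; the first one to complete is selected.
        val (winner, value) = select<Pair<Deferred<T>, T>> {
            pending.forEach { deferred ->
                deferred.onAwait { result -> deferred to result }
            }
        }
        pending.remove(winner) // skip it in later rounds, so no value repeats
        emit(value)            // emit outside select, in the flow's own coroutine
    }
}

fun main() = runBlocking {
    // Hypothetical workload: each async just sleeps and returns its delay.
    val jobs = listOf(300L, 100L, 200L).map { ms ->
        async { delay(ms); ms }
    }
    jobs.toCompletionOrderFlow().collect { println(it) }
}
```

This re-registers all pending clauses on every round, so it does quadratic work in the size of the list. If a deferred fails, its exception is thrown from `select` and ends the flow.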

adeln

07/31/2019, 8:32 AM
it works, thank you so much! Didn’t know about `whileSelect`

gildor

07/31/2019, 8:33 AM
Actually, in this case you don’t need `whileSelect`; just `select` is enough. I updated the sample.
Actually, it’s a good candidate for kotlinx.coroutines; it probably makes sense to report a feature request.

adeln

07/31/2019, 8:34 AM
it works! 🙂 thank you
I don’t know about kotlinx.coroutines. Since `List<Deferred>` is already hot, `toFlow()` doesn’t make much sense. Maybe I needed `toChannel(): ReceiveChannel`, or maybe I need to reconsider the way I launch my parallel computations

gildor

07/31/2019, 8:37 AM
No, I don’t think so
Flow is fine even for hot sources
With a channel you can of course cancel all those deferreds, but I wouldn’t expose it as an API
what is your use case there?

adeln

07/31/2019, 8:48 AM

gildor

07/31/2019, 8:50 AM
I don’t see why you need a flow here. Why not just call `webSocket.send(it)` after `hitNetwork`?

adeln

07/31/2019, 8:53 AM
With a flow I can debug it by just printing to the console. A flow is also easier to test than faking the websocket.
I guess I’m looking for a way to produce the flow from parallel computations. But I can’t just call `launch {}` inside `flow {}` =/

gildor

07/31/2019, 8:55 AM
> But I can’t just call `launch {}` inside `flow {}`
You can, use `channelFlow`
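A minimal sketch of what launching inside `channelFlow` can look like. The `hitNetwork` body, its `Int` input, and the `results` function are placeholders invented for this example, since the real signatures are not shown in the thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the hitNetwork call mentioned in the thread.
suspend fun hitNetwork(input: Int): String {
    delay(100L * input)
    return "response-$input"
}

// channelFlow's block runs in a ProducerScope, which is a CoroutineScope,
// so launch is allowed and send may be called from any child coroutine.
fun results(inputs: List<Int>): Flow<String> = channelFlow {
    inputs.forEach { input ->
        launch { send(hitNetwork(input)) }
    }
    // The flow completes once all launched children have finished.
}

fun main() = runBlocking {
    // Responses should be printed as they become available.
    results(listOf(3, 1, 2)).collect { println(it) }
}
```

Unlike a list of deferreds, this flow is cold: nothing is launched until it is collected, and cancelling the collector cancels the requests still in flight.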
> with flow I can debug it by just printing to console
Why can’t you do this with:
val result = hitNetwork()
println("Result!!! $result")
webSocket.send(result)
println("finished")
But I really don’t see why you need a flow in this sample. It may be required if you had not `input: List<Input>` but `input: Flow<Input>`, but if you already have the input and all the processing happens in the same function, why would you need a flow?
If you have a list, you can even do this:
input // List<Input>
    .asFlow()
    .map {
        async { hitNetwork(it) }
    }
    .map {
        webSocket.send(it.await())
    }
    .collect()
But honestly I see no reason for this, because I can just do:
coroutineScope {
    input.forEach {
        launch {
            val result = hitNetwork(it)
            webSocket.send(result)
        }
    }
}

adeln

07/31/2019, 9:05 AM
thanks! `channelFlow` is exactly what I needed
@gildor In debug I want to filter between success/error, but in prod I want all the values. Flow is just easier to manipulate

gildor

07/31/2019, 9:09 AM
> In debug I want to filter between success/error
Why not just use `if` for that? Why do you need a flow?
I mean, it may be needed if you return it, but now you just do some work and return.

adeln

07/31/2019, 9:13 AM
I completely understand that in my use case the producer and the consumer may live in the same function. But I want to separate them

gildor

07/31/2019, 9:14 AM
Then you probably need something completely different, but it’s hard to say without a sample

adeln

07/31/2019, 9:27 AM
flow is just easier to play with: `filter` doesn’t create another level of nesting and is easy to comment out. Same with `take(n)`: it makes it easy to stop the execution of the program when you’re just playing with it
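A small sketch of the kind of experimenting described here. The strings are made-up placeholders for success and error results.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Hypothetical results: strings standing in for success/error values.
    val results = flowOf("ok:1", "error:2", "ok:3", "ok:4")

    results
        .filter { it.startsWith("ok") } // comment this line out to see every value
        .take(2)                        // stops collecting after two values
        .collect { println(it) }        // prints ok:1 and ok:3
}
```

Both operators sit on their own line in the chain, so switching one off does not change the nesting of the surrounding code.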

gildor

07/31/2019, 9:28 AM
¯\_(ツ)_/¯

adeln

07/31/2019, 9:28 AM
I completely agree that I wouldn’t need `Flow` once I have completely figured out all the small little details of my problem