hey folks! I have a list of parallel computations....
# coroutines
a
hey folks! I have a list of parallel computations. How would I write a function like this? Can’t really wrap my head around it
Copy code
fun <T> List<Deferred<T>>.toFlow(): Flow<T>
I want the flow to emit values as soon as they become available
u
do you want to process in order and/or preserve order?
a
I want the flow to emit values as soon as they become available
m
It's easy if you await them in the order of the list:
Copy code
fun <T> List<Deferred<T>>.toFlow(): Flow<T> = flow {
    forEach { emit(it.await()) }
}
Emitting items in order of completion is harder. You need to keep track of the deferreds that have already completed, so that you can complete the flow at the right time.
g
You can use select
@marstranThis is not “I want the flow to emit values as soon as they become availab”
m
Yes, I know. That's what I wrote after the example.
u
Maybe
flatMapMerge
can help? But select might be more efficient depending on the size of the
List
g
Just use select to get values whey they are ready
m
Maybe this (after @uli's suggestion):
Copy code
fun <T> List<Deferred<T>>.toFlow(): Flow<T> =  
    asFlow()
      .flatMapMerge { flow { emit(it.await()) } }
a
it works, thanks! I’m wondering if there are more efficient ways of doing this
@gildor yeah, I can’t really understand how to use select to skip already emitted values. It just keeps emitting the same value
g
@adeln
Copy code
fun <T> List<Deferred<T>>.toFlow(): Flow<T> = flow {
    forEach { deferred ->
        select {
            deferred.onAwait { value ->
                emit(value)
            }
        }
    }
}
a
it works, thank you so much! Didn’t know about
whileSelect
g
Actually in this case you don’t need whileSelect, just select is enough I updated sample
actually it’s good candidate for kotlinx.coroutines, probably make sense to report a feature request
a
it works! 🙂 thank you
I don’t know about kotlinx.coroutines. Since
List<Deferred>
is already hot,
toFlow()
doesn’t make much sense. Maybe I needed
toChannel(): ReceiveChannel
or maybe I need to reconsider the way I launch my parallel computations
g
No, I don’t think so
Flow is fine even for hot sources
With channel you can of course cancel all those deferred, but I wouldn’t explose it as API
what is your use case there?
a
g
I don’t see why do you need flow here Why not just call
webSocket.send(it)
after
hitNetwork
a
with flow I can debug it by just printing to console flow is also easier to test than faking the websocket
I guess I’m looking for a way to produce the flow from parallel computations. But I can’t just call
launch {}
inside
flow {}
=/
g
But I can’t just call
launch {}
inside
flow {}
You can, use
channelFlow
with flow I can debug it by just printing to console
Why you cannot do this with:
Copy code
val result = hitNetwork()
println("Result!!! $result")
webSocket.send(result)
println("finished")
But I really don’t see why you need flow in this sample
It may be required if you would have not
input: List<Input>
but
input: Flow<Input>
, but if you already have input and all processing happeing on the same function, why would you need flow
if you have list you even can do this:
Copy code
input // List<Input>
            .asFlow()
            .map {
                async { hitNetwork(it) }
            }
            .map {
                webSocket.send(it.await())
            }
            .collect()
But honestly I see no reason for this, because I can just do:
Copy code
coroutineScope {
 input.forEach {
    launch {
        val result = hitNetwork(it)
        webSocket.send(result)
    }
  }
}
a
thanks!
channelFlow
is exactly what I needed
@gildor In debug I want to filter between success/error, but in prod I want all the values. Flow is just easier to manipulate
g
In debug I want to filter between success/error
Why not just use
if
for that, why do you need flow?
I mean it may be needed if you return it, now you just do some work and return
a
I completely understand that in my use case the producer and the consumer may live in the same function. But I want to separate them
g
than probably you need something completely different, but hard to say without sample
a
flow is just easier to play with:
filter
doesn’t created another level of nesting, and is easy to comment out same with
take(n)
, easy to stop the execution of the program when you’re just playing with it
g
¯\_(ツ)_/¯
a
I completely agree that I wouldn’t need
Flow
when I have completely figured out all the small little details of my problem