adeln
07/31/2019, 7:50 AM
fun <T> List<Deferred<T>>.toFlow(): Flow<T>
I want the flow to emit values as soon as they become available
uli
07/31/2019, 7:52 AM
adeln
07/31/2019, 7:53 AM
marstran
07/31/2019, 8:01 AM
fun <T> List<Deferred<T>>.toFlow(): Flow<T> = flow {
    forEach { emit(it.await()) }
}
Emitting items in order of completion is harder. You need to keep track of the deferreds that have already completed, so that you can complete the flow at the right time.
gildor
07/31/2019, 8:04 AM
gildor
07/31/2019, 8:04 AM
marstran
07/31/2019, 8:05 AM
uli
07/31/2019, 8:05 AM
flatMapMerge can help? But select might be more efficient depending on the size of the List
uli
07/31/2019, 8:06 AM
gildor
07/31/2019, 8:06 AM
marstran
07/31/2019, 8:15 AM
fun <T> List<Deferred<T>>.toFlow(): Flow<T> =
    asFlow()
        .flatMapMerge { flow { emit(it.await()) } }
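A note on the flatMapMerge approach above: flatMapMerge collects only a limited number of inner flows at once (its concurrency parameter, which defaults to 16), so on a longer list the later deferreds are not awaited until earlier ones finish. A minimal sketch that passes the list size as the concurrency limit, so every deferred is awaited concurrently; the name toMergedFlow and the demo in main are illustrative, not from the thread:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical name. Opt-in markers cover both annotations this operator has carried.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T> List<Deferred<T>>.toMergedFlow(): Flow<T> =
    // concurrency must be positive, hence coerceAtLeast(1) for an empty list
    asFlow().flatMapMerge(concurrency = size.coerceAtLeast(1)) { deferred ->
        flow { emit(deferred.await()) }
    }

fun main() = runBlocking {
    val slow = async { delay(200); "slow" }
    val fast = async { delay(50); "fast" }
    // The faster deferred is emitted first even though it is second in the list.
    println(listOf(slow, fast).toMergedFlow().toList()) // [fast, slow]
}
```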
adeln
07/31/2019, 8:16 AM
adeln
07/31/2019, 8:20 AM
gildor
07/31/2019, 8:30 AM
fun <T> List<Deferred<T>>.toFlow(): Flow<T> = flow {
    forEach { deferred ->
        select {
            deferred.onAwait { value ->
                emit(value)
            }
        }
    }
}
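Note that the snippet above runs a separate select over one deferred at a time, so it still emits in list order, just like calling await() in a loop. A possible sketch of the completion-order variant selects over all pending deferreds and removes each one once it has completed; the name toFlowByCompletion is made up for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.select

// Hypothetical name; emits each value as soon as its deferred completes.
fun <T> List<Deferred<T>>.toFlowByCompletion(): Flow<T> = flow {
    // Track the deferreds that have not completed yet.
    val pending = this@toFlowByCompletion.toMutableList()
    while (pending.isNotEmpty()) {
        // Register an onAwait clause for every pending deferred;
        // select resumes with whichever completes first.
        val (done, value) = select<Pair<Deferred<T>, T>> {
            pending.forEach { d -> d.onAwait { d to it } }
        }
        pending.remove(done)
        emit(value) // emitted outside select, in the flow's own coroutine
    }
}

fun main() = runBlocking {
    val slow = async { delay(200); "slow" }
    val fast = async { delay(50); "fast" }
    println(listOf(slow, fast).toFlowByCompletion().toList()) // [fast, slow]
}
```

Each round re-registers a clause for every remaining deferred, so the total work grows quadratically with the list size; that is usually fine for small lists.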
adeln
07/31/2019, 8:32 AM
whileSelect
gildor
07/31/2019, 8:33 AM
gildor
07/31/2019, 8:34 AM
adeln
07/31/2019, 8:34 AM
adeln
07/31/2019, 8:36 AM
List<Deferred> is already hot, so toFlow() doesn’t make much sense. Maybe I needed toChannel(): ReceiveChannel, or maybe I need to reconsider the way I launch my parallel computations
gildor
07/31/2019, 8:37 AM
gildor
07/31/2019, 8:37 AM
gildor
07/31/2019, 8:37 AM
gildor
07/31/2019, 8:38 AM
adeln
07/31/2019, 8:48 AM
gildor
07/31/2019, 8:50 AM
webSocket.send(it) after hitNetwork
adeln
07/31/2019, 8:53 AM
adeln
07/31/2019, 8:54 AM
But I can’t just call launch {} inside flow {} =/
gildor
07/31/2019, 8:55 AM
> But I can’t just call launch {} inside flow {}
You can, use channelFlow
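A rough sketch of what the channelFlow suggestion could look like: the builder's block is a coroutine scope, so launch is allowed inside it, and each launched coroutine sends its result as soon as it is ready. Here hitNetwork is a stand-in with a fake delay, and the Int input type and the results function are assumptions made for the example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the hitNetwork call mentioned in the thread.
suspend fun hitNetwork(input: Int): String {
    delay(100L * input) // simulate a request whose latency depends on the input
    return "response-$input"
}

// Hypothetical helper: one concurrent request per input.
fun results(inputs: List<Int>): Flow<String> = channelFlow {
    inputs.forEach { input ->
        // launch works here because the channelFlow block is a CoroutineScope
        launch { send(hitNetwork(input)) }
    }
    // The flow completes once all launched coroutines have finished.
}

fun main() = runBlocking {
    println(results(listOf(3, 1, 2)).toList()) // [response-1, response-2, response-3]
}
```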
gildor
07/31/2019, 8:56 AM
> with flow I can debug it by just printing to console
Why can’t you do this with:
val result = hitNetwork()
println("Result!!! $result")
webSocket.send(result)
println("finished")
gildor
07/31/2019, 8:56 AM
gildor
07/31/2019, 8:58 AM
input: List<Input> but input: Flow<Input>, but if you already have the input and all processing happens in the same function, why would you need flow?
gildor
07/31/2019, 9:01 AM
input // List<Input>
    .asFlow()
    .map {
        async { hitNetwork(it) }
    }
    .map {
        webSocket.send(it.await())
    }
    .collect()
gildor
07/31/2019, 9:02 AM
coroutineScope {
    input.forEach {
        launch {
            val result = hitNetwork(it)
            webSocket.send(result)
        }
    }
}
adeln
07/31/2019, 9:05 AM
channelFlow is exactly what I needed
adeln
07/31/2019, 9:08 AM
gildor
07/31/2019, 9:09 AM
> In debug I want to filter between success/error
Why not just use if for that? Why do you need flow?
gildor
07/31/2019, 9:10 AM
adeln
07/31/2019, 9:13 AM
gildor
07/31/2019, 9:14 AM
adeln
07/31/2019, 9:27 AM
filter doesn’t create another level of nesting, and is easy to comment out
same with take(n), easy to stop the execution of the program when you’re just playing with it
gildor
07/31/2019, 9:28 AM
adeln
07/31/2019, 9:28 AM
Flow when I have completely figured out all the small little details of my problem
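To illustrate the point about filter and take(n) during debugging, here is a small sketch. The Response type, the responses() source, and the firstFailures helper are all hypothetical, since the thread does not show the real types; the idea is only that each operator is a single line that can be commented out without changing the nesting:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical response type; the real one is not shown in the thread.
data class Response(val id: Int, val ok: Boolean)

// Hypothetical source: ten responses, every third one marked as failed.
fun responses(): Flow<Response> =
    (1..10).asFlow().map { Response(it, ok = it % 3 != 0) }

suspend fun firstFailures(n: Int): List<Int> =
    responses()
        .filter { !it.ok } // comment out this line to see every response
        .take(n)           // stops collecting after n matching items
        .map { it.id }
        .toList()

fun main() = runBlocking {
    println(firstFailures(2)) // [3, 6]
}
```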