Curtis Ullerich
08/16/2022, 5:08 PMFrancesc
08/16/2022, 5:24 PMasync
block then call awaitAll
on the listmkrussel
08/16/2022, 5:29 PMsuspend fun doMultipleIO() {
coroutineScope {
async { doSomething(io1())
async { doSomething(io2())
}
}
fun suspend io1() = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
}
fun suspend io2() = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
}
Francesc
08/16/2022, 5:31 PMlist
literally, as in
val myList = listOf(
async { doIo1() },
async { doIo2() },
...
)
then
myList.awaitAll()
Curtis Ullerich
08/16/2022, 5:31 PMfun main() = runBlocking<Unit> {
val clients = ('a'..'z').map { Client(it.toString()) }
orderedResponses(clients).collect {
println(it)
}
}
// doesn't work
suspend fun eagerResponses(clients: List<Client>): Flow<String> = coroutineScope {
flow {
clients.forEach {
launch {
emit(it.fakeRpc())
}
}
}
}
fun orderedResponses(clients: List<Client>): Flow<String> =
clients.asFlow().map {
it.fakeRpc()
}
class Client(val name: String) {
// Waits 0 or 5 seconds before returning, simulating bimodal RPC latency.
suspend fun fakeRpc() = (0..99).random().let { delay((5 * (it % 2)).seconds); "$name $it" }
}
main
I want the results to each result to print as soon as it is complete, rather than waiting for all to be doneFrancesc
08/16/2022, 5:34 PMasync
mkrussel
08/16/2022, 5:36 PMcoroutineScope
inside the flow
lambda.emit
happens in the same dispatcher as the call to flow
Curtis Ullerich
08/16/2022, 5:42 PMclients.forEach { launch { println(it.fakeRpc())} }
but I'd like something that returns a sequence or flow of the results (testability etc). @mkrussel I did indeed see IllegalStateException: Flow invariant is violated:
right away. I'll try to figure out using the right dispatchermkrussel
08/16/2022, 5:45 PMcoroutineScope
in the wrong place. You need the lambda for the flow
to suspend until all the calls to launch
have completed.Francesc
08/16/2022, 5:45 PMCurtis Ullerich
08/16/2022, 5:45 PMmkrussel
08/16/2022, 5:46 PMFrancesc
08/16/2022, 5:48 PMsuspend fun io1(): Int {
// TODO
}
suspend fun io2(): Int {
// TODO
}
fun sample {
merge(
flow { emit(io1()) },
flow { emit(io2()) },
).onEach {
// handle result
}
.launchIn(scope)
}
Curtis Ullerich
08/16/2022, 5:54 PMfun eagerResponses(clients: List<Client>): Flow<String> = channelFlow {
clients.forEach {
launch { send(it.fakeRpc()) }
}
}
mkrussel
08/16/2022, 5:58 PMCurtis Ullerich
08/16/2022, 6:05 PMfun mergedResponses(clients: List<Client>): Flow<String> {
val flowList = clients.map { client -> flow { emit(client.fakeRpc()) } }
return merge(*(flowList.toTypedArray()))
}
awaitClose{}
in`channelFlow{}`to keep it alive and to clean up resources used in sending updates when it's closed by the caller. in my case, once all the launched jobs have completed then I want it to close, because all RPCs for this operation have completed. I think?Francesc
08/16/2022, 6:18 PMlaunch
so if you close if after that you will be cancelling the requestsCurtis Ullerich
08/16/2022, 6:25 PMFrancesc
08/16/2022, 6:26 PMcoroutineScope
for instancemkrussel
08/16/2022, 6:32 PMchannelFlow
is probably the most complicated.Curtis Ullerich
08/16/2022, 6:51 PM