# coroutines
c
what is an idiomatic way to issue a list of io-blocking calls and do something with each result as soon as it completes?
f
you could wrap each in an `async` block, then call `awaitAll` on the list
m
suspend fun doMultipleIO() {
    coroutineScope {
        // Each async block runs concurrently; doSomething runs as soon as its own call returns.
        async { doSomething(io1()) }
        async { doSomething(io2()) }
    }
}

suspend fun io1() = withContext(Dispatchers.IO) {
    // blocking IO goes here
}

suspend fun io2() = withContext(Dispatchers.IO) {
    // blocking IO goes here
}
f
I read `list` literally, as in
val myList = listOf(
   async { doIo1() },
   async { doIo2() },
   ...
)
then
myList.awaitAll()
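A minimal sketch of the list-based approach, assuming two hypothetical suspend functions `doIo1` and `doIo2` standing in for the real calls. `async` needs a `CoroutineScope`, supplied here by `coroutineScope`, and `awaitAll` only returns once every call has finished, so results arrive together at the end.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the real IO calls.
suspend fun doIo1(): Int { delay(100); return 1 }
suspend fun doIo2(): Int { delay(50); return 2 }

suspend fun doAllIo(): List<Int> = coroutineScope {
    val myList = listOf(
        async { doIo1() },
        async { doIo2() },
    )
    // Suspends until every Deferred completes; results keep the list order.
    myList.awaitAll()
}

fun main() = runBlocking {
    println(doAllIo()) // [1, 2]
}
```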
c
thank you for the suggestions. I don't think either of those does what I'm hoping for. let me share an example
fun main() = runBlocking<Unit> {
    val clients = ('a'..'z').map { Client(it.toString()) }
    orderedResponses(clients).collect {
        println(it)
    }
}

// doesn't work
suspend fun eagerResponses(clients: List<Client>): Flow<String> = coroutineScope {
    flow {
        clients.forEach {
            launch {
                emit(it.fakeRpc())
            }
        }
    }
}

fun orderedResponses(clients: List<Client>): Flow<String> =
    clients.asFlow().map {
        it.fakeRpc()
    }

class Client(val name: String) {
    // Waits 0 or 5 seconds before returning, simulating bimodal RPC latency.
    suspend fun fakeRpc() = (0..99).random().let { delay((5 * (it % 2)).seconds); "$name $it" }
}
in `main` I want each result to print as soon as it is complete, rather than waiting for all of them to be done
f
then do what mkrussel suggested, wrap each call in `async`
m
Your eager version is broken because the `flow` block completes before anything is emitted, so the flow is closed. What you would need to do instead is to move the `coroutineScope` inside the `flow` lambda.
Make sure the call to `emit` happens in the same dispatcher as the call to `flow`
c
@Francesc I can use this in main: `clients.forEach { launch { println(it.fakeRpc()) } }` but I'd like something that returns a sequence or flow of the results (testability etc). @mkrussel I did indeed see `IllegalStateException: Flow invariant is violated:` right away. I'll try to figure out using the right dispatcher
m
I don’t think your error was from the dispatcher (your snippet doesn’t change threads), but instead because you have the call to `coroutineScope` in the wrong place. You need the lambda for the `flow` to suspend until all the calls to `launch` have completed.
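One way to read this advice, as a sketch rather than anyone's exact code: put `coroutineScope` inside the `flow` lambda, let the launched coroutines hand their results to a `Channel`, and call `emit` only from the flow body itself. The intermediate channel is an assumption added here, because calling `emit` directly inside `launch` would still trip the flow invariant check. `Client` is a simplified stand-in with fixed delays instead of random ones.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Simplified stand-in for Client: the delay is fixed instead of random.
class Client(val name: String, private val delayMs: Long) {
    suspend fun fakeRpc(): String { delay(delayMs); return name }
}

fun eagerResponses(clients: List<Client>): Flow<String> = flow {
    // coroutineScope keeps the flow lambda suspended until all children finish.
    coroutineScope {
        val results = Channel<String>(Channel.UNLIMITED)
        clients.forEach { client ->
            launch { results.send(client.fakeRpc()) }
        }
        // emit stays in the flow's own coroutine; one receive per client.
        repeat(clients.size) { emit(results.receive()) }
    }
}

fun main() = runBlocking {
    val clients = listOf(Client("slow", 300), Client("fast", 50))
    eagerResponses(clients).collect { println(it) } // prints fast, then slow
}
```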
f
if you want a result at the end, you can do the list as I showed above, you will get all the results in your list at the end
c
I want to stream the results to the caller as they complete
m
Once you do add IO, I would put the switch to a different dispatcher in the function that performs the IO; the result will then be processed in the original dispatcher
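A small sketch of that layout, assuming a hypothetical `readConfig` function and using `Thread.sleep` as a placeholder for real blocking IO (JVM only). The `withContext(Dispatchers.IO)` switch lives inside the function that blocks, so the caller needs no dispatcher handling of its own.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical blocking call; the dispatcher switch is an implementation detail of this function.
suspend fun readConfig(): String = withContext(Dispatchers.IO) {
    Thread.sleep(50) // stands in for blocking IO
    "config loaded"
}

fun main() = runBlocking {
    val callerThread = Thread.currentThread()
    val result = readConfig()
    // Execution resumes on the caller's dispatcher once withContext returns.
    println("$result, same thread as before: ${Thread.currentThread() === callerThread}")
}
```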
f
what about this?
suspend fun io1(): Int {
    TODO()
}
suspend fun io2(): Int {
    TODO()
}

// scope is passed in as a parameter here so that the snippet compiles
fun sample(scope: CoroutineScope) {
    merge(
        flow { emit(io1()) },
        flow { emit(io2()) },
    ).onEach {
        // handle result
    }.launchIn(scope)
}
c
The runtime error message led me to channelFlow, which seems to work?
fun eagerResponses(clients: List<Client>): Flow<String> = channelFlow {
    clients.forEach {
        launch { send(it.fakeRpc()) }
    }
}
m
You would manually need to take care of closing the channel so that the flow gets closed.
If you are having problems with using the wrong dispatcher, show the code that is creating that failure.
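A quick sketch to check the closing behaviour. As far as the `channelFlow` documentation describes it, the resulting flow completes once the block and every coroutine launched inside it have finished, so this particular version may not need an explicit close. `Client` is again a simplified stand-in with fixed delays.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Simplified stand-in for Client: the delay is fixed instead of random.
class Client(val name: String, private val delayMs: Long) {
    suspend fun fakeRpc(): String { delay(delayMs); return name }
}

fun eagerResponses(clients: List<Client>): Flow<String> = channelFlow {
    clients.forEach { client ->
        launch { send(client.fakeRpc()) }
    }
    // No explicit close() here: the flow ends when the block and its children are done.
}

fun main() = runBlocking {
    eagerResponses(listOf(Client("slow", 300), Client("fast", 50)))
        .collect { println(it) }
    println("collection finished") // reached only after the flow completes
}
```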
c
@Francesc this version of your suggestion seems to also work
fun mergedResponses(clients: List<Client>): Flow<String> {
    val flowList = clients.map { client -> flow { emit(client.fakeRpc()) } }
    return merge(*(flowList.toTypedArray()))
}
@mkrussel I'm reading about ChannelFlow now to learn about the closing requirement
re: closing, I see that I can use `awaitClose {}` in `channelFlow {}` to keep it alive and to clean up resources used in sending updates when it's closed by the caller. in my case, once all the launched jobs have completed then I want it to close, because all RPCs for this operation have completed. I think?
f
no, you are using `launch`, so if you close it after that you will be cancelling the requests
c
I understand that to mean that if I manually close the channel before some jobs are completed then the incomplete jobs will be cancelled. that sounds useful to me. i could use a single timeout in the caller to stop waiting for more responses.
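A sketch of the single-timeout idea, assuming the `channelFlow` version of `eagerResponses` and a hypothetical helper named `responsesWithin`. When `withTimeoutOrNull` expires, the collection is cancelled, which also cancels the RPCs that are still in flight; anything received before the deadline is kept.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Simplified stand-in for Client: the delay is fixed instead of random.
class Client(val name: String, private val delayMs: Long) {
    suspend fun fakeRpc(): String { delay(delayMs); return name }
}

fun eagerResponses(clients: List<Client>): Flow<String> = channelFlow {
    clients.forEach { client -> launch { send(client.fakeRpc()) } }
}

// Hypothetical helper: collects whatever arrives before the deadline.
suspend fun responsesWithin(clients: List<Client>, timeoutMs: Long): List<String> {
    val received = mutableListOf<String>()
    // One deadline for the whole collection; pending RPCs are cancelled when it expires.
    withTimeoutOrNull(timeoutMs) {
        eagerResponses(clients).collect { received += it }
    }
    return received
}

fun main() = runBlocking {
    val clients = listOf(Client("fast", 50), Client("slow", 5_000))
    println(responsesWithin(clients, 500)) // [fast]
}
```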
f
you need to wait for the coroutines you launch to complete. You could wrap them in a `coroutineScope`, for instance
m
If you have the merge solution working, I would go with that. There are many ways to solve this, and I would say using `channelFlow` is probably the most complicated.
c
point taken. I'd like to understand the issue if I can. in order to collect the flow containing the responses, the caller must already be in a coroutineScope, right? I get this error when I try to collect eagerResponses() without a coroutine scope, at least: "Suspend function 'collect' should be called only from a coroutine or another suspend function"