#coroutines

Curtis Ullerich

08/16/2022, 5:08 PM
what is an idiomatic way to issue a list of io-blocking calls and do something with each result as soon as it completes?

Francesc

08/16/2022, 5:24 PM
you could wrap each in an `async` block then call `awaitAll` on the list

mkrussel

08/16/2022, 5:29 PM
suspend fun doMultipleIO() {
    coroutineScope {
        // Each async block runs concurrently and handles its own result as soon as it arrives.
        async { doSomething(io1()) }
        async { doSomething(io2()) }
    }
}

suspend fun io1() = withContext(Dispatchers.IO) {
    // blocking IO call goes here
}

suspend fun io2() = withContext(Dispatchers.IO) {
    // blocking IO call goes here
}

Francesc

08/16/2022, 5:31 PM
I read `list` literally, as in
val myList = listOf(
   async { doIo1() },
   async { doIo2() },
   ...
)
then
myList.awaitAll()

Curtis Ullerich

08/16/2022, 5:31 PM
thank you for the suggestions. I think both of those don't do what I'm hoping. let me share an example
fun main() = runBlocking<Unit> {
    val clients = ('a'..'z').map { Client(it.toString()) }
    orderedResponses(clients).collect {
        println(it)
    }
}

// doesn't work
suspend fun eagerResponses(clients: List<Client>): Flow<String> = coroutineScope {
    flow {
        clients.forEach {
            launch {
                emit(it.fakeRpc())
            }
        }
    }
}

fun orderedResponses(clients: List<Client>): Flow<String> =
    clients.asFlow().map {
        it.fakeRpc()
    }

class Client(val name: String) {
    // Waits 0 or 5 seconds before returning, simulating bimodal RPC latency.
    suspend fun fakeRpc() = (0..99).random().let { delay((5 * (it % 2)).seconds); "$name $it" }
}
in `main` I want each result to print as soon as it is complete, rather than waiting for all to be done

Francesc

08/16/2022, 5:34 PM
then do what mkrussel suggested, wrap each call in `async`

mkrussel

08/16/2022, 5:36 PM
Your `eagerResponses` is broken because the `flow` block completes before anything is emitted, so the flow is closed. What you would need to do instead is move the `coroutineScope` inside the `flow` lambda.
Make sure the call to `emit` happens in the same dispatcher as the call to `flow`

Curtis Ullerich

08/16/2022, 5:42 PM
@Francesc I can use this in main: `clients.forEach { launch { println(it.fakeRpc()) } }` but I'd like something that returns a sequence or flow of the results (testability etc).
@mkrussel I did indeed see `IllegalStateException: Flow invariant is violated:` right away. I'll try to figure out using the right dispatcher

mkrussel

08/16/2022, 5:45 PM
I don’t think your error was from the dispatcher (your snippet doesn’t change threads), but instead because you have the call to `coroutineScope` in the wrong place. You need the lambda for the `flow` to suspend until all the calls to `launch` have completed.
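A minimal sketch of that advice, not code from the thread. It assumes a hypothetical `slowCall` helper in place of the real RPC and a hypothetical `completionOrder` function. Because `emit` may only be called from the flow's own coroutine, the launched children hand their results over through a `Channel` rather than emitting directly.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for an RPC: returns its label after the given delay.
suspend fun slowCall(label: String, millis: Long): String {
    delay(millis)
    return label
}

fun completionOrder(calls: List<Pair<String, Long>>): Flow<String> = flow {
    // coroutineScope sits inside the flow lambda, so the lambda suspends
    // until every launched child has finished.
    coroutineScope {
        val results = Channel<String>(Channel.UNLIMITED)
        calls.forEach { (label, millis) ->
            launch { results.send(slowCall(label, millis)) }
        }
        // emit is called only from the flow's own coroutine, never from a child.
        repeat(calls.size) { emit(results.receive()) }
    }
}

fun main() = runBlocking {
    val order = completionOrder(listOf("slow" to 200L, "fast" to 10L)).toList()
    println(order) // prints [fast, slow]
}
```

Results come out in completion order rather than input order, which is the streaming behavior asked for at the top of the thread.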

Francesc

08/16/2022, 5:45 PM
if you want a result at the end, you can do the list as I showed above, you will get all the results in your list at the end

Curtis Ullerich

08/16/2022, 5:45 PM
I want to stream the results to the caller as they complete

mkrussel

08/16/2022, 5:46 PM
Once you do add IO, I would put the switch to a different dispatcher in the function that performs the IO, and then the result will be processed in the original dispatcher

Francesc

08/16/2022, 5:48 PM
what about this?
suspend fun io1(): Int {
    TODO()
}
suspend fun io2(): Int {
    TODO()
}

// scope is passed in by the caller; launchIn starts collecting in it.
fun sample(scope: CoroutineScope) {
    merge(
        flow { emit(io1()) },
        flow { emit(io2()) },
    ).onEach {
        // handle result
    }.launchIn(scope)
}

Curtis Ullerich

08/16/2022, 5:54 PM
The runtime error message led me to channelFlow, which seems to work?
fun eagerResponses(clients: List<Client>): Flow<String> = channelFlow {
    clients.forEach {
        launch { send(it.fakeRpc()) }
    }
}

mkrussel

08/16/2022, 5:58 PM
You would manually need to take care of closing the channel so that the flow gets closed.
If you are having problems with using the wrong dispatcher, show the code that is creating that failure.

Curtis Ullerich

08/16/2022, 6:05 PM
@Francesc this version of your suggestion seems to also work
fun mergedResponses(clients: List<Client>): Flow<String> =
    // One single-element flow per client, merged so results arrive as each call completes.
    clients.map { client -> flow { emit(client.fakeRpc()) } }.merge()
@mkrussel I'm reading about ChannelFlow now to learn about the closing requirement
re: closing, I see that I can use `awaitClose {}` in `channelFlow {}` to keep it alive and to clean up resources used in sending updates when it's closed by the caller. In my case, once all the launched jobs have completed I want it to close, because all RPCs for this operation have completed. I think?
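On the closing question: as the kotlinx.coroutines documentation describes `channelFlow`, the resulting flow completes once the code in the block and all of its child coroutines have completed, so no explicit `close()` or `awaitClose {}` should be needed for this case. A small sketch to check that, using a hypothetical `StubClient` with fixed delays in place of `Client.fakeRpc()`:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for Client: returns its name after a fixed delay.
class StubClient(val name: String, private val delayMillis: Long) {
    suspend fun call(): String {
        delay(delayMillis)
        return name
    }
}

fun eagerStubResponses(clients: List<StubClient>): Flow<String> = channelFlow {
    clients.forEach { client ->
        launch { send(client.call()) }
    }
    // No awaitClose and no close(): the flow completes on its own once the
    // block and every coroutine launched in it have finished.
}

fun main() = runBlocking {
    val clients = listOf(StubClient("a", 150), StubClient("b", 10), StubClient("c", 80))
    // toList() returns only because the flow completes; results arrive in completion order.
    println(eagerStubResponses(clients).toList()) // prints [b, c, a]
}
```

`awaitClose {}` is still the right tool when the block registers a callback or listener that must be unregistered when the collector goes away.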

Francesc

08/16/2022, 6:18 PM
no, you are using `launch`, so if you close it after that you will be cancelling the requests

Curtis Ullerich

08/16/2022, 6:25 PM
I understand that to mean that if I manually close the channel before some jobs are completed then the incomplete jobs will be cancelled. that sounds useful to me. i could use a single timeout in the caller to stop waiting for more responses.
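A sketch of the single-timeout idea, under assumptions: `delayedLabels` and `collectWithin` are hypothetical names, and fixed delays stand in for the RPCs. When `withTimeoutOrNull` expires it cancels the collection, and cancelling the collector of a `channelFlow` also cancels the coroutines launched inside it.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: sends each label after its delay, in completion order.
fun delayedLabels(calls: Map<String, Long>): Flow<String> = channelFlow {
    calls.forEach { (label, millis) ->
        launch {
            delay(millis)
            send(label)
        }
    }
}

// Collects whatever arrives within the budget. The timeout cancels the collection,
// which in turn cancels the calls that are still in flight.
suspend fun collectWithin(budgetMillis: Long, calls: Map<String, Long>): List<String> {
    val received = mutableListOf<String>()
    withTimeoutOrNull(budgetMillis) {
        delayedLabels(calls).collect { received += it }
    }
    return received
}

fun main() = runBlocking {
    // "never" would take a minute, but the caller stops waiting after 300 ms.
    println(collectWithin(300, mapOf("fast" to 10L, "never" to 60_000L))) // prints [fast]
}
```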

Francesc

08/16/2022, 6:26 PM
you need to wait for the coroutines you launch to complete. You could wrap them in a `coroutineScope`, for instance

mkrussel

08/16/2022, 6:32 PM
If you have the merge solution working, I would go with that. There are many ways to solve this, and I would say using `channelFlow` is probably the most complicated.

Curtis Ullerich

08/16/2022, 6:51 PM
point taken. I'd like to understand the issue if I can. in order to collect the flow containing the responses, the caller must already be in a coroutineScope, right? I get this error when I try to collect eagerResponses() without a coroutine scope, at least: "Suspend function 'collect' should be called only from a coroutine or another suspend function"
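A short sketch of the distinction in that question, with a hypothetical `responses()` function standing in for `eagerResponses()`: building a flow is an ordinary call, while terminal operators such as `collect` and `toList` are suspend functions and therefore need a coroutine.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for eagerResponses(): an ordinary, non-suspend function.
fun responses(): Flow<String> = flowOf("a 1", "b 2")

fun main() {
    // Creating the flow needs no coroutine; nothing runs yet because flows are cold.
    val pending = responses()

    // toList (like collect) is a suspend function, so it must run inside a
    // coroutine. runBlocking provides one here.
    val collected = runBlocking { pending.toList() }
    println(collected) // prints [a 1, b 2]
}
```

Outside of `main` or tests, the same collection would typically run in an existing scope, for example via `launchIn(scope)` as in the `merge` suggestion earlier in the thread.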