# coroutines
a
Hi all, I was trying to find an easy implementation to launch multiple api calls in an async block but instead of waiting for all of them, just wait for the first n that return, cancelling the others. I know I can do something like
val results = endpoints.map {
    async {
        client.call(it)
    }
}
results.awaitAll()
but of course that waits for all the calls to come back, not just the first N. I know I could solve this with channels by subscribing to n topics and waiting for them, but I was wondering if there is a counterpart with async/await as well? Also, as a side note, one thing I've noticed is that if the client is blocking (imagine a Thread.sleep), the async blocks run sequentially and I completely lose the parallelism. What am I doing wrong?
e
• nothing built in, sorry. you can take inspiration from https://github.com/Kotlin/kotlinx.coroutines/issues/2867
• the dispatcher in runBlocking is a single-threaded event loop. use another dispatcher to dispatch onto other threads (e.g. async(Dispatchers.IO) { ... })
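A minimal sketch of the dispatcher point above, assuming a hypothetical `blockingCall` that stands in for the blocking client (it is not part of the original code). With an empty context the `async` blocks inherit the `runBlocking` event loop and run one after another; with `Dispatchers.IO` they run on separate threads.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a blocking client call.
fun blockingCall(id: Int): Int {
    Thread.sleep(200)
    return id
}

// Runs four blocking calls in async blocks started with the given context
// and returns the elapsed wall-clock time in milliseconds.
fun timeCalls(context: CoroutineContext): Long = runBlocking {
    measureTimeMillis {
        (1..4).map { id -> async(context) { blockingCall(id) } }.awaitAll()
    }
}

fun main() {
    // Inherits runBlocking's single-threaded event loop: roughly 4 x 200 ms.
    println("inherited dispatcher: ${timeCalls(EmptyCoroutineContext)} ms")
    // Each call gets its own IO thread: roughly 200 ms in total.
    println("Dispatchers.IO: ${timeCalls(Dispatchers.IO)} ms")
}
```

The exact timings depend on the machine, so the numbers in the comments are only rough expectations.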
a
thanks @ephemient. Uli suggested using select, which should probably do the job? As for runBlocking, I guess you are right: I was using a TestCoroutineDispatcher that I was passing to the async block (maybe that's single-threaded too?). I wonder if that's the reason it runs sequentially, given that the client is blocking? Is there any way to wrap a blocking call in a suspension so that even a single thread gives me concurrency when, for instance, I do a Thread.sleep?
e
yes, TestCoroutineScope is also single-threaded. no, there isn't; the calls need to be delay() or some other suspend fun
a
alright, so if they don't suspend but rather block, they will hold the thread no matter what I do, is that correct?
e
yes, exactly. coroutines can only switch at suspend points which are inserted by the compiler when you call suspend funs
a
gotcha, alright, thanks for your help! Do you think that select can do the trick for the “first n that complete”?
e
(well, that's a bit of a simplification. if the function you call never suspends, the mere act of calling it won't suspend either, so you can't just wrap Thread.sleep in a suspend fun)
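To illustrate the point about suspension, here is a small sketch with two hypothetical functions, `blockingWrapper` and `suspendingWait` (both invented for this example). Both are marked `suspend`, but only the one that calls `delay()` actually releases the thread.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Marked suspend, but it never suspends: Thread.sleep blocks the calling thread.
suspend fun blockingWrapper() {
    Thread.sleep(200)
}

// Really suspends: delay() frees the thread while waiting.
suspend fun suspendingWait() {
    delay(200)
}

// Launches three coroutines on runBlocking's single thread and times them.
fun elapsed(work: suspend () -> Unit): Long = runBlocking {
    measureTimeMillis {
        List(3) { launch { work() } }.joinAll()
    }
}

fun main() {
    // Runs sequentially: roughly 3 x 200 ms.
    println("blocking wrapper: ${elapsed { blockingWrapper() }} ms")
    // Interleaves at the suspension point: roughly 200 ms.
    println("delay: ${elapsed { suspendingWait() }} ms")
}
```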
it can, but I think channels are more straightforward for this
select can work on a variable number of items, but you'll have to loop through it n times, versus setting up a single channel and polling the first n values
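For comparison, a rough sketch of the select variant described above: loop n times, each time selecting over the `onAwait` clauses of the deferreds that are still pending. The helper name `awaitFirstN` and the demo delays are made up for illustration, and a failed deferred would make `select` throw.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical helper: awaits the first n deferreds to complete, in
// completion order, then cancels whatever is still pending.
suspend fun <T> awaitFirstN(deferreds: List<Deferred<T>>, n: Int): List<T> {
    val pending = deferreds.toMutableList()
    val results = mutableListOf<T>()
    try {
        repeat(minOf(n, pending.size)) {
            // One select per result: whichever pending deferred finishes first wins.
            val (winner, value) = select<Pair<Deferred<T>, T>> {
                pending.forEach { d -> d.onAwait { v -> d to v } }
            }
            pending.remove(winner)
            results += value
        }
    } finally {
        pending.forEach { it.cancel() }
    }
    return results
}

// Demo with delay() standing in for real API calls.
fun demo(): List<Long> = runBlocking {
    val calls = listOf(300L, 50L, 200L, 100L).map { ms ->
        async { delay(ms); ms }
    }
    awaitFirstN(calls, 2)
}

fun main() {
    println(demo()) // the two shortest delays, in completion order
}
```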
a
if I use channels, is there a way to cancel all other running coroutines? do you have an example on how to do this with channels? The code I came up with seems much more complex than select
e
neither channels nor select will automatically cancel all other coroutines
a
gotcha, is there any snippet you can point me to get an idea?
e
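val results = supervisorScope {
    // Response stands for whatever type client.call returns
    val channel = Channel<Response>()
    try {
        for (endpoint in endpoints) {
            launch {
                channel.send(client.call(endpoint))
            }
        }
        // suspend until the first n results have arrived
        List(n) {
            channel.receive()
        }
    } finally {
        // cancel the calls that are still in flight
        coroutineContext.cancelChildren()
    }
}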
untested (I'm on a mobile device) but something like that
a
awesome, will try to play with it
I guess my next question is: if the client inside the launch block is blocking, then when you call cancelChildren they won't really be cancelled, will they?
e
if you use runInterruptible {} they'll at least be sent a thread interrupt, which should break out of well-behaved Java blocking methods
a
so I should run client.call inside a runInterruptible block then, alright
e
yep. the coroutine doesn't "notice" that it's been cancelled until the next suspension, but at least with runInterruptible that should happen quickly (as long as the blocking code called within doesn't ignore InterruptedException)
it's actually kind of easy to screw up on the Java side if you catch all exceptions and wrap them, for example… but that's a bug whether you're working in coroutines or not
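A small sketch of the runInterruptible idea, assuming a hypothetical `slowBlockingCall` in place of the real client. Cancelling the job interrupts the thread, `Thread.sleep` throws `InterruptedException`, and the coroutine finishes as cancelled well before the five seconds are up.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical blocking call; Thread.sleep reacts to thread interrupts.
fun slowBlockingCall(): String {
    Thread.sleep(5_000)
    return "done"
}

// Starts the blocking call, cancels it after `ms` milliseconds, and returns
// how long it took until the job had actually finished.
fun cancelAfter(ms: Long): Long = runBlocking {
    measureTimeMillis {
        val job = launch(Dispatchers.IO) {
            // Cancellation is translated into an interrupt of the running thread.
            runInterruptible { slowBlockingCall() }
        }
        delay(ms)
        job.cancelAndJoin()
    }
}

fun main() {
    println("cancelled after ${cancelAfter(100)} ms")
}
```

Without `runInterruptible`, `cancelAndJoin()` in this sketch would have to wait for the full sleep to finish.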
a
yeah I guess that would be a bug
and one more question: with the cancelChildren() call, how can I find out who's still running in that example?
e
there's definitely no race-free way to check that
but you might send into the channel which IDs have completed, and the rest are incomplete (or completed after n)
a
and then remove those from the array and cancel the rest... makes sense
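One way the ID idea could look, as a sketch only: each task sends its index along with its result, so the indices that never arrived are the ones that were cancelled (or that finished after the first n). `FirstN`, `firstN`, and the demo tasks are hypothetical names for this example, and it assumes at least n tasks succeed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical result holder: collected results by task index, plus the
// indices whose results were not collected.
data class FirstN<T>(val results: Map<Int, T>, val notCollected: Set<Int>)

suspend fun <T> firstN(n: Int, tasks: List<suspend () -> T>): FirstN<T> = supervisorScope {
    val channel = Channel<Pair<Int, T>>()
    try {
        tasks.forEachIndexed { id, task ->
            launch { channel.send(id to task()) }
        }
        // Receive the first n (id, result) pairs in completion order.
        val results = List(minOf(n, tasks.size)) { channel.receive() }.toMap()
        FirstN(results, tasks.indices.toSet() - results.keys)
    } finally {
        // Cancel the tasks that are still running or waiting to send.
        coroutineContext.cancelChildren()
    }
}

// Demo with delay() standing in for real API calls.
fun demoFirstN(): FirstN<Long> = runBlocking {
    val tasks = listOf(300L, 50L, 200L, 100L).map { ms ->
        suspend { delay(ms); ms }
    }
    firstN(2, tasks)
}

fun main() {
    val outcome = demoFirstN()
    println("collected: ${outcome.results}, not collected: ${outcome.notCollected}")
}
```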
alright! thanks for the hints, will try to play around it! 🙂 much appreciated!
e
👍 hope you like the coroutines way of composing these concurrency primitives
a
oh yeah I love them, it takes a bit of time to get used to it but I guess it’s normal
(and actually if you have any book or reference that would suggest, that’d be great)