#coroutines

Martin Devillers

02/28/2019, 2:19 PM
I’m running into some difficulties when trying to process items in parallel with coroutines. My current scenario is that I have a large collection of items, and I need to do a CPU-intensive transformation on each of them (deciphering data, if you’re curious). As I process these items, I want to broadcast the progress, which is the number of items processed so far. My idea was that after each item is processed, it would send a `Unit` to a channel, which would count the number of items received and update that information. The items are processed by simply mapping the list to `async` calls which process the item then `send` to the progress channel, then doing `awaitAll` on the resulting list. See the code.
The issue I’m running into is that the progress isn’t being broadcast as items are processed. Instead, all the items are processed, then all the progress updates are sent, which isn’t very useful. As I understand it, this is because the dispatcher’s queue is “fair”, so it handles all the `async` calls before handling the `receive` calls of the actor.
If you have ideas on a better way to do this, I would really appreciate it. One solution that I see is to use the “fan-out” technique described in the coroutine guide, but this seems weird to me since I have a list, not a channel, and also because I lose the benefit of leaving the responsibility of determining the parallelism of processing to the coroutine dispatcher. Thanks 🙂
Additional information on the example: this actually works correctly when I run it locally on my computer, but not in my real scenario (an Android device). However, if I change `Dispatchers.Default` to `newFixedThreadPoolContext` in the example (with any number of threads), then I’m able to reproduce the issue.
What you’ll see in the example is that the “Processed items …” all get printed, then all the “Update progress …” get printed.
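The example referred to above is not included in this log. A minimal sketch of the setup as described, assuming `kotlinx.coroutines` is on the classpath, might look like the following. The function name `runExample`, the 20 ms `Thread.sleep` standing in for the CPU-bound work, and the `events` queue used to record ordering are all hypothetical and not taken from the original code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical reconstruction of the described setup. Instead of printing,
// it records events so that their order can be inspected afterwards.
@OptIn(ObsoleteCoroutinesApi::class, DelicateCoroutinesApi::class)
fun runExample(itemCount: Int, threads: Int): List<String> {
    val events = ConcurrentLinkedQueue<String>()
    newFixedThreadPoolContext(threads, "workers").use { pool ->
        runBlocking(pool) {
            // Progress actor: counts the Unit messages it receives.
            val progress = actor<Unit> {
                var count = 0
                for (ignored in channel) {
                    count++
                    events.add("Update progress $count")
                }
            }
            // One async per item, all on the same dispatcher as the actor.
            List(itemCount) { item ->
                async {
                    Thread.sleep(20) // stand-in for the CPU-bound work
                    events.add("Processed item $item")
                    progress.send(Unit)
                }
            }.awaitAll()
            progress.close() // lets the actor finish so runBlocking can return
        }
    }
    return events.toList()
}
```

Calling `runExample(20, 2).forEach(::println)` from a `main` function shows the recorded order. On a fixed thread pool the "Update progress" entries tend to cluster after the "Processed item" entries, which matches the behaviour described, though the exact interleaving depends on scheduling.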

Dico

02/28/2019, 2:24 PM
It also depends on the type of channel which you are using
I think it comes down to that and the fact that your receiver coroutine, which updates the progress on your display, is also running in the same context
Unless it's in the UI context
I would recommend increasing the capacity of your channel
Another thing you can consider, although it wouldn't necessarily tackle the problem, is to run your 100 coroutines in a `coroutineScope { }` block, which will automatically await their completion
Because it does not return until all children are complete
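As a small illustration of that point, here is a sketch in which `coroutineScope` only returns once every coroutine launched inside it has completed. The helper name `runWorkers` and the 10 ms `delay` standing in for real work are assumptions made for this example.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches `count` workers and returns how many had
// finished by the time coroutineScope returned.
suspend fun runWorkers(count: Int): Int {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(count) {
            launch(Dispatchers.Default) {
                delay(10) // stand-in for real work
                finished.incrementAndGet()
            }
        }
    } // suspends here until every child launched above has completed
    return finished.get()
}
```

No explicit `join` or `awaitAll` is needed: `runBlocking { runWorkers(100) }` returns 100 because the scope waits for its children.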

Martin Devillers

02/28/2019, 2:29 PM
Changing the capacity won’t help: it’ll allow the “worker” coroutines to continue without waiting for the progress channel to process their update, but it won’t make the progress channel process them any sooner
Using a `coroutineScope` for the worker coroutines won’t help either: it doesn’t change anything about the dispatcher’s “fair” queuing mechanism, which is the fundamental issue preventing the progress channel from executing until all the workers are done

Dico

02/28/2019, 2:31 PM
So your Android phone seems to have one core, basically?
Yeah, I suggested it because it is cleaner

Martin Devillers

02/28/2019, 2:32 PM
I’m using it in the actual code 🙂 but it’s not really useful for the test

Dico

02/28/2019, 2:33 PM
This might be a funny and weird solution -
```kotlin
List(100) { index ->
    launch {
        repeat(index) { yield() }
    }
}
```

Martin Devillers

02/28/2019, 2:34 PM
Yeah, I’d rather not go there 😆
I don’t think the number of cores is the factor, I think it’s just the dispatcher queue

Dico

02/28/2019, 2:36 PM
I can't think of any dispatcher that is unfair by definition
Unconfined used to be
Maybe you can do it by creating the coroutines lazily
But then you kinda force it to one thread
Actually, that could be a decent option

Martin Devillers

02/28/2019, 2:38 PM
Yes, I don’t think that would be a solution. I’m actually not sure that using this list of `async` is the right pattern (in practice, it’s a `map { async { /*...*/ } }`); I think that’s probably where there could be a more sensible change.
The whole point is to have it in parallel
And `awaitAll` is designed to start all lazy coroutines in the collection, so lazy won’t work unless it’s single-threaded like you said, which entirely defeats the purpose

Dico

02/28/2019, 2:40 PM
Try using `delay` instead of `sleep`.
Actually, that wouldn't work because you're simulating a workload

Martin Devillers

02/28/2019, 2:41 PM
Yes 🙂

Dico

02/28/2019, 2:43 PM
Obvious solution is to have your actor coroutine in another dispatcher
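A rough sketch of that suggestion, assuming `kotlinx.coroutines` is available: the workers run on `Dispatchers.Default` while the progress actor gets its own single thread, so its `receive` calls do not queue behind the pending workers. The function name `processWithDedicatedProgressThread`, the `events` queue, and the 20 ms `Thread.sleep` are hypothetical stand-ins, not code from the thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical sketch: the actor runs on a dedicated thread, the workers on
// Dispatchers.Default. Events are recorded so their order can be inspected.
@OptIn(ObsoleteCoroutinesApi::class, DelicateCoroutinesApi::class)
fun processWithDedicatedProgressThread(itemCount: Int): List<String> {
    val events = ConcurrentLinkedQueue<String>()
    newSingleThreadContext("progress").use { progressThread ->
        runBlocking(Dispatchers.Default) {
            // Passing a context to actor() overrides the inherited dispatcher.
            val progress = actor<Unit>(progressThread) {
                var count = 0
                for (ignored in channel) {
                    count++
                    events.add("Update progress $count")
                }
            }
            List(itemCount) { item ->
                async {
                    Thread.sleep(20) // stand-in for the CPU-bound work
                    events.add("Processed item $item")
                    progress.send(Unit)
                }
            }.awaitAll()
            progress.close() // lets the actor finish so runBlocking can return
        }
    }
    return events.toList()
}
```

With this arrangement the progress entries can interleave with the processing entries, but all the `async` tasks are still queued on the default dispatcher at once.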

Martin Devillers

02/28/2019, 2:44 PM
True
The other solution that I considered, as I said in the original message, is to manually handle the parallelism by doing a “fan-out” on several worker channels
```kotlin
@Test
fun workerChannels() = runBlocking<Unit> {
    withContext(newSingleThreadContext("")) {
        val progressActor = actor<Unit> {
            consumeEachIndexed { (index, _) ->
                println("Update progress $index")
            }
        }
        val itemsChannel = produce {
            repeat(100) {
                send(it)
            }
            close()
        }

        List(2) {
            produce {
                for (item in itemsChannel) {
                    Thread.sleep(200)
                    println("Processed item $item")
                    progressActor.send(Unit)
                    send(Unit)
                }
            }.toList()
        }.flatten()

        progressActor.close()
    }
}
```
This actually works pretty well too. Your solution of having a dedicated dispatcher for the progress actor has the merit of being simple, but I see a potential downside in the fact that I would still essentially be saturating the default dispatcher with a bunch of pending tasks.

Dico

02/28/2019, 2:52 PM
Your solution is better
You don't need `close()` at the end of produce, though, and the final 2 coroutines doing the work can just be a `launch`

Martin Devillers

02/28/2019, 2:54 PM
Yes, the `close` is from the original code, where this is done within a `coroutineScope {}` call, so the actor needs to finish in order for that call to return. And I can’t use `launch` because I actually want to get the result of the work in the real code
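One way to keep the fan-out while still collecting results is to make each worker an `async` that returns the results it produced. This is a sketch of that idea rather than the code from the real project: the name `fanOut`, its parameters, and the `transform` function standing in for the real per-item work are all assumptions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel

// Hypothetical sketch: a fixed number of workers pull items from a shared
// channel, report progress after each item, and return their results.
suspend fun <T, R> fanOut(
    items: List<T>,
    workers: Int,
    progress: SendChannel<Unit>,
    transform: (T) -> R
): List<R> = coroutineScope {
    val input = Channel<T>()
    // Feeder: turns the list into a channel the workers can share.
    launch {
        for (item in items) input.send(item)
        input.close()
    }
    List(workers) {
        async(Dispatchers.Default) {
            val results = mutableListOf<R>()
            for (item in input) { // each item is received by exactly one worker
                results.add(transform(item))
                progress.send(Unit)
            }
            results
        }
    }.awaitAll().flatten()
}
```

The results come back grouped by worker, so their order does not match the input order. The caller owns the `progress` channel and is responsible for consuming and closing it.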

Dico

02/28/2019, 2:54 PM
Fair enough

dekans

02/28/2019, 4:46 PM
oops, your solution seems already close to it