Martin Devillers
02/28/2019, 2:19 PMUnit
to a channel, which would count the number of items received and update that information. The items are processed by simply mapping the list to async
calls which process the item then send
to the progress channel, then doing awaitAll
on this channel. See the code.
The issue I’m running into is that the progress isn’t being broadcast along as items are processed. Instead, all the items are processed, then all the progress updates are sent, which isn’t very useful. As I understand it, this is because the dispatcher’s queue is “fair”, therefore it handles all the async
calls before handling the receive
calls of the actor.
If you have ideas on a better way to do this, I would really appreciate it. One solution that I see is to use the “fan-out” technique described in the coroutine guide, but this seems weird to me since I have a list, not a channel, and also because I lose the benefit of leaving the responsibility of determining the parallelism of processing to the coroutine dispatcher.
Thanks 🙂Dispatchers.Default
to newFixedThreadPoolContext
in the example (with any number of threads) then I’m able to reproduce.Dico
02/28/2019, 2:24 PMcoroutineScope {
block which will automatically await their completionMartin Devillers
02/28/2019, 2:29 PMcouroutineScope
for the worker coroutines won’t help either, it doesn’t change anything with the dispatcher’s “fair” queuing mechanism, which is the fundamental issue preventing the progress channel from executing until all the workers are doneDico
02/28/2019, 2:31 PMMartin Devillers
02/28/2019, 2:32 PMDico
02/28/2019, 2:33 PMList(100) { index -> launch {
repeat(index) { yield() }
}}
Martin Devillers
02/28/2019, 2:34 PMDico
02/28/2019, 2:36 PMMartin Devillers
02/28/2019, 2:38 PMasync
is the right pattern (in practice, it’s a map { async { /*...*/ } }
, I think that’s probably where there could be a more sensible change.awaitAll
is designed to start all lazy coroutines in the collection, so lazy won’t work until it’s single-threaded like you said, which entirely defeats the prurposeDico
02/28/2019, 2:40 PMdelay
instead of sleep
.Martin Devillers
02/28/2019, 2:41 PMDico
02/28/2019, 2:43 PMMartin Devillers
02/28/2019, 2:44 PM@Test
fun workerChannels() = runBlocking<Unit> {
withContext(newSingleThreadContext("")) {
val progressActor = actor<Unit> {
consumeEachIndexed { (index, _) ->
println("Update progress $index")
}
}
val itemsChannel = produce {
repeat(100) {
send(it)
}
close()
}
List(2) {
produce {
for (item in itemsChannel) {
Thread.sleep(200)
println("Processed item $item")
progressActor.send(Unit)
send(Unit)
}
}.toList()
}.flatten()
progressActor.close()
}
}
This actually works pretty well also. Your solution of having a dedicated dispatcher for the progress actor has the merit of being simple, but i see a potential downside in the fact that I would still be essentially saturation the default dispatcher with a bunch of pending tasks.Dico
02/28/2019, 2:52 PMclose()
at the end of produce, though, and the final 2 coroutines doing the work can just be a launch
Martin Devillers
02/28/2019, 2:54 PMclose
is from the original code where this is done within a coroutineScope {}
call, therefore the actor needs to finish in order for that call to return, and I can’t use launch
because I actually want to get the result of the work in the real codeDico
02/28/2019, 2:54 PMdekans
02/28/2019, 4:46 PM