#coroutines

Esa

01/27/2020, 8:32 AM
Hi, I have a `taskChannel`, a `resultChannel`, a producer and multiple consumers. The producer is lightning fast and the consumers need a bit of time. Once the producer is done, it closes the `taskChannel`.
launch {
    tasks.forEach {
        taskChannel.send(it)
    }
    taskChannel.close()
}
Then this is the consumer side of things.
launch {
    repeat (10) {
        launch {
            for (task in taskChannel) {
                val result = process(task)
                resultChannel.send(result)
            }
        }
    }
}
This resultChannel undergoes one final step, which is logging the results:
for (result in resultChannel) {
    log(result)
}
resultChannel.close()
My question is really just whether I’ve closed the channels at the correct times. Also, is this a correct usage of channels, or have I created some antipatterns here? Any replies appreciated. It’s my first time looking into channels, and I like the way they let me start all three processes (production, consumption and logging) at the same time.

Dominaezzz

01/27/2020, 8:40 AM
Have you tried running this yet? The `resultChannel` for loop looks suspicious. I reckon it will deadlock there. The channel has to be closed for the for loop to finish.
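To illustrate the point about the for loop, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The function name `drainUntilClosed` is made up for this example. Iterating a channel with `for` only ends once the channel is closed, so the closing has to happen on the sending side, not after the loop.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects everything sent on a channel.
// It returns only because the sender closes the channel.
fun drainUntilClosed(items: List<Int>): List<Int> = runBlocking {
    val channel = Channel<Int>()
    launch {
        items.forEach { channel.send(it) }
        channel.close() // without this, the loop below would suspend forever
    }
    val received = mutableListOf<Int>()
    for (item in channel) {
        received.add(item)
    }
    received
}

fun main() {
    println(drainUntilClosed(listOf(1, 2, 3))) // prints [1, 2, 3]
}
```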

Esa

01/27/2020, 8:42 AM
It seems to get through all contents of the channel, but it doesn’t “finish” as you said. Once all content has been consumed it still seems to expect more. Should I move the closing of this channel further up, to the consumers? If I do, how exactly? There are multiple consumers there, should I move it into the inner `launch`?

Dominaezzz

01/27/2020, 8:44 AM
You can wrap the `repeat` in a `coroutineScope` and close the channel right after.

Esa

01/27/2020, 8:50 AM
So the repeat holds a launch call, and outside of the repeat block I close the channel?

Dominaezzz

01/27/2020, 8:51 AM
Yes, like so.
launch {
    coroutineScope {
        repeat (10) {
            launch {
                for (task in taskChannel) {
                    val result = process(task)
                    resultChannel.send(result)
                }
            }
        }
    }
    resultChannel.close()
}
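For reference, the three stages from this thread can be put together into one runnable sketch. This assumes kotlinx.coroutines is available. The `process` function here is a hypothetical stand-in that doubles an Int, and `runPipeline` is a name invented for the example. The logger collects into a list instead of calling `log`.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the thread's process(task).
fun process(task: Int): Int = task * 2

fun runPipeline(tasks: List<Int>, workers: Int = 10): List<Int> = runBlocking {
    val taskChannel = Channel<Int>()
    val resultChannel = Channel<Int>()

    // Producer: closes taskChannel once every task has been sent.
    launch {
        tasks.forEach { taskChannel.send(it) }
        taskChannel.close()
    }

    // Consumers: coroutineScope returns only after all workers have finished,
    // so resultChannel is closed once, after the last result has been sent.
    launch {
        coroutineScope {
            repeat(workers) {
                launch {
                    for (task in taskChannel) {
                        resultChannel.send(process(task))
                    }
                }
            }
        }
        resultChannel.close()
    }

    // Logger: this loop ends when resultChannel is closed.
    val logged = mutableListOf<Int>()
    for (result in resultChannel) {
        logged.add(result)
    }
    logged
}

fun main() {
    println(runPipeline((1..5).toList()).sorted()) // prints [2, 4, 6, 8, 10]
}
```

Results can arrive in any order with several workers, which is why the usage line sorts them before printing.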

Esa

01/27/2020, 8:57 AM
Ah, nice. I was not aware you could scope work like that and be sure any calls after would only be invoked after all children had completed their work. Thanks!
This worked perfectly, thank you. Successfully completed this time instead of waiting in limbo.
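The ordering guarantee mentioned here can be seen in a small sketch, assuming kotlinx.coroutines. The name `eventOrder` and the delays are invented for illustration. `coroutineScope` suspends until every coroutine launched inside it has completed, so the line after it always runs last.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which things happen.
fun eventOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        repeat(3) { id ->
            launch {
                delay(10L * (id + 1)) // simulate work of varying length
                events.add("child $id done")
            }
        }
    }
    // Reached only after all three children have completed.
    events.add("after scope")
    events
}

fun main() {
    println(eventOrder().last()) // prints after scope
}
```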
If I need to be able to instantly pause / stop every worker, what’s the best way to achieve that? I’m exploring two options here: stopping instantly, and stopping after the current job.

Dominaezzz

01/27/2020, 12:08 PM
What is a worker?

Esa

01/27/2020, 12:15 PM
In the last snippet posted in this thread, there’s a `repeat() { launch { // this is a worker } }` section. Anything inside that launch I consider a worker.
That launch has a for-in loop that needs to be able to stop and resume work on command. I’m not sure of the best way to go about that.

Dominaezzz

01/27/2020, 12:19 PM
Do you want to stop them individually or as a group?

Esa

01/27/2020, 12:21 PM
Whichever is the better approach. I don’t have a preference, the important bit is that on command, everything stops (suspends?) picking new tasks from the channel until a new command is given to resume picking new tasks.

Dominaezzz

01/27/2020, 12:30 PM
Tricky. I'd argue you should do this via the `taskChannel`: don't send any tasks through when you wish to pause.
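One way this suggestion might look in practice is a gate that the producer checks before each send. This is only a sketch under assumptions: `PauseGate`, `runWithPause` and the timed "controller" coroutine are all hypothetical, and it uses `MutableStateFlow` from kotlinx.coroutines as the switch. Workers are never paused directly; they simply find nothing to receive while the producer is held back. A task the producer is already trying to send when the pause arrives will still be delivered.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical pause switch: true means "do not hand out new tasks".
class PauseGate {
    private val paused = MutableStateFlow(false)
    fun pause() { paused.value = true }
    fun resume() { paused.value = false }
    // Suspends while paused; returns right away otherwise.
    suspend fun awaitResumed() { paused.first { !it } }
}

fun runWithPause(tasks: List<Int>): List<Int> = runBlocking {
    val gate = PauseGate()
    val taskChannel = Channel<Int>() // rendezvous: nothing is buffered ahead
    val processed = mutableListOf<Int>()

    // Producer: waits at the gate before each send.
    launch {
        for (task in tasks) {
            gate.awaitResumed()
            taskChannel.send(task)
        }
        taskChannel.close()
    }

    // Hypothetical controller: pause briefly, then resume.
    launch {
        gate.pause()
        delay(50)
        gate.resume()
    }

    // Workers: unchanged from the thread, they just drain the channel.
    coroutineScope {
        repeat(3) {
            launch {
                for (task in taskChannel) {
                    processed.add(task)
                }
            }
        }
    }
    processed
}

fun main() {
    println(runWithPause((1..5).toList()).sorted()) // prints [1, 2, 3, 4, 5]
}
```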

Esa

01/27/2020, 12:37 PM
Aha, okay. So I could set bufferSize to 1, and when I send this command somehow, it prevents the taskChannel from receiving new tasks, in a way. Got it, thanks. Will try it.
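The buffer size matters here because anything already sitting in the channel can still be picked up by a worker after the producer stops sending. The sketch below assumes kotlinx.coroutines 1.5 or newer for `trySend`, and `acceptedWithoutReceiver` is a name made up for the example. It counts how many tasks a channel takes in while no worker is receiving.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many tasks a channel accepts
// when no worker is currently receiving from it.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int = 5): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    repeat(attempts) { task ->
        // trySend never suspends; it fails if the task cannot be taken right now.
        if (channel.trySend(task).isSuccess) accepted++
    }
    return accepted
}

fun main() {
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS)) // prints 0
    println(acceptedWithoutReceiver(1))                  // prints 1
}
```

With a rendezvous channel no task is held in the channel itself, while a capacity of 1 leaves at most one queued task that workers could still take after a pause.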