# coroutines
e
Hi, I have a `taskChannel`, a `resultChannel`, a producer and multiple consumers. The producer is lightning fast and the consumers need a bit of time. Once the producer is done, it closes the `taskChannel`.
launch {
    tasks.forEach {
        taskChannel.send(it)
    }
    taskChannel.close()
}
Then this is the consumer side of things.
launch {
    repeat (10) {
        launch {
            for (task in taskChannel) {
                val result = process(task)
                resultChannel.send(result)
            }
        }
    }
}
This resultChannel undergoes one final step, which is logging the results:
for (result in resultChannel) {
    log(result)
}
resultChannel.close()
My question is really just whether I’ve closed the channels at the correct times. Also, is this a correct usage of channels, or have I created some antipatterns here? Any replies appreciated. It’s my first time looking into channels, and I like the way they let me start all three processes (production, consumption and logging) at the same time.
d
Have you tried running this yet? The `resultChannel` for loop looks suspicious. I reckon it will deadlock there. The channel has to be closed for the for loop to finish.
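To make that point concrete, here is a minimal sketch, assuming `kotlinx.coroutines` is on the classpath. The function name `loopFinishes` and the 200 ms timeout are made up for the illustration and are not from the thread. It iterates over a channel with and without a prior `close()` and reports whether the loop ended on its own.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: returns true if the for loop over the channel
// finished by itself before the (arbitrary) 200 ms timeout.
fun loopFinishes(closeChannel: Boolean): Boolean = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { channel.send(it) }
    if (closeChannel) channel.close()

    val finished = withTimeoutOrNull(200) {
        for (value in channel) {
            // Drain the buffered elements; nothing else to do here.
        }
        true // only reached once the channel is closed and empty
    }
    finished == true
}

fun main() {
    println(loopFinishes(closeChannel = false)) // loop is still waiting when the timeout hits
    println(loopFinishes(closeChannel = true))  // loop ends after the last element
}
```

Without `close()`, the loop consumes the three elements and then suspends waiting for a fourth, which matches the "expects more" behaviour described in the thread.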
e
It seems to get through all contents of the channel, but it doesn’t “finish” as you said. Once all content has been consumed it still seems to expect more. Should I move the closing of this channel further up, to the consumers? If I do, how exactly? There are multiple consumers there, should I move it into the inner `launch`?
d
You can wrap the `repeat` in a `coroutineScope` and close the channel right after.
e
So the repeat holds a launch call, and outside of the repeat block I close the channel?
d
Yes, like so.
launch {
    coroutineScope {
        repeat (10) {
            launch {
                for (task in taskChannel) {
                    val result = process(task)
                    resultChannel.send(result)
                }
            }
        }
    }
    resultChannel.close()
}
e
Ah, nice. I was not aware you could scope work like that and be sure any calls after would only be invoked after all children had completed their work. Thanks!
This worked perfectly, thank you. Successfully completed this time instead of waiting in limbo.
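For reference, the three pieces from this thread can be put together into one runnable sketch, assuming `kotlinx.coroutines` is available. The wrapper `runPipeline`, the `Int` task type and the body of `process` are placeholders invented for the example; the real `process` and `log` are not shown in the thread, so logging is replaced by collecting into a list.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Placeholder for the thread's process(); the real work is unknown.
suspend fun process(task: Int): Int {
    delay(10) // pretend the consumer needs a bit of time
    return task * 2
}

// Hypothetical wrapper around producer, consumers and the final "logging" step.
fun runPipeline(tasks: List<Int>, workerCount: Int = 10): List<Int> = runBlocking {
    val taskChannel = Channel<Int>()
    val resultChannel = Channel<Int>()

    // Producer: sends every task, then closes taskChannel.
    launch {
        tasks.forEach { taskChannel.send(it) }
        taskChannel.close()
    }

    // Consumers: coroutineScope returns only after all workers are done,
    // so resultChannel is closed exactly once, after the last result.
    launch {
        coroutineScope {
            repeat(workerCount) {
                launch {
                    for (task in taskChannel) {
                        resultChannel.send(process(task))
                    }
                }
            }
        }
        resultChannel.close()
    }

    // Final step: stand-in for log(result); the loop ends when resultChannel closes.
    val logged = mutableListOf<Int>()
    for (result in resultChannel) {
        logged += result
    }
    logged
}

fun main() {
    // Result order depends on which worker finishes first, hence the sort.
    println(runPipeline((1..20).toList()).sorted())
}
```

The key design point is that the side that sends into a channel is the one that closes it: the producer closes `taskChannel`, and the consumer group closes `resultChannel`.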
If I need to be able to instantly pause / stop every worker, what’s the best way to achieve that? I’d like to explore two options here: stopping instantly, and stopping after the current job has finished.
d
What is a worker?
e
In the last snippet posted in this thread, there’s a `repeat() { launch { // this is a worker } }` section. Anything inside that launch I consider a worker.
That launch has a for-in loop that needs to be able to stop and resume work on command. I’m not sure what the best way to go about that is.
d
Do you want to stop them individually or as a group?
e
Whichever is the better approach. I don’t have a preference, the important bit is that on command, everything stops (suspends?) picking new tasks from the channel until a new command is given to resume picking new tasks.
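One possible way to get "finish the current task, then stop picking new ones until told to resume" is a shared pause switch that every worker checks before it receives. This is only a sketch under assumptions, not something proposed in the thread: `paused`, `runWithWorkerGate`, the three workers and the 100 ms wait are all hypothetical, and the `for` loop is replaced by an explicit receive so the check can sit in front of it.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns (tasks processed while paused, tasks processed in total).
fun runWithWorkerGate(tasks: List<Int>): Pair<Int, Int> = runBlocking {
    val paused = MutableStateFlow(true) // assumed pause switch, starts paused
    val taskChannel = Channel<Int>(Channel.UNLIMITED)
    tasks.forEach { taskChannel.send(it) }
    taskChannel.close()

    val processed = mutableListOf<Int>() // safe here: runBlocking runs on a single thread

    val workers = launch {
        repeat(3) {
            launch {
                while (true) {
                    paused.first { isPaused -> !isPaused } // suspends while paused
                    // getOrNull() is null once the channel is closed and empty.
                    val task = taskChannel.receiveCatching().getOrNull() ?: break
                    processed += task
                }
            }
        }
    }

    delay(100)                          // workers stay parked at the gate
    val whilePaused = processed.size
    paused.value = false                // the "resume" command
    workers.join()
    whilePaused to processed.size
}

fun main() {
    println(runWithWorkerGate(listOf(1, 2, 3, 4, 5)))
}
```

A task that is already being processed when the switch flips is not interrupted; the worker only stops at its next pass through the gate.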
d
Tricky. I'd argue you should do this via the `taskChannel`: don't send any tasks through when you wish to pause.
e
Aha okay, so I could set bufferSize to 1, and when I send this command somehow, it prevents the taskChannel from receiving new tasks, in a way. Got it, thanks. Will try it.
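The suggestion above could be sketched roughly as follows, assuming `kotlinx.coroutines`. In that library the buffer size is the `capacity` argument of the `Channel()` factory, and the default is a rendezvous channel with no buffer at all. The `paused` switch, the function name `runWithProducerGate`, the worker count and the 100 ms wait are hypothetical choices for the example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns (tasks processed while paused, tasks processed in total).
fun runWithProducerGate(tasks: List<Int>): Pair<Int, Int> = runBlocking {
    val paused = MutableStateFlow(true) // assumed pause switch, starts paused
    val taskChannel = Channel<Int>()    // default capacity: rendezvous, nothing is buffered
    val processed = mutableListOf<Int>() // safe here: runBlocking runs on a single thread

    // Producer: waits at the gate before every send, so a paused producer starves the workers.
    launch {
        for (task in tasks) {
            paused.first { isPaused -> !isPaused }
            taskChannel.send(task)
        }
        taskChannel.close()
    }

    // Workers are unchanged from the thread: they simply idle when nothing arrives.
    val workers = launch {
        repeat(3) {
            launch {
                for (task in taskChannel) {
                    processed += task
                }
            }
        }
    }

    delay(100)                          // nothing is sent while paused
    val whilePaused = processed.size
    paused.value = false                // the "resume" command
    workers.join()
    whilePaused to processed.size
}

fun main() {
    println(runWithProducerGate(listOf(1, 2, 3, 4)))
}
```

One caveat with this placement: the gate is checked before `send`, so a send that has already started when the pause arrives can still be delivered, and with a larger `capacity` every task already sitting in the buffer will still be picked up by the workers.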