# coroutines
b
you can probably handle the selection of which task to run using a select expression, which is biased toward the first clause (generate tasks) over the second (combine task)
r
Could you elaborate on that? Maybe I'm mistaken, but wouldn't that allow only one generate task at a time (or let generate and combine run at the same time)?
b
well that would depend on what you do after the select clause is successful. the select will only choose if it's time to run a generate or combine, once you have that decision you have to decide what to do with it
r
I must be missing something fundamental, because it's still not clicking for me. Thanks anyway.
b
well it's hard to get more specific without knowing more about how you want the tasks to operate, for instance, if you want the generators to run concurrently why do they even need a queue?
r
Good point, I should edit my original question to include that. It's because there may occasionally be a large number to run, and they are quite CPU intensive, so I'd like to only run a maximum number of them at a given time (probably # of cores - 1).
b
ah, in that case I would probably create a specific threadpool for that task with a set number of threads to limit it
better than manually dealing with limiting
r
Yes, I imagined it would be something like that. I was thinking I'd need some sort of "worker thread" equivalent that would run generate tasks from a queue, and some sort of lock to switch between it and the combine task, but I'm not sure what the best way to do that is (while also avoiding a busy while(true) loop when there are no tasks to run).
(I'm not sure what the correct terminology is, sorry)
b
is the combine task supposed to run on the same threadpool as the generate tasks?
when you create a threadpool context and launch your coroutines using it, it effectively creates its own queue and runs them from it internally, so you don't have to
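A minimal sketch of the fixed-pool idea, written against the current kotlinx.coroutines 1.x API rather than the experimental one used in this thread. The function name `maxConcurrentGenerates` and the `Thread.sleep` stand-in for CPU-bound work are assumptions for illustration. It launches more tasks than the pool has threads and records the highest number that ran at once, which should never exceed the pool size.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs taskCount blocking tasks on a pool of poolSize
// threads and returns the peak number of tasks observed running together.
fun maxConcurrentGenerates(taskCount: Int, poolSize: Int): Int {
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher()
    try {
        runBlocking {
            val jobs = List(taskCount) {
                launch(pool) {
                    val now = running.incrementAndGet()
                    peak.updateAndGet { maxOf(it, now) }
                    Thread.sleep(50) // stand-in for CPU-bound work; holds the thread
                    running.decrementAndGet()
                }
            }
            // Tasks beyond poolSize wait in the dispatcher's internal queue.
            jobs.joinAll()
        }
    } finally {
        pool.close() // shuts down the underlying executor
    }
    return peak.get()
}

fun main() {
    println("peak concurrency: ${maxConcurrentGenerates(taskCount = 10, poolSize = 3)}")
}
```

No manual queue or counter is needed to enforce the limit here; the counters exist only to observe it.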
r
I don't think it needs to (and it possibly shouldn't since it should be exclusive), I just need to make sure I'm running either generate tasks or a combine task, not both.
b
after re-reading it, it's simpler than i thought, I had thought combine tasks could be launched by themselves, but they always follow a group of generate tasks?
in that case there's no need for select
r
Yes, a simple analogy would be like a video editor. I can work on multiple layers, and changes to a layer are rendered to an intermediate file. The combine task would then take the rendered results and merge them into one video file. Changing any layer would add that layer to the render queue, and when the render queue empties, it fires the combine task. While the combine task is running, I can still edit layers, and they'll still be added to the queue, but they won't render till the combine pass is done.
(I'm not actually working with video, but it's similar enough to be comparable)
From what I've read in coroutines-guide.md, I was thinking an actor would be what I'm looking for, but I'm not sure how to check if there are no more generate tasks in the channel, run the combine task, then go back to waiting for generate tasks, without closing the channel. Does that sound like I'm (roughly) on the right track, or is there a better way that comes to mind? (I'm also not sure if actor would easily allow for running multiple generates concurrently.)
b
yea you don't want an actor if you want them to run concurrently
r
Okay, good to know
b
i mean you could use an actor, and then launch the tasks from within the actor, but you're really only using the actor as a channel in that case
r
Makes sense
b
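import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.selects.select
import java.util.concurrent.TimeUnit

// Written against the pre-1.0 kotlinx.coroutines.experimental API.
suspend fun run(generateInputs: Channel<Int>) {

    // Dedicated pool with 5 threads, which caps how many CPU-bound generate tasks run at once.
    val genContext = newFixedThreadPoolContext(5, "gen pool")

    // Stand-in for a real generate task.
    suspend fun genTask(input: Int) {
        println("launching $input")
        delay(input.toLong(), TimeUnit.SECONDS)
        println("finished $input")
    }

    // Each iteration is one batch; the first input received starts it.
    for (input in generateInputs) {

        val job = launch(genContext) { genTask(input) }

        // Until the batch finishes, keep accepting new inputs as children of it.
        while (!job.isCompleted && !generateInputs.isClosedForReceive) {
            select<Unit> {
                // select is biased, so this clause wins when both are ready.
                generateInputs.onReceiveOrNull {
                    if (it == null) {
                        println("chan closed")
                        job.join()
                    } else
                        launch(genContext, parent = job) { genTask(it) }
                }

                // Selected once the first task and all of its children are done.
                job.onJoin { println("joined") }
            }
        }

        println("combine")
//        doCombineTask()
    }
}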
that's what i came up with based on your description
and you can probably drop the second condition in the while loop
the run function can be turned into an actor and it would be essentially the same
r
Nice, it seems to work pretty well. Now I just have to break it apart and understand it 🙂 Thanks a ton!
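For readers on current kotlinx.coroutines 1.x, here is a rough sketch of the same generate-then-combine flow. It is not the code posted above: instead of attaching children to a parent job, it counts in-flight tasks and uses a completion channel. The names `runBatches`, `generate`, `combine` and `poolSize` are hypothetical, chosen only for this illustration. It relies on `select` being biased toward its first clause, so queued inputs are picked up before completions are processed, and `combine` runs only when no generate task is in flight.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import java.util.concurrent.Executors

// Hypothetical coordinator: runs generate tasks on a bounded pool and calls
// combine each time the number of in-flight generate tasks drops to zero.
suspend fun runBatches(
    inputs: ReceiveChannel<Int>,
    poolSize: Int,
    generate: suspend (Int) -> Unit,
    combine: suspend () -> Unit
) {
    val pool = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher()
    try {
        coroutineScope {
            val done = Channel<Unit>(Channel.UNLIMITED) // one signal per finished task
            var active = 0      // generate tasks currently in flight
            var closed = false  // true once inputs has been closed
            while (!closed || active > 0) {
                select<Unit> {
                    // First clause: select is biased, so pending inputs win.
                    if (!closed) inputs.onReceiveCatching { result ->
                        val value = result.getOrNull()
                        if (value == null) {
                            closed = true
                        } else {
                            active++
                            launch(pool) {
                                try { generate(value) } finally { done.trySend(Unit) }
                            }
                        }
                    }
                    // Second clause: a task finished; combine when none remain.
                    if (active > 0) done.onReceive {
                        active--
                        if (active == 0) combine() // new inputs wait until this returns
                    }
                }
            }
        }
    } finally {
        pool.close()
    }
}

fun main() = runBlocking {
    val inputs = Channel<Int>(Channel.UNLIMITED)
    listOf(1, 2, 3).forEach { inputs.send(it) }
    inputs.close()
    runBatches(
        inputs,
        poolSize = 2,
        generate = { println("generate $it") },
        combine = { println("combine") }
    )
}
```

Because `combine` is called from inside the coordinator loop, nothing new is launched while it runs; inputs sent in the meantime simply sit in the channel until the next `select`.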