# coroutines
h
Hey everyone! I have a slightly long question to ask: I have a job processor that selects jobs from two queues, and processes them one by one. The design looks something like this:
class JobProcessor(val coroutineContext: CoroutineContext): CoroutineScope {
  val queue1 = Channel<Job>(Channel.UNLIMITED)
  val queue2 = Channel<Job>(Channel.UNLIMITED)

  init {
    while (isActive) {
      selectJob()
    }
  }

  private suspend fun selectJob() {
    select {
      queue1.onReceive { job -> process(job) }
      queue2.onReceive { job -> launch { process(job) } }
    }
  }

  fun sendJobToQueue1() { ... }
  fun sendJobToQueue2() { ... }

}
Jobs in Queue 1 are processed immediately, but jobs in Queue 2 are processed in a separate coroutine because I don't want the
select
statement to wait for it to complete. Jobs in Queue 1 are more important, so the biased nature of
select
works in favour of this design. In a benchmarking environment, I want to send a lot of jobs to both queues of this processor and then wait until all of them are complete to find how much time it took to process them. I am accomplishing it this way:
while (isActive && (!queue1.isEmpty || !queue2.isEmpty)) {
  selectJob()
}
This waits until both queues have been drained. The problem is that jobs of queue2 are processed in different coroutines, so even if the queue is drained it does not mean that all its jobs have finished processing. Is there a way for me to wait until jobs of queue2 have finished processing?
o
join on the Job of the CoroutineScope
h
Can you elaborate more? I was under the impression that the job will never complete because the
while
loop keeps running until the
CoroutineScope
is cancelled.
o
why would the block in
init
do that? you don't have it registered as a Job itself, in fact that constructor will just not run at all because the Job is never active
h
I just wanted to confirm if running
job.join()
will complete only after all the coroutines in the scope have completed, right?
o
correct
hmm, actually
join()
won't work in this case
will it? I'm not sure how the job would get completed....
the top-level `JobProcessor`'s
Job
will be inactive; if you called
join()
on it, according to the docs:
This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in new state.
but I don't think there's any way for it to complete. so perhaps in that case you would have to loop over the children of the Job instead? I feel like there's a better solution here but I'm not sure what it is
oh, interesting, `CoroutineScope`'s Job is actually active
I think that means the easiest way is to cast it to
CompletableJob
/ hold a separate reference to the
Job
and first call
complete()
on that, then
join()
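A minimal sketch of the "complete, then join" idea, assuming the scope is backed by a standalone `Job()` that the caller keeps a reference to. The helper name `awaitChildren` and the `delay`-based work are hypothetical, not part of the original processor:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches `n` children on a scope backed by a standalone Job,
// then waits for all of them by completing the Job before joining it.
suspend fun awaitChildren(n: Int): Int {
    val parent: CompletableJob = Job()            // active, and never completes on its own
    val scope = CoroutineScope(Dispatchers.Default + parent)
    val finished = AtomicInteger(0)

    repeat(n) {
        scope.launch {
            delay(50)                             // stand-in for real work
            finished.incrementAndGet()
        }
    }

    parent.complete()                             // mark the Job as ready to finish
    parent.join()                                 // returns once every child has completed
    return finished.get()
}

fun main() = runBlocking {
    println(awaitChildren(3))
}
```

Without the `complete()` call, `join()` on a `Job()` created this way would suspend indefinitely, because nothing ever moves that job out of the active state.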
h
Running job.join() doesn't seem to complete. Let me investigate more with CompletableJob. Until then if anyone else has any ideas, please let me know.
u
coroutineScope {
  while (isActive && (!queue1.isEmpty || !queue2.isEmpty)) {
    selectJob()
  } 
}
Then make selectJob an extension on
CoroutineScope
and make sure to launch queue2-jobs on the receiver scope
To avoid being ambiguous, you probably do not want `JobProcessor` to inherit from
CoroutineScope
Well, suspend and CoroutineScope-extensions also do not play nice
I guess you want a CoroutineScope parameter on selectJob instead of a receiver
h
What problem does this solve, though? Do you want to separate the coroutine
Job
of the processor from that of the coroutine draining the queues?
u
Structured concurrency. The `coroutineScope {}` block will only terminate when all coroutines launched inside it have completed
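A small sketch of that behaviour, with a hypothetical `processAll` function whose "jobs" are just delays. The loop that launches the children finishes right away, but `coroutineScope` does not return until each child is done:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical example: each entry in `delaysMs` stands in for one queue-2 job.
suspend fun processAll(delaysMs: List<Long>): Int {
    val done = AtomicInteger(0)
    coroutineScope {
        for (d in delaysMs) {
            launch {                  // child of the coroutineScope block
                delay(d)
                done.incrementAndGet()
            }
        }
        // The loop ends here, but the block keeps suspending until all children finish.
    }
    return done.get()                 // every child has completed by this point
}

fun main() = runBlocking {
    println(processAll(listOf(30L, 10L, 20L)))
}
```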
h
I'm not sure if there is a difference between joining on the job of the processor which implements
CoroutineScope
, and joining on the job of a scope created through
coroutineScope { ... }
. A disadvantage of using
coroutineScope {...}
is that I can not use it in my init block because it requires the enclosing function to be a suspending function.
o
your
init
block is broken anyways, the constructor will block forever
it seems to me like your code already can't compile,
selectJob()
is suspend as well
so clearly what we're being given to look at is not the same as what you're working with
h
u
right and in start, inside launch you can call
coroutineScope {}
and in `drainAsync` inside
async {}
you can call
coroutineScope {}
This would express that you want to wait for all children created inside that scope. It is the most explicit solution
h
Okay. I guess this shows I am not very familiar with how the completion of child jobs in a coroutine scope works. Can you explain the reasoning behind using a
coroutineScope{ }
block inside
async
? Edit: Thanks for the explanation. I'll try this and ask again if I need more help.
Thanks for your help!
u
from a style perspective, i'd also remove async and simply make
drainAsync
suspend
internal suspend fun drain() = coroutineScope {
  while (isActive && (!setStateChannel.isEmpty || !getStateChannel.isEmpty)) {
    selectJob()
  }
}
h
Makes sense. I have also found that just waiting for all the jobs to complete is not enough, because one job can produce multiple other jobs to be processed, in either of the queues. Therefore the only strategy that works here is draining recursively, or checking at every loop iteration if any child job is still in active state. It works, but there is a slight performance hit in both cases. The recursive call in my case can be optimized by making it tail-recursive, so that I can use the
tailrec
modifier. This is the approach I have decided to go for right now.
u
that's what coroutine scopes are for. just make sure all jobs are launched on the same (or child) scope
just like we made the `launch` in `selectJob` run in the receiver scope, you should extend that pattern to all coroutine creation
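A hedged sketch of that pattern for jobs that start further jobs. `spawn` and `runTree` are made-up names: each job launches two follow-ups on its own scope (a child of the outer one), so a single `coroutineScope` waits for the whole tree:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical job: does some work, then starts two follow-up jobs until depth reaches 0.
fun CoroutineScope.spawn(depth: Int, counter: AtomicInteger) {
    launch {
        delay(10)                          // stand-in for real work
        counter.incrementAndGet()
        if (depth > 0) {
            // `this` is the launched coroutine's scope, so these are its children.
            spawn(depth - 1, counter)
            spawn(depth - 1, counter)
        }
    }
}

suspend fun runTree(depth: Int): Int {
    val counter = AtomicInteger(0)
    coroutineScope { spawn(depth, counter) }   // returns after all descendants finish
    return counter.get()
}

fun main() = runBlocking {
    println(runTree(2))                    // 1 + 2 + 4 jobs
}
```

This only covers jobs that launch coroutines directly. Jobs that send new work back into a channel are not children of anything until something receives them, so the queues still need to be re-checked after the scope completes.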
h
Thanks. I think I understand coroutine scopes much better now. I think the final solution has become cleaner as well, except the fact that I have to pass coroutine scope down as a parameter:
suspend fun drainAsync() {
  do {
    coroutineScope {
      val scope = this
      while (!setStateChannel.isEmpty || !getStateChannel.isEmpty) {
        selectJob(scope)
      }
    }
  } while (!setStateChannel.isEmpty || !getStateChannel.isEmpty) // Nested jobs might have filled the queues again
}

suspend fun selectJob(coroutineScope: CoroutineScope) {
  select<Unit> {
    setStateChannel.onReceive { reducer ->
      val newState = stateHolder.state.reducer()
      setNewState(newState)
    }
    getStateChannel.onReceive { action ->
      coroutineScope.launch {
        action.invoke(stateHolder.state)
      }
    }
  }
}
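For reference, a self-contained sketch of the same drain loop that can be run on its own. It is an approximation under assumptions: the `Processor` class, the `fast`/`slow` channel names, and the lambda job types are hypothetical stand-ins for `setStateChannel`, `getStateChannel`, and the real reducers and actions:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical processor: `fast` jobs run inline, `slow` jobs run in their own coroutine.
@OptIn(ExperimentalCoroutinesApi::class)   // Channel.isEmpty is an experimental API
class Processor {
    val fast = Channel<() -> Unit>(Channel.UNLIMITED)
    val slow = Channel<suspend () -> Unit>(Channel.UNLIMITED)

    suspend fun drain() {
        do {
            coroutineScope {
                val scope = this
                while (!fast.isEmpty || !slow.isEmpty) {
                    select<Unit> {
                        fast.onReceive { job -> job() }
                        slow.onReceive { job -> scope.launch { job() } }
                    }
                }
            }   // waits here for every launched slow job
        } while (!fast.isEmpty || !slow.isEmpty)   // slow jobs may have queued more work
    }
}

// Hypothetical usage: one slow job that enqueues a fast job when it finishes.
fun demo(): Int = runBlocking {
    val p = Processor()
    val counter = AtomicInteger(0)
    p.slow.trySend {
        delay(20)
        counter.incrementAndGet()
        p.fast.trySend { counter.incrementAndGet() }
    }
    p.drain()
    counter.get()
}

fun main() {
    println(demo())
}
```

In `demo`, the first pass launches the slow job and waits for it; the outer `do ... while` then notices the fast job it enqueued and runs a second pass, so both jobs are counted before `drain()` returns.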