haroldadmin
05/12/2020, 7:05 AM
class JobProcessor(val coroutineContext: CoroutineContext): CoroutineScope {
    val queue1 = Channel<Job>(Channel.UNLIMITED)
    val queue2 = Channel<Job>(Channel.UNLIMITED)

    init {
        while (isActive) {
            selectJob()
        }
    }

    private suspend fun selectJob() {
        select {
            queue1.onReceive { job -> process(job) }
            queue2.onReceive { job -> launch { process(job) } }
        }
    }

    fun sendJobToQueue1() { ... }
    fun sendJobToQueue2() { ... }
}
Jobs in Queue 1 are processed immediately, but jobs in Queue 2 are processed in a separate coroutine because I don't want the select statement to wait for it to complete. Jobs in Queue 1 are more important, so the biased nature of select works in favour of this design.
In a benchmarking environment, I want to send a lot of jobs to both queues of this processor and then wait until all of them are complete to find how much time it took to process them. I am accomplishing it this way:
while (isActive && (!queue1.isEmpty || !queue2.isEmpty)) {
    selectJob()
}
This waits until both queues have been drained. The problem is that jobs of queue2 are processed in different coroutines, so even if the queue is drained it does not mean that all its jobs have finished processing.
Is there a way for me to wait until jobs of queue2 have finished processing?
octylFractal
05/12/2020, 7:06 AM
haroldadmin
05/12/2020, 7:17 AM
while loop keeps running until the CoroutineScope is cancelled.
octylFractal
05/12/2020, 7:19 AM
init do that? you don't have it registered as a Job itself, in fact that constructor will just not run at all because the Job is never active
haroldadmin
05/12/2020, 7:32 AM
job.join() will complete only after all the coroutines in the scope have completed, right?
octylFractal
05/12/2020, 7:32 AM
join() won't work in this case
Job will be in-active, if you called join() on it, according to the docs:
This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in new state.
but I don't think there's any way for it to complete. so perhaps in that case you would have to loop over the children of the Job instead? I feel like there's a better solution here but I'm not sure what it is
CompletableJob / hold a separate reference to the Job and first call complete() on that, then join()
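A minimal sketch of the complete()-then-join() idea, assuming the scope is built around a Job created with the Job() factory and a separate reference to it is kept. The names runAndAwaitChildren and done are hypothetical and only serve the illustration; the delay stands in for real work.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches `tasks` coroutines in a scope whose Job
// we hold a separate reference to, then waits for all of them.
suspend fun runAndAwaitChildren(tasks: Int): Int {
    val done = AtomicInteger(0)

    // Keep our own reference to the scope's Job so we can complete it later.
    val job: CompletableJob = Job()
    val scope = CoroutineScope(Dispatchers.Default + job)

    repeat(tasks) {
        scope.launch {
            delay(20) // simulated work
            done.incrementAndGet()
        }
    }

    // A Job() never completes on its own. complete() moves it towards
    // completion, and it finishes once all of its children have finished.
    job.complete()
    // join() suspends until the Job, including its children, is complete.
    job.join()

    return done.get()
}
```

Calling runBlocking { runAndAwaitChildren(8) } should return 8, because join() only resumes after every launched child has incremented the counter.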
haroldadmin
05/12/2020, 8:11 AM
uli
05/12/2020, 8:48 AM
coroutineScope {
    while (isActive && (!queue1.isEmpty || !queue2.isEmpty)) {
        selectJob()
    }
}
Then make selectJob an extension on CoroutineScope
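A rough sketch of that suggestion, under a few assumptions: DrainDemo, process, and processed are hypothetical names, Int stands in for a real job type, and delay simulates the work. Because selectJob is an extension on CoroutineScope, the launch for queue2 becomes a child of the coroutineScope block in drain, which does not return until its children are done.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the processor; Int plays the role of a job.
class DrainDemo {
    val queue1 = Channel<Int>(Channel.UNLIMITED)
    val queue2 = Channel<Int>(Channel.UNLIMITED)
    val processed = AtomicInteger(0)

    private suspend fun process(job: Int) {
        delay(10) // simulated work
        processed.incrementAndGet()
    }

    // Extension on CoroutineScope: launch {} attaches to the caller's scope.
    private suspend fun CoroutineScope.selectJob() {
        select<Unit> {
            // Queue 1 jobs run inline, so select waits for them.
            queue1.onReceive { job -> process(job) }
            // Queue 2 jobs run in a child coroutine of the calling scope.
            queue2.onReceive { job -> launch { process(job) } }
        }
    }

    // Returns only after both queues are empty AND every launched
    // queue2 job has finished, because coroutineScope awaits its children.
    @OptIn(ExperimentalCoroutinesApi::class) // isEmpty is experimental
    suspend fun drain() = coroutineScope {
        while (!queue1.isEmpty || !queue2.isEmpty) {
            selectJob()
        }
    }
}
```

If five jobs are sent to each queue before drain() is called, processed should read 10 as soon as drain() returns.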
haroldadmin
05/12/2020, 10:06 AM
Job of the processor from that of the coroutine draining the queues?
uli
05/12/2020, 10:20 AM
haroldadmin
05/12/2020, 10:47 AM
CoroutineScope, and joining on the job of a scope created through coroutineScope { ... }.
A disadvantage of using coroutineScope { ... } is that I cannot use it in my init block because it requires the enclosing function to be a suspending function.
octylFractal
05/12/2020, 10:48 AM
init block is broken anyways, the constructor will block forever
selectJob() is suspend as well
haroldadmin
05/12/2020, 10:57 AM
uli
05/12/2020, 11:07 AM
coroutineScope {}
async {}
you can call coroutineScope {}
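One way to read this: a non-suspending function can start async {} on an existing scope, and the async block supplies the suspending context that coroutineScope {} needs. The sketch below assumes a scope passed in from outside; AsyncDrainDemo, queue, and processed are hypothetical names, and Int stands in for a real job.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical processor that receives its CoroutineScope from the caller.
class AsyncDrainDemo(private val scope: CoroutineScope) {
    val queue = Channel<Int>(Channel.UNLIMITED)
    val processed = AtomicInteger(0)

    // Not a suspend function: async {} provides the suspending context,
    // so coroutineScope {} can be called inside it.
    @OptIn(ExperimentalCoroutinesApi::class) // isEmpty is experimental
    fun drainAsync(): Deferred<Unit> = scope.async {
        coroutineScope {
            while (!queue.isEmpty) {
                val job = queue.receive()
                // Child of the coroutineScope block, not of the outer scope.
                launch {
                    delay(10) // simulated work
                    processed.addAndGet(job)
                }
            }
        } // resumes only after every launched child has finished
    }
}
```

The caller can then await() the returned Deferred from a suspending context; when it completes, all jobs launched while draining have completed as well.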
haroldadmin
05/12/2020, 11:18 AM
coroutineScope { } block inside async?
Edit: Thanks for the explanation. I'll try this and ask again if I need more help.
uli
05/12/2020, 11:59 AM
drainAsync
suspend
internal suspend fun drain() = coroutineScope {
    while (isActive && (!setStateChannel.isEmpty || !getStateChannel.isEmpty)) {
        selectJob()
    }
}
haroldadmin
05/12/2020, 12:18 PM
tailrec modifier. This is the approach I have decided to go for right now.
uli
05/12/2020, 2:23 PM
haroldadmin
05/12/2020, 5:50 PM
suspend fun drainAsync() {
    do {
        coroutineScope {
            val scope = this
            while (!setStateChannel.isEmpty || !getStateChannel.isEmpty) {
                selectJob(scope)
            }
        }
    } while (!setStateChannel.isEmpty || !getStateChannel.isEmpty) // Nested jobs might have filled the queues again
}

suspend fun selectJob(coroutineScope: CoroutineScope) {
    select<Unit> {
        setStateChannel.onReceive { reducer ->
            val newState = stateHolder.state.reducer()
            setNewState(newState)
        }
        getStateChannel.onReceive { action ->
            coroutineScope.launch {
                action.invoke(stateHolder.state)
            }
        }
    }
}
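To illustrate why the outer do/while re-check matters, here is a small self-contained sketch in which a launched job puts more work back on the queue. RefillDemo and its countdown scheme are hypothetical and not part of the code above: processing a value n greater than zero enqueues n - 1.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical demo: each element is a countdown value.
class RefillDemo {
    val queue = Channel<Int>(Channel.UNLIMITED)
    val processed = AtomicInteger(0)

    @OptIn(ExperimentalCoroutinesApi::class) // isEmpty is experimental
    suspend fun drain() {
        do {
            coroutineScope {
                while (!queue.isEmpty) {
                    val n = queue.receive()
                    launch {
                        delay(5) // simulated work
                        processed.incrementAndGet()
                        // A nested job may refill the queue after the
                        // inner while loop has already seen it empty.
                        if (n > 0) queue.send(n - 1)
                    }
                }
            } // resumes only after every launched child has finished
        } while (!queue.isEmpty) // pick up work enqueued by those children
    }
}
```

Sending the single value 3 and then calling drain() should process four jobs in total (3, 2, 1, 0). Without the outer loop, the first coroutineScope pass would return with the value 2 still waiting in the queue.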