haroldadmin
05/12/2020, 7:05 AM
class JobProcessor(val coroutineContext: CoroutineContext): CoroutineScope {
    val queue1 = Channel<Job>(Channel.UNLIMITED)
    val queue2 = Channel<Job>(Channel.UNLIMITED)
    init {
        while (isActive) {
            selectJob()
        }
    }
    private suspend fun selectJob() {
        select {
            queue1.onReceive { job -> process(job) }
            queue2.onReceive { job -> launch { process(job) } }
        }
    }
    fun sendJobToQueue1() { ... }
    fun sendJobToQueue2() { ... }
}
Jobs in Queue 1 are processed immediately, but jobs in Queue 2 are processed in a separate coroutine because I don't want the select statement to wait for them to complete. Jobs in Queue 1 are more important, so the biased nature of select works in favour of this design.
In a benchmarking environment, I want to send a lot of jobs to both queues of this processor and then wait until all of them are complete to find how much time it took to process them. I am accomplishing it this way:
while (isActive && (!queue1.isEmpty || !queue2.isEmpty)) {
    selectJob()
}
This waits until both queues have been drained. The problem is that jobs of queue2 are processed in different coroutines, so even if the queue is drained it does not mean that all its jobs have finished processing.
Is there a way for me to wait until jobs of queue2 have finished processing?
octylFractal
05/12/2020, 7:06 AM
haroldadmin
05/12/2020, 7:17 AM
[...] while loop keeps running until the CoroutineScope is cancelled.
octylFractal
05/12/2020, 7:19 AM
[...] init do that? You don't have it registered as a Job itself; in fact that constructor will just not run at all because the Job is never active
haroldadmin
05/12/2020, 7:32 AM
[...] job.join() will complete only after all the coroutines in the scope have completed, right?
octylFractal
05/12/2020, 7:32 AM
octylFractal
05/12/2020, 7:33 AM
[...] join() won't work in this case
octylFractal
05/12/2020, 7:34 AM
octylFractal
05/12/2020, 7:35 AM
[...] Job will be inactive. If you called join() on it, according to the docs:
This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in new state.
But I don't think there's any way for it to complete, so perhaps in that case you would have to loop over the children of the Job instead? I feel like there's a better solution here but I'm not sure what it is
octylFractal
05/12/2020, 7:38 AM
octylFractal
05/12/2020, 7:38 AM
[...] CompletableJob / hold a separate reference to the Job and first call complete() on that, then join()
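A minimal sketch of the complete()-then-join() idea, assuming the processor owns its scope. Worker, submit, awaitAll and runWorkerDemo are hypothetical names used only for illustration; they are not from the thread.

```kotlin
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the processor: it keeps its own reference to the scope's Job.
class Worker {
    private val job: CompletableJob = Job()
    private val scope = CoroutineScope(Dispatchers.Default + job)
    val finished = AtomicInteger(0)

    // Each submitted piece of work becomes a child of `job`.
    fun submit(workMillis: Long) {
        scope.launch {
            delay(workMillis)
            finished.incrementAndGet()
        }
    }

    // complete() marks the Job as done with its own work; the Job then finishes
    // once every child has finished, and join() suspends until that happens.
    suspend fun awaitAll() {
        job.complete()
        job.join()
    }
}

fun runWorkerDemo(): Int = runBlocking {
    val worker = Worker()
    repeat(5) { worker.submit(workMillis = 50L) }
    worker.awaitAll()
    worker.finished.get() // read only after awaitAll() has returned
}
```

Once the Job has completed, the scope cannot be used to launch further work, so this fits a one-shot benchmark better than a long-lived processor.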
haroldadmin
05/12/2020, 8:11 AM
uli
05/12/2020, 8:48 AM
coroutineScope {
    while (isActive && (!queue1.isEmpty || !queue2.isEmpty)) {
        selectJob()
    }
}
Then make selectJob an extension on CoroutineScope
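A rough, self-contained sketch of this suggestion, assuming jobs are plain Ints and processing only counts them. Drainer, process, processed and runDrainDemo are hypothetical stand-ins; only the coroutineScope loop and the extension-function shape come from the message above.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins: jobs are plain Ints and "processing" just counts them.
class Drainer {
    val queue1 = Channel<Int>(Channel.UNLIMITED)
    val queue2 = Channel<Int>(Channel.UNLIMITED)
    val processed = AtomicInteger(0)

    private suspend fun process(job: Int) {
        delay(10L)
        processed.incrementAndGet()
    }

    // Extension on CoroutineScope: `launch` below makes the new coroutine
    // a child of whichever scope calls selectJob().
    private suspend fun CoroutineScope.selectJob() {
        select<Unit> {
            queue1.onReceive { job -> process(job) }
            queue2.onReceive { job -> launch { process(job) } }
        }
    }

    // coroutineScope returns only after the loop has ended AND every
    // coroutine launched inside it has finished.
    @OptIn(ExperimentalCoroutinesApi::class)
    suspend fun drain() = coroutineScope {
        while (isActive && (!queue1.isEmpty || !queue2.isEmpty)) {
            selectJob()
        }
    }
}

fun runDrainDemo(): Int = runBlocking {
    val drainer = Drainer()
    repeat(3) { drainer.queue1.send(it) }
    repeat(3) { drainer.queue2.send(it) }
    drainer.drain()
    drainer.processed.get() // read only after drain() has returned
}
```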
uli
05/12/2020, 9:23 AM
[...] CoroutineScope
uli
05/12/2020, 9:28 AM
uli
05/12/2020, 9:28 AM
haroldadmin
05/12/2020, 10:06 AM
[...] Job of the processor from that of the coroutine draining the queues?
uli
05/12/2020, 10:20 AM
haroldadmin
05/12/2020, 10:47 AM
[...] CoroutineScope, and joining on the job of a scope created through coroutineScope { ... }.
A disadvantage of using coroutineScope {...} is that I cannot use it in my init block because it requires the enclosing function to be a suspending function.
octylFractal
05/12/2020, 10:48 AM
[...] init block is broken anyways, the constructor will block forever
octylFractal
05/12/2020, 10:49 AM
[...] selectJob() is suspend as well
octylFractal
05/12/2020, 10:50 AM
haroldadmin
05/12/2020, 10:57 AM
uli
05/12/2020, 11:07 AM
[...] coroutineScope {}
uli
05/12/2020, 11:10 AM
[...] async {} you can call coroutineScope {}
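A small sketch of calling coroutineScope {} from inside async {}, assuming the caller is ordinary non-suspending code that already has a CoroutineScope. drainAsyncSketch and runAsyncDemo are hypothetical names, and the work is simulated with delay.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical non-suspending entry point: async supplies the suspending context,
// and coroutineScope inside it waits for every coroutine launched in its block.
fun CoroutineScope.drainAsyncSketch(jobs: Int): Deferred<Int> = async {
    val done = AtomicInteger(0)
    coroutineScope {
        repeat(jobs) {
            launch {
                delay(20L) // simulated work
                done.incrementAndGet()
            }
        }
    }
    // Reached only after all children launched above have finished.
    done.get()
}

fun runAsyncDemo(): Int = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    scope.drainAsyncSketch(jobs = 4).await()
}
```

The async block is itself a coroutine, so any suspending function, including coroutineScope, can be called inside it even when the enclosing function is not marked suspend.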
uli
05/12/2020, 11:11 AM
haroldadmin
05/12/2020, 11:18 AM
[...] coroutineScope { } block inside async?
Edit: Thanks for the explanation. I'll try this and ask again if I need more help.
haroldadmin
05/12/2020, 11:19 AM
uli
05/12/2020, 11:59 AM
[...] drainAsync suspend
uli
05/12/2020, 12:00 PM
internal suspend fun drain() = coroutineScope {
    while (isActive && (!setStateChannel.isEmpty || !getStateChannel.isEmpty)) {
        selectJob()
    }
}
haroldadmin
05/12/2020, 12:18 PM
[...] tailrec modifier. This is the approach I have decided to go for right now.
uli
05/12/2020, 2:23 PM
uli
05/12/2020, 2:25 PM
haroldadmin
05/12/2020, 5:50 PM
suspend fun drainAsync() {
    do {
        coroutineScope {
            val scope = this
            while (!setStateChannel.isEmpty || !getStateChannel.isEmpty) {
                selectJob(scope)
            }
        }
    } while (!setStateChannel.isEmpty || !getStateChannel.isEmpty) // Nested jobs might have filled the queues again
}
suspend fun selectJob(coroutineScope: CoroutineScope) {
    select<Unit> {
        setStateChannel.onReceive { reducer ->
            val newState = stateHolder.state.reducer()
            setNewState(newState)
        }
        getStateChannel.onReceive { action ->
            coroutineScope.launch {
                action.invoke(stateHolder.state)
            }
        }
    }
}
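To illustrate why the outer do/while re-check matters, here is a self-contained sketch in which an action enqueues a new reducer while the queues are being drained. TinyStore, hasPending and runStoreDemo are hypothetical; the state is reduced to an Int, and the real stateHolder and setNewState are not modelled.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical minimal store: state is an Int, reducers transform it, actions read it.
class TinyStore {
    var state = 0
        private set
    val setStateChannel = Channel<(Int) -> Int>(Channel.UNLIMITED)
    val getStateChannel = Channel<suspend (Int) -> Unit>(Channel.UNLIMITED)

    @OptIn(ExperimentalCoroutinesApi::class)
    private fun hasPending() = !setStateChannel.isEmpty || !getStateChannel.isEmpty

    suspend fun drain() {
        do {
            // Returns once the queues look empty and all launched actions are done.
            coroutineScope {
                val scope = this
                while (hasPending()) selectJob(scope)
            }
        } while (hasPending()) // actions may have enqueued more work in the meantime
    }

    private suspend fun selectJob(scope: CoroutineScope) {
        select<Unit> {
            setStateChannel.onReceive { reducer -> state = reducer(state) }
            getStateChannel.onReceive { action -> scope.launch { action(state) } }
        }
    }
}

fun runStoreDemo(): Int = runBlocking {
    val store = TinyStore()
    store.setStateChannel.send { it + 1 }
    // This action enqueues another reducer while the store is being drained.
    store.getStateChannel.send { _ -> store.setStateChannel.send { it + 10 } }
    store.drain()
    store.state
}
```

Both reducers are applied before drain() returns, whether the late reducer is picked up by the inner loop or by the outer re-check.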