gotoOla

02/04/2020, 2:57 PM
If I have a class that implements CoroutineContext, how would I go about performing a graceful shutdown? By graceful shutdown I mean waiting for any ongoing child to finish (so joinAll() on any child coroutine spawned). It seems CoroutineContext.cancel() is quite aggressive and cancels instead of joining (which the name kinda indicates as well :P)

zak.taccardi

02/04/2020, 2:58 PM
I’ve never extended CoroutineContext - why are you doing it?

diesieben07

02/04/2020, 3:00 PM
I think you may be confusing CoroutineScope with CoroutineContext? A CoroutineScope has "children" and has "wait for children" built-in. CoroutineContext can contain arbitrary elements, such as Job.
In any case, it sounds like you want a Job or maybe a SupervisorJob for your "container" which you can then wait for.

Adam Powell

02/04/2020, 3:04 PM
or structure the code a bit differently as a
coroutineScope {
  // launch/do work here
}
// all children joined by here
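
A minimal runnable sketch of the coroutineScope { } suggestion above. The helper name runWorkers and the delay-based "work" are assumptions made up for illustration; only coroutineScope, launch and delay come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches `workers` children and reports how many finished.
suspend fun runWorkers(workers: Int): Int {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(workers) { index ->
            launch {
                delay(10L * (index + 1)) // stand-in for real work
                finished.incrementAndGet()
            }
        }
    }
    // coroutineScope only returns once every child launched inside it has completed.
    return finished.get()
}

fun main(): Unit = runBlocking {
    println(runWorkers(3)) // prints 3
}
```

The trade-off is that the children live only as long as the enclosing suspend call, so this fits "do a batch of work and wait" better than a long-lived class with separate start and stop functions.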

gotoOla

02/04/2020, 4:52 PM
🤦‍♂️ you are correct in that I meant CoroutineScope...

diesieben07

02/04/2020, 4:57 PM
As for CoroutineScope, it (usually) has an associated Job in its associated CoroutineContext (see docs on CoroutineScope.coroutineContext). So:
checkNotNull(myScope.coroutineContext[Job]) { "CoroutineScope is missing Job" }.join()
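
One caveat worth sketching: as far as I understand, a Job created with the Job() factory stays active until it is explicitly completed or cancelled, so join() directly on such a Job may suspend indefinitely even after its children are done. A possible variant is to join the children instead. The helper name awaitChildren is an assumption for illustration, not something from the thread.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: waits for the coroutines currently running in `scope`
// without cancelling them.
suspend fun awaitChildren(scope: CoroutineScope) {
    val job = checkNotNull(scope.coroutineContext[Job]) { "CoroutineScope is missing Job" }
    // Snapshot the children first; coroutines launched after this point are not awaited.
    job.children.toList().joinAll()
}

fun main(): Unit = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val done = AtomicInteger(0)
    repeat(3) {
        scope.launch {
            delay(20) // stand-in for real work
            done.incrementAndGet()
        }
    }
    awaitChildren(scope)
    println(done.get()) // prints 3
}
```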

gotoOla

02/04/2020, 4:58 PM
So if you use coroutineScope { }, everything that is launched in that block has to finish before it returns, to ensure that you don't leak any coroutines. I kinda want that behavior on my class that implements CoroutineScope. In my class I have a SendChannel set up where one coroutine fetches messages from a queue (so when I receive messages from the queue I perform a channel.send()). The producer runs while(isRunning) { ... }. To this I connect X number of "consumers" (ReceiveChannels) that process whatever the fetching one produces (so I have X ongoing coroutines). I was thinking that I would like to expose a stop function in this class, e.g.
fun stop() {
    isRunning = false
    /* perform the "stop" action on the CoroutineScope
       so that the consumers that are active finish */
}
ah ok @diesieben07 so that job would be my "supervisor-job" and if I join on that one all children should be joined as well?

diesieben07

02/04/2020, 4:59 PM
Yes, exactly.
If you implement CoroutineScope manually you might have to create your Job manually as well, depending on your usage.
Note that SupervisorJob is actually a factory function which produces a special kind of parent-job (see the docs for details).

gotoOla

02/04/2020, 5:03 PM
today all I've done is actually
class MyClass: CoroutineScope by CoroutineScope(Dispatchers.IO)
if I have to do it manually I don't really see the benefit or the "scope" of it here. What I do today is
processors.addAll(
    (1..maxNumberOfMessagesInParallel).map {
        logger.info("Starting processing channel.")
        launchMessageProcessor()
    }
)
with processors as a class field and then in my stop function I do
suspend fun stop() { // joinAll() is a suspend function, so stop() has to suspend too
    isRunning = false
    processors.joinAll()
}
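
A self-contained sketch of the setup described above, under stated assumptions: the class name MessagePump, the submit function and the counter are all made up for illustration, and closing the channel replaces the isRunning flag as the stop signal. It is one possible shape, not the code from the thread.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the class in the thread; all names are illustrative.
class MessagePump(private val parallelism: Int) :
    CoroutineScope by CoroutineScope(Dispatchers.Default) {

    private val channel = Channel<String>()
    private val processors = mutableListOf<Job>()
    val processed = AtomicInteger(0)

    fun start() {
        repeat(parallelism) {
            processors += launch {
                // The loop ends once the channel is closed and drained.
                for (message in channel) {
                    processed.incrementAndGet() // stand-in for processing `message`
                }
            }
        }
    }

    suspend fun submit(message: String) = channel.send(message)

    suspend fun stop() {
        channel.close()      // no new messages; already sent ones are still delivered
        processors.joinAll() // wait for the consumers instead of cancelling them
    }
}

fun main(): Unit = runBlocking {
    val pump = MessagePump(parallelism = 2)
    pump.start()
    repeat(5) { pump.submit("message-$it") }
    pump.stop()
    println(pump.processed.get()) // prints 5
}
```

Closing the channel lets each consumer's for loop terminate on its own, so joinAll() returns after the in-flight messages are handled rather than interrupting them.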

diesieben07

02/04/2020, 5:04 PM
That's fine, the CoroutineScope factory function automatically introduces a Job into the CoroutineContext if one is not already present.

gotoOla

02/04/2020, 5:06 PM
hm if I debug and check what I have in this.coroutineContext it doesn't seem to hold a reference to any job

diesieben07

02/04/2020, 5:06 PM
Well, this is what that function does:
ContextScope(if (context[Job] != null) context else context + Job())
So it should hold a Job.
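
A quick way to check this, as a small sketch. The helper name scopeHasJob is an assumption for illustration; note that the lookup key has to be kotlinx.coroutines.Job for the context lookup to work.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job

// Hypothetical helper: true if the scope's context contains a Job element.
fun scopeHasJob(scope: CoroutineScope): Boolean =
    scope.coroutineContext[Job] != null

fun main() {
    // No Job passed in: the CoroutineScope factory function adds one.
    val implicit = CoroutineScope(Dispatchers.IO)
    println(scopeHasJob(implicit)) // prints true

    // Job passed in explicitly: the factory keeps that same instance.
    val explicitJob = Job()
    val explicit = CoroutineScope(Dispatchers.IO + explicitJob)
    println(explicit.coroutineContext[Job] === explicitJob) // prints true
}
```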

gotoOla

02/04/2020, 5:08 PM
ah right, looking into the source now as well. I think I used the wrong import. I saw that there was an object Job with a companion object Key

dekans

02/04/2020, 5:13 PM
Maybe something like that:
coroutineContext[Job]?.children?.forEach { it.cancelAndJoin() }
right before cancelling the scope, with some parallelization to not lose time
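
One way the "parallelization" could look, as a sketch only: request cancellation of every child first, then wait for all of them together, instead of cancelling and joining one at a time. The helper name cancelAndJoinChildren is an assumption for illustration. Unlike the joinAll() approach earlier in the thread, this cancels the children rather than letting them finish.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: cancels all current children of `scope` and waits for them.
suspend fun cancelAndJoinChildren(scope: CoroutineScope) {
    val children = scope.coroutineContext[Job]?.children?.toList().orEmpty()
    children.forEach { it.cancel() } // request cancellation of every child first
    children.joinAll()               // then wait for all of them together
}

fun main(): Unit = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val workers = List(3) { scope.launch { delay(60_000) } }
    cancelAndJoinChildren(scope)
    println(workers.all { it.isCancelled }) // prints true
    println(scope.isActive)                 // prints true
}
```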