# coroutines
t
How do I wait for a coroutine scope to actually finish after calling `cancel()`? I'm trying to write client-server unit tests and I would like to properly clean up at the end of each case. However, if I do not add a `delay(1000)` after `cancel()`, I get an exception like `java.util.concurrent.RejectedExecutionException: event executor terminated` (the 1000 in the delay is not really important; 100 works just as well).
g
Have you tried calling `cancelAndJoin()` before the delay? `cancel()` is just a signal; the job is not cancelled immediately.
Though I'm not sure where the `RejectedExecutionException` is coming from.
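A minimal sketch of that difference, assuming kotlinx.coroutines on the classpath (`cancelThenJoin` is a hypothetical name, and the 100 ms `delay` stands in for real cleanup such as closing a socket): right after `cancel()` the job is cancelling but not yet completed; only `join()`, which `cancelAndJoin()` does for you, waits until the cleanup has finished.

```kotlin
import kotlinx.coroutines.*

// Returns (completed right after cancel(), completed after join()).
suspend fun cancelThenJoin(): Pair<Boolean, Boolean> = coroutineScope {
    // UNDISPATCHED makes the child enter its try block before launch() returns,
    // so the result does not depend on scheduling order.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        try {
            awaitCancellation()
        } finally {
            // Suspending cleanup in a cancelled coroutine must run in NonCancellable.
            withContext(NonCancellable) { delay(100) }
        }
    }
    job.cancel()                               // only requests cancellation
    val completedAfterCancel = job.isCompleted // false: the finally block has not run yet
    job.join()                                 // cancelAndJoin() == cancel() + join()
    val completedAfterJoin = job.isCompleted   // true: cleanup is done
    completedAfterCancel to completedAfterJoin
}

fun main() = runBlocking {
    println(cancelThenJoin()) // (false, true)
}
```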
t
`CoroutineScope` has only `cancel`, as far as I can see. The full stack trace:
```
Exception in thread "DefaultDispatcher-worker-4 @ws-writer#20" kotlinx.coroutines.CompletionHandlerException: Exception in completion handler ChildCompletion@1bda3618[job@5b137bda] for "ws-writer#20":StandaloneCoroutine{Cancelled}@5b137bda
	at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1502)
	at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:325)
	at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:242)
	at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:910)
	at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:867)
	at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:832)
	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:234)
	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
	Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [CoroutineName(ws-writer), CoroutineId(20), "ws-writer#20":StandaloneCoroutine{Cancelled}@5b137bda, io.ktor.server.netty.EventLoopGroupProxy@bbd9d2]
Caused by: kotlinx.coroutines.CompletionHandlerException: Exception in completion handler ResumeOnCompletion@1f944d25[job@55b2a76e] for JobImpl{Cancelled}@55b2a76e
	at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1502)
	at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:325)
	at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:242)
	at kotlinx.coroutines.JobSupport.continueCompleting(JobSupport.kt:939)
	at kotlinx.coroutines.JobSupport.access$continueCompleting(JobSupport.kt:25)
	at kotlinx.coroutines.JobSupport$ChildCompletion.invoke(JobSupport.kt:1159)
	at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1497)
	... 14 more
Caused by: java.util.concurrent.RejectedExecutionException: event executor terminated
	at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934)
	at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351)
	at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817)
	at io.ktor.server.netty.NettyDispatcher.dispatch(CIO.kt:69)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:474)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:508)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:497)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:368)
	at kotlinx.coroutines.ResumeOnCompletion.invoke(JobSupport.kt:1391)
	at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1497)
	... 20 more
```
g
cancelAndJoin is an extension function of Job
t
Yes, I have a scope with two jobs in it and I would like to cancel the scope and all jobs in it.
g
So you can do:
```kotlin
currentCoroutineContext()[Job]?.cancelAndJoin()
```
A scope with two jobs? What do you mean? A scope can have only one top-level job.
t
```kotlin
fun start() {
    scope.launch { run() }
    scope.launch { timeout() }
}
```
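To make the "one top-level job" point concrete, here is a small sketch that mirrors the `start()` above, with `awaitCancellation()` as a hypothetical stand-in for `run()` and `timeout()` (`countScopeChildren` is also a hypothetical name). `CoroutineScope(...)` adds a `Job()` when the context has none, and both `scope.launch` calls become children of that single `Job`.

```kotlin
import kotlinx.coroutines.*

fun countScopeChildren(): Int {
    // CoroutineScope(...) adds a Job() when the given context has none,
    // so this scope owns exactly one Job.
    val scope = CoroutineScope(Dispatchers.Default)
    scope.launch { awaitCancellation() } // stands in for run()
    scope.launch { awaitCancellation() } // stands in for timeout()
    // Both launched coroutines are attached as children of that single scope Job.
    val children = scope.coroutineContext.job.children.count()
    scope.cancel() // cancelling the parent cancels both children
    return children
}

fun main() {
    println(countScopeChildren()) // 2
}
```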
g
So if you want to cancel the scope, you can do:
```kotlin
scope.coroutineContext[Job]?.cancelAndJoin()
```
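A sketch of what that buys you, assuming a hypothetical `DemoTransport` that owns a scope with two top-level jobs whose cleanup suspends for a while: `stop()` returns only after both children, including their `finally` blocks, have finished. It uses `coroutineContext.job`, which throws if the scope has no `Job`, instead of `[Job]?.`, which would silently do nothing in that case.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a transport that owns a scope with two top-level jobs.
class DemoTransport {
    val scope = CoroutineScope(Dispatchers.Default)
    val cleanups = AtomicInteger(0)

    fun start() {
        // UNDISPATCHED only makes the demo deterministic: each child reaches its
        // try block before start() returns, so its finally block is guaranteed to run.
        scope.launch(start = CoroutineStart.UNDISPATCHED) { work() } // stands in for run()
        scope.launch(start = CoroutineStart.UNDISPATCHED) { work() } // stands in for timeout()
    }

    private suspend fun work() {
        try {
            awaitCancellation()
        } finally {
            withContext(NonCancellable) { delay(50) } // slow, suspending cleanup
            cleanups.incrementAndGet()
        }
    }

    // Returns only after both children (including their cleanup) have finished.
    suspend fun stop() {
        scope.coroutineContext.job.cancelAndJoin()
    }
}

suspend fun cleanupsAfterStop(): Int {
    val transport = DemoTransport()
    transport.start()
    transport.stop()
    return transport.cleanups.get()
}

fun main() = runBlocking {
    println(cleanupsAfterStop()) // 2
}
```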
s
Unusual thing to want to do, though. Normally the reason to use a custom scope is that you have a lifecycle that exists outside of an individual suspending function. If you actually have the ability to wait for the scope to terminate, `coroutineScope { … }` is likely to be more suitable.
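A minimal sketch of that alternative, with hypothetical `delay`-based stand-ins for `run()` and `timeout()` (`sessionInScope` is also a hypothetical name): `coroutineScope { }` suspends until every coroutine launched inside it has completed, so no explicit `cancel()`/`join()` is needed. If one child fails, `coroutineScope` cancels the other and rethrows the failure.

```kotlin
import kotlinx.coroutines.*
import java.util.Collections

suspend fun sessionInScope(): List<String> {
    val log = Collections.synchronizedList(mutableListOf<String>())
    coroutineScope {
        launch { delay(50); log += "run finished" }     // stands in for run()
        launch { delay(10); log += "timeout finished" } // stands in for timeout()
    }
    // Reached only after both children have completed.
    return log
}

fun main() = runBlocking {
    println(sessionInScope())
}
```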
t
That results in the same error. And thank you for the help.
I do have a lifecycle outside of the function, and in general I cannot wait. Or, more precisely, I cannot wait in `start`.
s
Actually, I didn't notice that you said you're writing unit tests... maybe you want to look at the coroutines testing library.
It has some mechanisms of its own for automatic cleanup.
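A minimal sketch of one such mechanism, assuming a recent kotlinx-coroutines-test (older versions lack `backgroundScope`); `virtualTimeAfterTest` is a hypothetical name. Coroutines launched in `backgroundScope` are cancelled automatically when the `runTest` body ends, and `delay` runs on virtual time. Virtual time only applies to coroutines on the test dispatcher, not to work scheduled on Netty's own event loops.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*

@OptIn(ExperimentalCoroutinesApi::class) // for currentTime
fun virtualTimeAfterTest(): Long {
    var observed = -1L
    runTest {
        // Never terminates on its own; runTest cancels backgroundScope when the body ends.
        backgroundScope.launch {
            while (true) delay(1_000)
        }
        delay(5_000) // virtual time: does not wait 5 real seconds
        observed = currentTime
    }
    return observed
}

fun main() {
    println(virtualTimeAfterTest()) // 5000
}
```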
t
Good idea, I'll check it out, thank you.
(That does not solve the actual problem, though: I can see real use cases where I want to use multiple clients from one process and stop them on demand.)
g
Could you show the full code? It looks like the issue is that the executor is actually closed, which is not really the correct way (it was shut down before the coroutine was able to cancel its Job). So I don't see it as an issue caused by cancellation, but by some other factors.
z
I would expect that to be something like:
```kotlin
scope.launch {
    // timeoutMillis: placeholder for whatever limit timeout() enforces
    withTimeout(timeoutMillis) {
        run()
    }
}
```
Or, if `timeout()` is really something special, nest both launches so that the call hierarchy is preserved in the job hierarchy:
```kotlin
scope.launch {
    launch { run() }
    launch { timeout() }
}
```
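A small sketch of the `withTimeout` variant, assuming a hypothetical `runForever()` in place of `run()` and a 100 ms limit chosen only for the demo. `withTimeout` throws `TimeoutCancellationException`; since that is a `CancellationException`, the launched job simply ends up cancelled instead of crashing the scope, so no separate `timeout()` coroutine is needed.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for run(): a session that never ends on its own.
suspend fun runForever(): Nothing = awaitCancellation()

// Returns whether the session job ended up cancelled by the timeout.
suspend fun sessionCancelledByTimeout(): Boolean {
    val scope = CoroutineScope(Dispatchers.Default)
    val session = scope.launch {
        // Throws TimeoutCancellationException after 100 ms, which cancels this job.
        withTimeout(100) { runForever() }
    }
    session.join()
    scope.cancel()
    return session.isCancelled
}

fun main() = runBlocking {
    println(sessionCancelledByTimeout()) // true
}
```

If a timeout should not count as cancellation, `withTimeoutOrNull` returns `null` instead of throwing.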
t
The code in question (please note that this whole thing is in development and I try not to advertise it, as it is not ready yet).
The code that starts the coroutines: https://github.com/spxbhuhb/adaptive/blob/ce2de7510109f0ec34a8b0a309659274c8288b04[…]/simplexion/adaptive/ktor/BasicWebSocketServiceCallTransport.kt
The unit test that stops the coroutines (the delay is the current workaround): https://github.com/spxbhuhb/adaptive/blob/ce2de7510109f0ec34a8b0a309659274c8288b04[…]vmTest/kotlin/hu/simplexion/adaptive/ktor/worker/SessionTest.kt
g
To me it looks like something not directly related to coroutines: the scheduler on which the coroutines are executed was disposed before the scope actually finished.
t
I think that's the default scheduler; I don't do anything fancy when creating the scope. Also, my original question was how to wait until the scope actually finishes.
```kotlin
val scope = CoroutineScope(Dispatchers.Default)
```
g
`cancelAndJoin`, as I mentioned above, is the way to wait until the scope is cancelled.
As far as I can see, the test runs on `io.netty.util.concurrent.SingleThreadEventExecutor`, which has already been shut down by the time the coroutine tries to complete one of its continuations, while the scope is not cancelled yet.
t
How can the Netty executor shut down before I call `scope.cancel()`? Also, I don't get why `delay(100)` solves this issue.
g
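I'm not completely sure about the whole setup there; it looks pretty sophisticated and I don't have enough insight into it, but I believe you can try to implement your function `BasicWebSocketServiceCallTransport.stop()` like this:
```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.runBlocking

fun BasicWebSocketServiceCallTransport.stop() {
    // Blocks the calling (test) thread until the scope's Job, and with it every
    // coroutine launched in the scope, has completed.
    runBlocking {
        val job = requireNotNull(scope.coroutineContext[Job]) { "Incorrect scope: No Job found" }
        job.cancelAndJoin()
    }
}
```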
t
Ok, I'll try that, thank you.
g
> the netty executor shut down before I call `scope.cancel()`

Hard to say exactly; I'd need to understand the whole setup there.

> why `delay(100)` solves this issue.

From your code, it looks like it just gives the job time to shut down and blocks the thread it runs on. So it is actually a mix of coroutine dispatching and thread blocking, which can itself be tricky (when the thread a coroutine is executing on gets blocked). In general, trying the coroutines test library is a good idea, but I feel it is way more complex in your case, with global state, which may not be easy to adopt. But if you look at examples of other Ktor tests, this is not what I would expect. I have quite a few of them in a couple of projects and never needed any global dispatching: essentially I just use ktor-server-test-host and the `testApplication {}` block, which encapsulates the test host in a single lambda for each test, where I can send and receive requests. If I need some background work, I just use the testApplication scope, so coroutines never escape it. But as I said, at first glance I'm not exactly sure what you are trying to achieve here.
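A minimal sketch of that style of test, assuming `ktor-server-test-host` is on the test classpath; the `/ping` route and `pingThroughTestHost` are hypothetical. The whole server lives inside the `testApplication` lambda and, as far as I know, the test engine is shut down when the block returns, so nothing outlives the test.

```kotlin
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.server.application.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.testing.*

fun pingThroughTestHost(): String {
    var body = ""
    testApplication {
        application {
            routing {
                get("/ping") { call.respondText("pong") } // hypothetical route
            }
        }
        // The client sends the request to the in-memory test engine, not over a real socket.
        body = client.get("/ping").bodyAsText()
    }
    return body
}

fun main() {
    println(pingThroughTestHost()) // pong
}
```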
The most suspicious part to me is the multiple `runBlocking` calls in different places. `runBlocking` (or `runTest` from the coroutines test library) could be fine, of course, but I would expect only one, at the top level of the test itself, and all scopes used to launch the app would just be child scopes of it (so the scope of `runBlocking` is used to create the server and all background tasks).
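A sketch of that layout, with a hypothetical `transportScope` and a `writer` coroutine standing in for the "ws-writer" from the stack trace: the scope's `Job` is created with `Job(parent = ...)`, so it is a child of the single top-level `runBlocking`. A side effect of this design is that a forgotten scope makes `runBlocking` hang instead of leaking silently into the next test.

```kotlin
import kotlinx.coroutines.*

fun childScopeIsCleanedUp(): Boolean = runBlocking {
    // Job(parent) links the scope's Job to runBlocking's Job, so runBlocking
    // cannot return while the scope is still alive.
    val transportScope = CoroutineScope(coroutineContext + Job(parent = coroutineContext.job))
    val writer = transportScope.launch { awaitCancellation() } // stands in for "ws-writer"
    // ... exercise the server and client here ...
    transportScope.coroutineContext.job.cancelAndJoin()
    writer.isCancelled
}

fun main() {
    println(childScopeIsCleanedUp()) // true
}
```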
t
Well, there is basically one `runBlocking`; the one in `stop` is there because of the `delay` workaround. So, in the end, there are no nested `runBlocking` calls.
g
As I understand it, the transport has its own scope under the hood, and it is potentially shared between tests.
t
I create a new transport for each test. Theoretically, all tests start with a clean state and finish with a clean state. Nothing is shared between them; even the H2 db name is unique.
`withProtoWebSocketTransport` creates a new transport and assigns it to `defaultServiceCallTransport`. During normal application startup this typically happens once, but for tests I try to clean up everything.
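One way to make that per-test cleanup explicit is a helper that installs a fresh transport, runs the test body, and waits for the transport's scope to finish before restoring the previous global. This is a sketch only: `FakeTransport`, `defaultTransport`, and `withFreshTransport` are hypothetical and are not the adaptive library's API.

```kotlin
import kotlinx.coroutines.*

// Hypothetical minimal transport; not the adaptive library's real class.
class FakeTransport {
    val scope = CoroutineScope(Dispatchers.Default)
    suspend fun stop() = scope.coroutineContext.job.cancelAndJoin()
}

// Hypothetical global, playing the role of defaultServiceCallTransport.
var defaultTransport: FakeTransport? = null

// Installs a fresh transport for the duration of block and waits for its
// scope to finish before restoring the previous one.
suspend fun <T> withFreshTransport(block: suspend (FakeTransport) -> T): T {
    val transport = FakeTransport()
    val previous = defaultTransport
    defaultTransport = transport
    try {
        return block(transport)
    } finally {
        // NonCancellable: still wait for the shutdown if the test itself was cancelled.
        withContext(NonCancellable) { transport.stop() }
        defaultTransport = previous
    }
}

fun main() = runBlocking {
    withFreshTransport { transport ->
        transport.scope.launch { awaitCancellation() } // hypothetical background work
        println("installed: ${defaultTransport === transport}")
    }
    println("restored: ${defaultTransport == null}")
}
```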