Yuriy Dynnikov
06/21/2022, 3:36 PM
GlobalScope.launch {...}, stop working. It looks like an exception makes its way into CoroutineScheduler and breaks it. How can I protect myself from this kind of behavior?
Exception stacktrace:
java.util.concurrent.CancellationException: The task was rejected
at kotlinx.coroutines.ExceptionsKt.CancellationException(Exceptions.kt:22)
at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.cancelJobOnRejection(Executors.kt:169)
at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.dispatch(Executors.kt:131)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:159)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:513)
at kotlinx.coroutines.AwaitAll$AwaitAllNode.invoke(Await.kt:115)
at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1519)
at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:323)
at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:240)
at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:906)
at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:863)
at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:828)
at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:233)
at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:39)
at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@47310ad2[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@5f40bc28[Wrapped task = CancellableContinuation(DispatchedContinuation[java.util.concurrent.ScheduledThreadPoolExecutor@6cea1b4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20], Continuation at core.MLock$invoke$2.invokeSuspend(MLock.kt:35)@1a367720]){Completed}@6f01d5c3]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6cea1b4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2070)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.dispatch(Executors.kt:128)
... 19 more
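The trace above has a recognisable shape: a RejectedExecutionException from a ScheduledThreadPoolExecutor in the Terminated state, wrapped in a CancellationException with the message "The task was rejected". A minimal sketch that should produce the same kind of cancellation, assuming kotlinx.coroutines 1.6 or later on the JVM; the function name and the dispatcher name are hypothetical and not taken from the original code:

```kotlin
import kotlinx.coroutines.*

// Hypothetical reproduction: launch a coroutine on a dispatcher that is already closed.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun launchOnClosedDispatcher(): Boolean = runBlocking {
    val dispatcher = newSingleThreadContext("pipeline-demo")
    dispatcher.close() // shuts down the executor backing this dispatcher

    // The closed executor rejects the task, and the library responds by
    // cancelling the job instead of running the body.
    val job = launch(dispatcher) { println("not expected to run") }
    job.join()
    job.isCancelled
}

fun main() {
    println("cancelled = ${launchOnClosedDispatcher()}")
}
```

If this matches what happens in the application, the thing to look for is any coroutine that can still be resumed on a single-thread context after close() has been called on it.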
Joffrey
06/21/2022, 4:01 PM
Yuriy Dynnikov
06/21/2022, 4:02 PM
Joffrey
06/21/2022, 4:03 PM
GlobalScope.launch are affected too, this likely means the thread pool used by Dispatchers.Default is terminated for some reason. Maybe Dispatchers.shutdown() is being called for some reason?
Yuriy Dynnikov
06/21/2022, 4:13 PM
finishPipeline. As I'm using newSingleThreadContext in the pipeline and noticed that they're not cleaned up, I try to call destroy in some meaningful way, so I made a garbage collector:
private suspend fun finishPipeline(pipeline: MPipeline<*>) {
    withContext(context) {
        // ...
        gcPipelines.add(pipeline)
    }
}
// This is launched by CoroutineScope(context).launch { gcLoop() }
suspend fun gcLoop() {
    while (true) {
        delay(5000)
        withContext(context) {
            alog.debug("Running Garbage collector, there are ${gcPipelines.size} pipelines to destroy")
            try {
                for (pipeline in gcPipelines) {
                    pipeline.destroy()
                }
                gcPipelines.clear()
            } catch (e: Exception) {
                alog.error("Exception caught while garbage collecting: ${e}, ${e.stackTraceToString()}")
            }
        }
    }
}
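An alternative to collecting finished pipelines in a loop is to tie each dispatcher's lifetime to the code that runs on it. ExecutorCoroutineDispatcher implements Closeable, so a sketch along these lines closes the thread only after the pipeline body has returned and control is back on the caller's dispatcher. The names runPipeline, name, and body are hypothetical; this is an illustration under those assumptions, not the original pipeline code:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: one dedicated thread per pipeline run, closed when the run ends.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T> runPipeline(name: String, body: suspend CoroutineScope.() -> T): T =
    newSingleThreadContext(name).use { dispatcher ->
        // body runs on the pipeline's own thread; use() calls close()
        // after withContext has returned or thrown.
        withContext(dispatcher, body)
    }

fun main() = runBlocking {
    val threadName = runPipeline("pipeline-1") { Thread.currentThread().name }
    println("body ran on: $threadName")
    println("caller still active: $isActive")
}
```

Because close() is called from the caller's side rather than from a coroutine running on the dispatcher being closed, nothing should be left to resume on the terminated executor, provided no other coroutine keeps a reference to that dispatcher.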
Yuriy Dynnikov
06/21/2022, 4:15 PM
context.close()
But somehow that cancels the coroutine I'm still in. Maybe there is just a better way to destroy threads created by newSingleThreadContext?
Joffrey
06/21/2022, 4:20 PM
context is your withContext(context)? If gcLoop() is called with CoroutineScope(context), why do you need to switch again to this context within the loop? It's either pointless (already in this context) or harmful (context-switching at every loop iteration)
Joffrey
06/21/2022, 4:21 PM
Yuriy Dynnikov
06/21/2022, 4:36 PM
context is newSingleThreadContext created in Core
I also noticed that switching while putting this together =(
Maybe I'm using contexts the wrong way: I have many pipelines (this app is kind of a mix of GitLab pipelines and IFTTT) and I need some kind of pipeline-level mutexes inside them. For that purpose I create a single-thread context and use it only inside the pipeline
While writing this I've realized that, while all of my events get processed through a Channel which acts like an event bus, I have a "shortcut" for StepFinishedEvent which avoids going through the event bus. So when I emit StepFinishedEvent, which in turn triggers adding the pipeline to the GC, I'm still in that pipeline's coroutine
Yuriy Dynnikov
06/21/2022, 4:39 PM
Yuriy Dynnikov
06/21/2022, 4:47 PM
CoroutineScope(context).launch { getEventsLoop() }
CoroutineScope(context).launch { gcLoop() }
I understand launch as "fire and forget", but is using the same context a problem in this case?
Joffrey
06/21/2022, 4:51 PM