Hi everyone. Please point towards the truth. For s...
# coroutines
y
Hi everyone. Please point towards the truth. For some reason (I'll fix that later) an exception is thrown inside a coroutine. What's interesting for me is that after this exception all my coroutines, including the ones launched with
GlobalScope.launch {...}
, stop working. Looks like an exception gets its way to CoroutineScheduler and breaks it. How can I protect myself from this kind of behavior? Exception stacktrace:
Copy code
java.util.concurrent.CancellationException: The task was rejected
	at kotlinx.coroutines.ExceptionsKt.CancellationException(Exceptions.kt:22)
	at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.cancelJobOnRejection(Executors.kt:169)
	at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.dispatch(Executors.kt:131)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:159)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
	at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:513)
	at kotlinx.coroutines.AwaitAll$AwaitAllNode.invoke(Await.kt:115)
	at kotlinx.coroutines.JobSupport.notifyCompletion(JobSupport.kt:1519)
	at kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:323)
	at kotlinx.coroutines.JobSupport.finalizeFinishingState(JobSupport.kt:240)
	at kotlinx.coroutines.JobSupport.tryMakeCompletingSlowPath(JobSupport.kt:906)
	at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:863)
	at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:828)
	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:233)
	at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:39)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@47310ad2[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@5f40bc28[Wrapped task = CancellableContinuation(DispatchedContinuation[java.util.concurrent.ScheduledThreadPoolExecutor@6cea1b4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20], Continuation at core.MLock$invoke$2.invokeSuspend(MLock.kt:35)@1a367720]){Completed}@6f01d5c3]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6cea1b4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2070)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
	at kotlinx.coroutines.ExecutorCoroutineDispatcherImpl.dispatch(Executors.kt:128)
	... 19 more
j
This is not just some random exception thrown from a coroutine, it's an exception that is thrown because the underlying executor service used here is actually terminated. Are you creating a coroutine scope from a java executor and terminating this executor?
y
not exactly. I'll try to illustrate what I have, 1min
👌 1
j
If coroutines started with
GlobalScope.launch
are affected too, this likely means the thread pool used by
Dispatchers.Default
is terminated for some reason. Maybe
Dispatchers.shutdown()
is being called for some reason?
y
Tried to put together some pieces. I have pipelines that process some events and can emit StepFinishedEvent, which gets us to
finishPipeline
. As I'm using
newSingleThreadContext
in pipeline and I noticed that they're not cleared, I try to call
destroy
in some meaningful way, so I made a garbage collector
Copy code
private suspend fun finishPipeline(pipeline: MPipeline<*>) {
    withContext(context) {
        // ...
        gcPipelines.add(pipeline)
    }
}


// This is launched by CoroutineScope(context).launch { gcLoop() }
suspend fun gcLoop() {
    while (true) {
        delay(5000)
        withContext(context) {
            alog.debug("Running Garbage collector, there are ${gcPipelines.size} pipelines to destroy")
            try {
                for (pipeline in gcPipelines) {
                    pipeline.destroy()
                }
                gcPipelines.clear()
            } catch (e: Exception) {
                alog.error("Exception caught while garbage collecting: ${e}, ${e.stackTraceToString()}")
            }
        }
    }
}
destroy is in fact just
Copy code
context.close()
But somehow that cancels coroutine I'm still in. Maybe there is just better way to destroy threads created by
newSingleThreadContext
?
j
What is
context
is your
withContext(context)
? If
gcLoop()
is called with
CoroutineScope(context)
, why do you need to switch again to this
context
within the loop? It's either pointless (already in this contetx) or harmful (context-switching at every loop iteration)
Also, what you're trying to achieve at a higher level doesn't look right. Why do you spawn many single-thread contexts in the first place? The point of coroutines is to decouple tasks from threads, and structured concurrency allows to manage them in bulk
y
context
is
newSingleThreadContext
created in Core Also noticed that switching while putting together =( Maybe I'm using contexts wrong way: I have many pipelines (this app is kinda mix of gitlab pipelines and IFTTT) and I need to have some kind of pipeline-level mutices inside them. For that purpose I create single thread context and use only inside pipeline While writing this I've realized that while all of my events get processed through Channel which acts like an event bus, I have "shortcut" for StepFinishedEvent which avoids going into eventbus. So when I emit StepFinishedEvent which therefore triggers adding it to GC, I'm still in that pipeline's coroutine
👍 1
Is my case of using singleThreadContexts a bit right? 😃 I have deep C++ background, so using higher-level concurrency mechanisms still hurts me )
Also one quick question: Is that reasonable?
Copy code
CoroutineScope(context).launch { getEventsLoop() }
        CoroutineScope(context).launch { gcLoop() }
I understand
launch
as "Fire and forget", but is using the same context a problem in this case?
j
It's ok to have the same context, what might be problematic is to not store those scopes in variables so they can be cancelled properly