Szabolcs Póta
10/05/2023, 8:27 AM
[...] runBlocking and inside it we loop through the lines in the file and launch a coroutine for each line, since they can be processed independently (we also limit the number of coroutines created at once, because the files can have millions of lines). The coroutines eventually call the Apache async HTTP client to send data to a remote service. The async HTTP client is bridged to coroutines, so it sits behind a suspend function called from the coroutines. The application works well most of the time, but every now and then (around two or three times a month) it hangs and stops working. When we look at the thread dump, this is where it is blocked:
"main" #1 prio=5 os_prio=0 cpu=35134684.86ms elapsed=4149440.89s tid=0x00007fdb68017800 nid=0x24ef waiting on condition [0x00007fdb7021f000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.9.1/Native Method)
- parking to wait for <0x00000000c2875a00> (a kotlinx.coroutines.BlockingCoroutine)
at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.9.1/LockSupport.java:234)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:88)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at com.i.i.audience.EventHandler.processContent(EventHandler.kt:249)
There are no other blocked threads and everything else is idle, so it looks like the coroutines have already finished inside the runBlocking block. I have seen this part of the runBlocking code pointed out in many threads and issues, but those were mostly on Android and the blocking was reproducible. We cannot reproduce this; it just happens in production, very rarely. Looking inside Builders.kt, my suspicion is on the following lines:
while (true) {
    @Suppress("DEPRECATION")
    if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
    val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // 1.
    // note: process next even may loose unpark flag, so check if completed before parking
    if (isCompleted) break // 2.
    parkNanos(this, parkNanos) // 3.
}
Clearly the problem is that this code waits indefinitely, because at 1. parkNanos = Long.MAX_VALUE and there is nothing left to wake it up. My theory is that the afterCompletion method, which calls unpark, runs between lines 2. and 3. So isCompleted is still false at 2., but by the time parkNanos starts to wait, unpark has already been called. As I don't know the inner workings well, I thought I'd ask whether anyone has any idea. I plan to patch this code to wait only a limited time in parkNanos and wake up periodically to check whether the coroutines have completed. In distributed systems it is never a good idea to wait indefinitely anyway, but there could be something we are doing wrong. Any idea is appreciated, thanks.
Sam
10/05/2023, 8:44 AM
[...] runBlocking or launching coroutines, or do you always rely on its default event loop?
2. Is there any possibility, anywhere in your application, that runBlocking is called from inside an existing coroutine or suspending function?
Sam
10/05/2023, 8:54 AM
[...] runBlocking is just continuing to wait because one of its child jobs has for some reason not completed.
Szabolcs Póta
10/05/2023, 8:55 AM
Szabolcs Póta
10/05/2023, 8:56 AM
Sam
10/05/2023, 8:58 AM
Szabolcs Póta
10/05/2023, 9:00 AM
private suspend fun HttpAsyncClient.execute(request: HttpUriRequest): HttpResponse {
    return suspendCancellableCoroutine { cont: CancellableContinuation<HttpResponse> ->
        val future = this.execute(request, object : FutureCallback<HttpResponse> {
            override fun completed(result: HttpResponse) {
                cont.resumeWith(Result.success(result))
            }
            override fun cancelled() {
                if (cont.isCancelled) return
                cont.resumeWith(Result.failure(CancellationException("Cancelled")))
            }
            override fun failed(ex: Exception) {
                cont.resumeWithException(ex)
            }
        })
        cont.invokeOnCancellation { future.cancel(false) }
        Unit
    }
}
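Editor's note: a bridge of this shape stays suspended until exactly one of the callbacks fires. The following is a minimal sketch of that behaviour, not the code from this thread. `Callback`, `FakeClient` and `await` are hypothetical stand-ins for the real HTTP client API, and the 200 ms timeout is an arbitrary value chosen for the demonstration.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical callback API standing in for the async HTTP client.
interface Callback<T> {
    fun completed(result: T)
    fun failed(ex: Exception)
}

// Hypothetical client: when `respond` is false it never invokes the callback.
class FakeClient(private val respond: Boolean) {
    fun execute(request: String, callback: Callback<String>) {
        if (respond) callback.completed("ok: $request")
        // otherwise the request is silently dropped and no callback ever fires
    }
}

// Same bridging pattern as above: suspend until a callback resumes the continuation.
suspend fun FakeClient.await(request: String): String =
    suspendCancellableCoroutine { cont ->
        execute(request, object : Callback<String> {
            override fun completed(result: String) = cont.resume(result)
            override fun failed(ex: Exception) = cont.resumeWithException(ex)
        })
    }

fun main() = runBlocking {
    // The callback fires, so the coroutine resumes with the result.
    println(FakeClient(respond = true).await("a"))
    // The callback never fires; only the timeout ends the suspension.
    println(withTimeoutOrNull(200) { FakeClient(respond = false).await("b") })
}
```

Without the timeout, the second call would suspend forever and the enclosing runBlocking would never return, while every worker thread sits idle.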
Sam
10/05/2023, 9:17 AM
Szabolcs Póta
10/05/2023, 9:17 AM
Szabolcs Póta
10/05/2023, 9:25 AM
runBlocking {
    // limit coroutines, since otherwise we will keep creating coroutines until all
    // the input is consumed (or run out of memory, which is the case with large files).
    // NB: coroutines will also limit 'active' concurrency to the pool size.
    val maxCoroutines = processor.maxCoroutines()
    val maximumCoroutinesSemaphore = Semaphore(maxCoroutines)
    LOGGER.info("Threads: ${properties.ingestion.threads}, Max co-routines: $maxCoroutines")
    val readTime = measureTimeMillis {
        var checkpoint = System.currentTimeMillis()
        for (item in processor.parse(stream.buffered(10_000))) {
            maximumCoroutinesSemaphore.acquire()
            if (System.currentTimeMillis() - checkpoint > 60000) {
                extendVisibility()
                checkpoint = System.currentTimeMillis()
            }
            itemCounter++
            launch(fixedPoolDispatcher) {
                try {
                    // LOGGER.info("processing {}", item)
                    val (added, removed) = processor.process(item)
                    userCounter.increment()
                    addedCounter.add(added.toLong())
                    removedCounter.add(removed.toLong())
                } catch (ex: Exception) {
                    // message already logged by processor
                    errorCounter.increment()
                    operationsErrorCounter.add(item.numberOfOperations.toLong())
                } finally {
                    maximumCoroutinesSemaphore.release()
                }
            }
        }
    }
The processor.process(item) is the suspend function that eventually calls the HTTP client. You can also see the semaphore that is used to limit how many coroutines are created at once.
Sam
10/05/2023, 9:37 AM
Sam
10/05/2023, 9:37 AM
[...] fixedPoolDispatcher? Any chance it could be failing to dispatch some coroutines?
Szabolcs Póta
10/05/2023, 9:38 AM
val fixedThreadPool: ExecutorService = Executors.newFixedThreadPool(defaultProperties.ingestion.threads)
val fixedPoolDispatcher = fixedThreadPool.asCoroutineDispatcher()
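Editor's note: the setup described so far (runBlocking, a Semaphore bounding in-flight coroutines, launch on a fixed-pool dispatcher) can be condensed into a small harness for experimenting. This is a sketch under assumptions, not the production code. `process` and `runBatch` are hypothetical names, the failure rule is invented for the demo, and the explicit CancellationException rethrow is an addition that the code posted above does not have.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for processor.process(item): suspends briefly,
// and fails for every tenth item so the error path is exercised.
suspend fun process(item: Int): Int {
    delay(1)
    if (item % 10 == 0) throw IllegalStateException("item $item failed")
    return item
}

// Returns (succeeded, failed) counts once every launched coroutine has finished.
fun runBatch(items: Int, threads: Int, maxCoroutines: Int): Pair<Int, Int> {
    val dispatcher = Executors.newFixedThreadPool(threads).asCoroutineDispatcher()
    val ok = AtomicInteger()
    val failed = AtomicInteger()
    try {
        runBlocking {
            val permits = Semaphore(maxCoroutines)
            for (item in 1..items) {
                permits.acquire() // suspends while maxCoroutines tasks are in flight
                launch(dispatcher) {
                    try {
                        process(item)
                        ok.incrementAndGet()
                    } catch (ex: CancellationException) {
                        throw ex // assumption: cancellation should propagate, not be counted
                    } catch (ex: Exception) {
                        failed.incrementAndGet()
                    } finally {
                        permits.release()
                    }
                }
            }
        } // runBlocking returns only after all children have completed
    } finally {
        dispatcher.close() // shuts down the underlying thread pool
    }
    return ok.get() to failed.get()
}

fun main() {
    println(runBatch(items = 100, threads = 4, maxCoroutines = 8)) // prints (90, 10)
}
```

If `process` is replaced with a function that never resumes for some item, the harness parks in runBlocking with idle pool threads.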
Szabolcs Póta
10/05/2023, 9:41 AM
Sam
10/05/2023, 9:47 AM
Sam
10/05/2023, 9:51 AM
Szabolcs Póta
10/05/2023, 9:53 AM
uli
10/05/2023, 11:58 AM
[...] ensureActive in your catch block. As @Sam said, swallowing CancellationExceptions is not good, even if this might not be your issue.
https://betterprogramming.pub/the-silent-killer-thats-crashing-your-coroutines-9171d1e8f79b
uli
10/05/2023, 12:00 PM
// message already logged by processor
Can you find out from your logs if any CancellationExceptions are being caught?
Szabolcs Póta
10/05/2023, 12:05 PM
uli
10/05/2023, 12:06 PM
Szabolcs Póta
10/05/2023, 12:09 PM
Dmitry Khalanskiy [JB]
10/05/2023, 12:16 PM
"There are no other blocked threads, everything else is idle so it looks like the coroutines already finished inside the runBlocking block."
What about the threads in fixedPoolDispatcher? If they are not also blocked due to being parked, then what are they doing?
uli
10/05/2023, 12:17 PM
launch(fixedPoolDispatcher) {
    try {
        ...
    } finally {
        maximumCoroutinesSemaphore.release()
    }
}
Here the launched coroutine can be cancelled before it even starts. In that case, the try block will not even be entered and therefore the finally block won't get executed.
So maybe launch(start = ATOMIC) helps.
Sam
10/05/2023, 12:19 PM
[...] onCompletion (instead of finally) could be a solution to that as well, I think.
Szabolcs Póta
10/05/2023, 12:21 PM
Szabolcs Póta
10/05/2023, 12:22 PM
uli
10/05/2023, 12:22 PM
Szabolcs Póta
10/05/2023, 12:23 PM
Szabolcs Póta
10/05/2023, 12:25 PM
"pool-3-thread-1" #19 prio=5 os_prio=0 cpu=80102322.38ms elapsed=7597007.90s tid=0x00007f276c8a7000 nid=0x7c4a waiting on condition [0x00007f2718d5e000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.9.1/Native Method)
- parking to wait for <0x00000000c0f07960> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.9.1/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.9.1/AbstractQueuedSynchronizer.java:2081)
at java.util.concurrent.LinkedBlockingQueue.take(java.base@11.0.9.1/LinkedBlockingQueue.java:433)
at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.9.1/ThreadPoolExecutor.java:1054)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.9.1/ThreadPoolExecutor.java:1114)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.9.1/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@11.0.9.1/Thread.java:834)
All the threads have the same stack.
uli
10/05/2023, 12:25 PM
Szabolcs Póta
10/05/2023, 12:26 PM
Szabolcs Póta
10/05/2023, 12:26 PM
uli
10/05/2023, 12:26 PM
Szabolcs Póta
10/05/2023, 12:27 PM
uli
10/05/2023, 12:28 PM
Szabolcs Póta
10/05/2023, 12:29 PM
uli
10/05/2023, 12:29 PM
Szabolcs Póta
10/05/2023, 12:31 PM
Sam
10/05/2023, 12:33 PM
Dmitry Khalanskiy [JB]
10/05/2023, 12:36 PM
[...] acquire. According to the thread dump, this isn't what's happening, so this doesn't seem to be the issue. Also, I don't see a way for this code to realistically get cancelled so that children can't invoke the finally block.
Szabolcs Póta
10/05/2023, 12:37 PM
Dmitry Khalanskiy [JB]
10/05/2023, 12:38 PM
Szabolcs Póta
10/05/2023, 12:39 PM
Dmitry Khalanskiy [JB]
10/05/2023, 12:43 PM
Szabolcs Póta
10/05/2023, 12:48 PM
Dmitry Khalanskiy [JB]
10/05/2023, 12:50 PM
[...] runBlocking run), the FutureCallback doesn't receive any callbacks: neither completed, nor cancelled, nor failed. This matches the symptoms you're describing precisely: all of the threads dedicated to running coroutines would have nothing to do, and runBlocking would be parked.
Dmitry Khalanskiy [JB]
10/05/2023, 12:51 PM
Szabolcs Póta
10/05/2023, 12:53 PM
Dmitry Khalanskiy [JB]
10/05/2023, 12:57 PM
val (added, removed) = withTimeoutOrNull(10.minutes) {
    processor.process(item)
} ?: return@launch log("timed out a task")
If process does hang indefinitely for some reason, then this will cancel the waiting.
Dmitry Khalanskiy [JB]
10/05/2023, 12:59 PM
[...] runBlocking still hangs occasionally, please let us know: https://github.com/Kotlin/kotlinx.coroutines/issues So far, I'd stick to the assumption that the async callback simply never comes.
Szabolcs Póta
10/05/2023, 1:05 PM