# coroutines
s
Hi. We have a Kotlin server application that processes tens of thousands of files every day using coroutines. We download the files from S3, call `runBlocking`, and inside it loop through the lines in the file and launch a coroutine for each line, as they can be processed independently (we also limit the number of coroutines that are created at once because the files could have millions of lines). The coroutines eventually call the Apache async HTTP client to send data to a remote service. The async HTTP client is bridged to coroutines, so it sits behind a suspend function called from the coroutines. The application works pretty well most of the time, but every now and then - around two or three times a month - it hangs and stops working. When we look at the thread dump, this is where it is blocked:
"main" #1 prio=5 os_prio=0 cpu=35134684.86ms elapsed=4149440.89s tid=0x00007fdb68017800 nid=0x24ef waiting on condition [0x00007fdb7021f000]
  java.lang.Thread.State: TIMED_WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.9.1/Native Method)
    - parking to wait for <0x00000000c2875a00> (a kotlinx.coroutines.BlockingCoroutine)
    at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.9.1/LockSupport.java:234)
    at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:88)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
    at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
    at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
    at com.i.i.audience.EventHandler.processContent(EventHandler.kt:249)
There are no other blocked threads; everything else is idle, so it looks like the coroutines have already finished inside the `runBlocking` block. I have seen this part of the `runBlocking` code pointed out in many threads and issues, but those were mostly Android and the blocking was reproducible. We cannot reproduce this; it just happens in production very rarely. Looking inside BuildersKt, my suspicion is on the following lines:
while (true) {
   @Suppress("DEPRECATION")
   if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
1.  val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
   // note: process next even may loose unpark flag, so check if completed before parking
2.  if (isCompleted) break
3.  parkNanos(this, parkNanos)
}
Clearly the problem is that this code waits indefinitely because at 1. `parkNanos = Long.MAX_VALUE` and there's nothing left to wake it up. My theory is that the `afterCompletion` method that calls `unpark` runs between lines 2. and 3., so `isCompleted` is still false at the check, but by the time `parkNanos` starts to wait, `unpark` has already been called. As I don't know the inner workings well, I thought I'd ask if anyone has any idea. I plan to patch this code to wait only a limited time in `parkNanos` and wake up periodically to check whether the coroutines have completed. In distributed systems it is never a good idea to wait indefinitely anyway, but there could be something we are doing wrong. Any idea is appreciated, thanks.
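A side note on the theory above: the JDK documents `LockSupport` as permit-based, so an `unpark` that arrives before the thread parks makes the next `park` return immediately instead of being lost. Here is a minimal sketch of that behaviour using only the JDK. The function name is made up for illustration, and this says nothing on its own about whether `runBlocking` has a bug elsewhere.

```kotlin
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport

// Hypothetical helper: measures how long parkNanos() really waits (in ms)
// when unpark() was called on the thread *before* it parked.
fun parkAfterEarlyUnparkMillis(): Long {
    val self = Thread.currentThread()
    LockSupport.unpark(self)                            // the "wake-up" arrives first and leaves a permit
    val start = System.nanoTime()
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5))  // consumes the permit instead of waiting 5 s
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
}
```

If the early `unpark` were lost, the call would take about five seconds; with the permit in place it should return almost immediately.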
s
A couple of questions to help understand the problem: 1. Do you ever specify a dispatcher when calling `runBlocking` or launching coroutines, or do you always rely on its default event loop? 2. Is there any possibility, anywhere in your application, that `runBlocking` is called from inside an existing coroutine or suspending function?
Since you're using an async HTTP client with coroutines, I would venture that the lack of blocked threads doesn't necessarily mean that all the coroutines are finished. I think it's reasonably likely that `runBlocking` is just continuing to wait because one of its child jobs has for some reason not completed.
s
1. We do not specify anything for runBlocking. 2. No other code calls runBlocking.
👍 1
I also thought it might have something to do with the HTTP client threading, e.g. it cannot obtain a worker thread and gets blocked, but I can't see anything like that in the thread dump.
s
You said that the async HTTP client is bridged to coroutines. What does the bridging code look like?
s
private suspend fun HttpAsyncClient.execute(request: HttpUriRequest): HttpResponse {
        return suspendCancellableCoroutine { cont: CancellableContinuation<HttpResponse> ->
            val future = this.execute(request, object : FutureCallback<HttpResponse> {
                override fun completed(result: HttpResponse) {
                    cont.resumeWith(Result.success(result))
                }

                override fun cancelled() {
                    if (cont.isCancelled) return
                    cont.resumeWith(Result.failure(CancellationException("Cancelled")))
                }

                override fun failed(ex: Exception) {
                    cont.resumeWithException(ex)
                }
            })

            cont.invokeOnCancellation { future.cancel(false) }
            Unit
        }
    }
👍 1
s
What about error handling? Do you catch exceptions thrown by the HTTP calls? Sorry for lots of questions 😄 just trying to explore all the possibilities
s
No worries, that's OK. Happy to have someone take a look. Let me check the code for error handling.
Yes, all errors are caught within the coroutine. Here's the whole code block:
runBlocking {
            // limit coroutines, since otherwise we will keep creating coroutines until all
            // the input is consumed (or run out of memory, which is the case with large files).
            // NB: coroutines will also limit 'active' concurrency to the pool size.
            val maxCoroutines = processor.maxCoroutines()
            val maximumCoroutinesSemaphore = Semaphore(maxCoroutines)

            LOGGER.info("Threads: ${properties.ingestion.threads}, Max co-routines: $maxCoroutines")
            val readTime = measureTimeMillis {
                var checkpoint = System.currentTimeMillis()
                for (item in processor.parse(stream.buffered(10_000))) {
                    maximumCoroutinesSemaphore.acquire()

                    if (System.currentTimeMillis() - checkpoint > 60000) {
                        extendVisibility()
                        checkpoint = System.currentTimeMillis()
                    }

                    itemCounter++
                    launch(fixedPoolDispatcher) {
                        try {
                            // LOGGER.info("processing {}", item)
                            val (added, removed) = processor.process(item)
                            userCounter.increment()
                            addedCounter.add(added.toLong())
                            removedCounter.add(removed.toLong())
                        } catch (ex: Exception) {
                            // message already logged by processor
                            errorCounter.increment()
                            operationsErrorCounter.add(item.numberOfOperations.toLong())
                        } finally {
                            maximumCoroutinesSemaphore.release()
                        }
                    }
                }
            }
`processor.process(item)` is the suspend function that eventually calls the HTTP client. You can also see the semaphore that is used to limit how many coroutines are created at once.
s
👍 I don't see any obvious problem with the error handling. Coroutines can sometimes hang if a cancellation exception is caught and suppressed, but I don't think that would happen here.
What's the `fixedPoolDispatcher`? Any chance it could be failing to dispatch some coroutines?
s
val fixedThreadPool: ExecutorService = Executors.newFixedThreadPool(defaultProperties.ingestion.threads)
val fixedPoolDispatcher = fixedThreadPool.asCoroutineDispatcher()
What I can think of is that maybe something does indeed happen with the HTTP client, e.g. under high load a response is lost so it never calls `completed`.
s
Maybe you could guard against that by adding a timeout to each call, if you have an upper bound for how long you expect a request to reasonably take.
Based on what you've shared, I don't have any specific suggestions of where the problem could be. But I do have a suggestion for how you might alter the code. My preferred approach for limiting concurrency to some number n in a situation like this is to launch n coroutines and have them each loop over a single work queue (a Channel). By doing that you would reduce the number of moving parts and synchronization points, which should limit the number of places problems can hide.
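The worker-pool idea above could look roughly like the sketch below. It is only an illustration under assumptions: `processItem` is a hypothetical stand-in for `processor.process(item)`, `Dispatchers.Default` stands in for the fixed pool dispatcher, and the error handling is reduced to a comment.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for processor.process(item).
suspend fun processItem(item: String): Int {
    delay(1)
    return item.length
}

// Launches `workers` coroutines that all drain one bounded work queue.
fun processAll(items: Sequence<String>, workers: Int): Int = runBlocking {
    val queue = Channel<String>(capacity = workers)  // bounded: send() suspends while workers are busy
    val total = AtomicInteger()
    val jobs = List(workers) {
        launch(Dispatchers.Default) {
            for (item in queue) {                    // loop ends once the channel is closed and drained
                try {
                    total.addAndGet(processItem(item))
                } catch (ex: CancellationException) {
                    throw ex                         // never swallow cancellation
                } catch (ex: Exception) {
                    // count the error and carry on with the next item
                }
            }
        }
    }
    for (item in items) queue.send(item)             // the producer is throttled by the queue capacity
    queue.close()                                    // tells the workers no more items are coming
    jobs.joinAll()
    total.get()
}
```

Concurrency is bounded by the number of workers, so no semaphore is needed, and the producer loop suspends in `send` rather than blocking a thread.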
s
That's a good idea. I will also sniff around the http client. We have a readTimeout set so it should not wait forever, but who knows. Thanks for looking at it.
u
And definitely call `ensureActive` in your catch block. As @Sam said, swallowing `CancellationException`s is not good, even if this might not be your issue. https://betterprogramming.pub/the-silent-killer-thats-crashing-your-coroutines-9171d1e8f79b
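To illustrate the point, here is a small sketch of what a broad `catch (ex: Exception)` does to a cancelled coroutine, with and without `ensureActive()`. The function name and the `delay` standing in for the suspending HTTP call are assumptions made for the example, not part of the code in this thread.

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo: counts how many times the catch block is entered
// after the coroutine is cancelled while suspended in the first iteration.
fun catchBlocksEntered(checkCancellation: Boolean): Int = runBlocking {
    var entered = 0
    val job = launch {
        repeat(5) {
            try {
                delay(60_000)               // stand-in for a suspending HTTP call
            } catch (ex: Exception) {       // this also catches CancellationException
                entered++
                if (checkCancellation) ensureActive()  // rethrows if this coroutine was cancelled
                // without the check, the loop carries on as if this were an ordinary error
            }
        }
    }
    yield()       // let the child start and suspend in delay()
    job.cancel()
    job.join()
    entered
}
```

Without the check, every remaining iteration fails immediately with a `CancellationException` that is counted as an ordinary error; with it, the coroutine stops at the first catch.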
👍 1
> `// message already logged by processor`
Can you find out from your logs if any `CancellationException`s are being caught?
s
Did a quick search. I couldn't find any.
u
And InterruptedException?
s
I couldn't find any but just realised that the last hang happened so long ago that we don't have those logs any more. But I will look for them if it happens again.
d
> There are no other blocked threads, everything else is idle so it looks the coroutines already finished inside the `runBlocking` block.
What about the threads in `fixedPoolDispatcher`? If they are not also blocked due to being parked, then what are they doing?
u
And a small race:
launch(fixedPoolDispatcher) {
                        try {
...
                        } finally {
                            maximumCoroutinesSemaphore.release()
                        }
                    }
Here the launched coroutine can be cancelled before it even starts. In that case, the try block will not even be entered and therefore the finally block won't get executed. So maybe `launch(start=ATOMIC)` helps.
s
Oh, nicely spotted! `onCompletion` (instead of `finally`) could be a solution to that as well, I think.
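The race described above can be shown with a small sketch. It assumes that "onCompletion" refers to `Job.invokeOnCompletion`, and it uses `kotlinx.coroutines.sync.Semaphore` only to keep the demo short; the function name is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore

// Hypothetical demo: one of two permits is taken, then the launched coroutine
// is cancelled before its body has had a chance to start.
fun permitsAvailableAfterEarlyCancel(releaseInFinally: Boolean): Int = runBlocking {
    val permits = Semaphore(2)
    permits.acquire()
    val job = launch {
        try {
            delay(10)                       // never reached: the body does not start at all
        } finally {
            if (releaseInFinally) permits.release()
        }
    }
    // A completion handler runs even if the body never started.
    if (!releaseInFinally) job.invokeOnCompletion { permits.release() }
    job.cancel()                            // runBlocking has not yielded yet, so the child has not run
    job.join()
    permits.availablePermits
}
```

With the release in `finally`, the permit leaks and only one of the two permits is available afterwards; with the completion handler, both are available again.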
s
Good point. Although if one coroutine skips the finally block it only means that instead of N only N-1 coroutines can be launched at once because one permit will never be released.
But good point that it can be cancelled before starting. I am rewriting the semaphore part to use channels as @Sam suggested. Hopefully that will be safer.
👍 1
u
And over the weeks this might add up to N-N=0, which would explain why the visible issue occurs so rarely.
☝️ 1
s
But if it was a semaphore issue, I think we should see a thread blocked waiting for a permit. Anyway, hopefully with the channels it's less confusing.
@Dmitry Khalanskiy [JB] The pool threads are waiting for tasks. It seems to me they are idle. Here's one:
"pool-3-thread-1" #19 prio=5 os_prio=0 cpu=80102322.38ms elapsed=7597007.90s tid=0x00007f276c8a7000 nid=0x7c4a waiting on condition  [0x00007f2718d5e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@11.0.9.1/Native Method)
	- parking to wait for  <0x00000000c0f07960> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.9.1/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.9.1/AbstractQueuedSynchronizer.java:2081)
	at java.util.concurrent.LinkedBlockingQueue.take(java.base@11.0.9.1/LinkedBlockingQueue.java:433)
	at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.9.1/ThreadPoolExecutor.java:1054)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.9.1/ThreadPoolExecutor.java:1114)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.9.1/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(java.base@11.0.9.1/Thread.java:834)
All the threads have the same stack.
u
Are you sure? This is (hopefully) not a thread semaphore but a coroutine semaphore. As such it would suspend and not block a thread.
s
Hm. That could be
All good points.
u
where are you importing Semaphore from?
s
java.util.concurrent
u
I guess you should use kotlinx.coroutines.sync.Semaphore
💯 1
s
Good to know that such a thing exists, but for now I'll go with the channel solution. If I am not mistaken, those should exit if something is cancelled.
u
I would not mix blocking and suspending. But I also think it should not matter here.
s
Yes, old habit to go to the well known java.util classes.
s
The use of the blocking Semaphore could certainly account for the problem here. That's a really good spot that I didn't even think to consider. It would gradually block the threads that are responsible for dispatching coroutines, meaning that eventually no more coroutines can be dispatched and the application will hang.
d
If Java's semaphore is used, then yes, if all of its tokens are gone, we would see the main thread hanging on `acquire`. According to the thread dump, this isn't what's happening, so this doesn't seem to be the issue. Also, I don't see a way for this code to realistically get cancelled so that children can't invoke the `finally` block.
s
In this case shouldn't I see the runBlocking thread being parked at that line where it wants a permit? I've seen that in some stack traces.
d
Exactly my point, yes, you should.
s
Ah, sorry haven't read your message carefully, that's indeed what you said.
d
We'll need to look into this. Is it possible for you to share your code with us so that we could try to reproduce the problem?
s
The codebase is relatively big; unfortunately I am unable to share it. However, I think the crucial parts are what I've already pasted: the runBlocking part and the bridge for the async HTTP client. There is no other coroutine-specific code, just a bunch of interfaces and classes with suspend functions that eventually call the HTTP client. I also tried to write a small, simple program that does the same thing, but it never hung.
d
Here's the one possible explanation I can think of right now that doesn't involve a bug in `runBlocking`: very rarely (so rarely that it can't happen more than the semaphore's number of permits during a single `runBlocking` run), the `FutureCallback` doesn't receive any callbacks: neither `completed`, nor `cancelled`, nor `failed`. This matches the symptoms you're describing precisely: all of the threads dedicated to running coroutines would have nothing to do, and `runBlocking` would be parked.
I don't know anything about the HTTP library you're using, so I can't say how likely it is that no callbacks get received.
s
It's the standard apache commons async httpclient. But I had this idea too, and maybe the reason I cannot see anything in the stack trace is because the httpclient is parked in a generic socket reading loop while the future it's supposed to call is not visible.
d
Here's a proposal:
val (added, removed) = withTimeoutOrNull(10.minutes) {
    processor.process(item)
} ?: return@launch log("timed out a task")
If `process` does hang indefinitely for some reason, then this will cancel the waiting.
If even after introducing a timeout, `runBlocking` still hangs occasionally, please let us know: https://github.com/Kotlin/kotlinx.coroutines/issues
So far, I'd stick to the assumption that the async callback simply never comes.
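As a rough illustration of why the timeout helps, the sketch below simulates a call whose callback never fires. `callThatNeverCompletes` and `processWithTimeout` are hypothetical names, and the short millisecond timeout is only there to keep the demo quick.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for an HTTP call whose FutureCallback is never invoked:
// the continuation is simply never resumed.
suspend fun callThatNeverCompletes(): Pair<Int, Int> =
    suspendCancellableCoroutine { /* no completed/failed/cancelled callback ever resumes this */ }

// Returns null when the call does not finish within the timeout,
// instead of keeping runBlocking parked forever.
fun processWithTimeout(timeoutMillis: Long): Pair<Int, Int>? = runBlocking {
    withTimeoutOrNull(timeoutMillis) { callThatNeverCompletes() }
}
```

Without the `withTimeoutOrNull` wrapper, the `runBlocking` in this sketch would never return, which is the same symptom as in the thread dump.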
s
Oh, that timeout is very elegant. This way I don't need to go down the http client route. I will try the channel and this and will see if this comes up again in prod. Thank you for all the good ideas.