https://kotlinlang.org logo
#coroutines
Title
# coroutines
s

Szabolcs Póta

10/05/2023, 8:27 AM
Hi. We have a Kotlin server application that processes tens of thousands of files every day using coroutines. We download the files from S3, call
runBlocking
and inside it loop through the lines in the file and launch a coroutine for each line as they can be processed independently (we also limit the number of coroutines that are created at once because the files could have millions of lines). The coroutines eventually call Apache async http client to send data to a remote service. The async http client is bridged to coroutines so it is behind a suspend function called from the coroutines. The application works pretty well most of the time, but every now and then - around two, three times a month - they hang and stop working. When we look at the thread dump this is where it is blocked:
Copy code
"main" #1 prio=5 os_prio=0 cpu=35134684.86ms elapsed=4149440.89s tid=0x00007fdb68017800 nid=0x24ef waiting on condition [0x00007fdb7021f000]
  java.lang.Thread.State: TIMED_WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.9.1/Native Method)
    - parking to wait for <0x00000000c2875a00> (a kotlinx.coroutines.BlockingCoroutine)
    at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.9.1/LockSupport.java:234)
    at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:88)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
    at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
    at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
    at com.i.i.audience.EventHandler.processContent(EventHandler.kt:249)
There are no other blocked threads, everything else is idle so it looks the coroutines already finished inside the
runBlocking
block. I have seen this part of the
runBlocking
code pointed out in many threads and issues but those were mostly Android and the blocking was reproducible. We cannot reproduce this, it just happens in production very rarely. Looking inside the BuilderKt my suspicion is on the following lines:
Copy code
while (true) {
   @Suppress("DEPRECATION")
   if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
1.  val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
   // note: process next even may loose unpark flag, so check if completed before parking
2.  if (isCompleted) break
3.  parkNanos(this, parkNanos)
}
Cleary the problem is that this code waits indefinitely because at 1.
parkNanos = Long.MAX_VALUE
and there’s nothing any more to wake it up. My theory is that the the
afterCompletion
method that calls
unpark
is received between line 2. and 3. So
isCompleted
will be yet false but by the time
parkNanos
starts to wait
unpark
was already called. As I don’t know well the inner workings thought I ask if anyone has any idea. I plan to patch this code to wait only a limited time in parkNanos and wake up periodically to check if the coroutines have completed. In distributed systems it is never a good idea to wait indefinitely anyway, but there could be something we do wrong. Any idea is appreciated, thanks.
s

Sam

10/05/2023, 8:44 AM
A couple of questions to help understand the problem: 1. Do you ever specify a dispatcher when calling
runBlocking
or launching coroutines, or do you always rely on its default event loop? 2. Is there any possibility, anywhere in your application, that
runBlocking
is called from inside an existing coroutine or suspending function?
Since you're using an async HTTP client with coroutines, I would venture that the lack of blocked threads doesn't necessarily mean that all the coroutines are finished. I think it's reasonably likely that
runBlocking
is just continuing to wait because one of its child jobs has for some reason not completed.
s

Szabolcs Póta

10/05/2023, 8:55 AM
1. We do not specify anything for runBlocking. 2. No other code calls runBlocking.
👍 1
I also thought about that it something to do with the http client threading, e.g. cannot obtain a worker thread and gets blocked but I can't see anything like that in the thread dump.
s

Sam

10/05/2023, 8:58 AM
You said that the async HTTP client is bridged to coroutines. What does the bridging code look like?
s

Szabolcs Póta

10/05/2023, 9:00 AM
Copy code
private suspend fun HttpAsyncClient.execute(request: HttpUriRequest): HttpResponse {
        return suspendCancellableCoroutine { cont: CancellableContinuation<HttpResponse> ->
            val future = this.execute(request, object : FutureCallback<HttpResponse> {
                override fun completed(result: HttpResponse) {
                    cont.resumeWith(Result.success(result))
                }

                override fun cancelled() {
                    if (cont.isCancelled) return
                    cont.resumeWith(Result.failure(CancellationException("Cancelled")))
                }

                override fun failed(ex: Exception) {
                    cont.resumeWithException(ex)
                }
            })

            cont.invokeOnCancellation { future.cancel(false) }
            Unit
        }
    }
👍 1
s

Sam

10/05/2023, 9:17 AM
What about error handling? Do you catch exceptions thrown by the HTTP calls? Sorry for lots of questions 😄 just trying to explore all the possibilities
s

Szabolcs Póta

10/05/2023, 9:17 AM
No worries, that's OK. Happy to someone having a look. Let me check the code for error handling.
Yes, all errors are caught with the coroutine. Here's the whole code block
Copy code
runBlocking {
            // limit coroutines, since otherwise we will keep creating coroutines until all
            // the input is consumed (or run out of memory, which is the case with large files).
            // NB: coroutines will also limit 'active' concurrency to the pool size.
            val maxCoroutines = processor.maxCoroutines()
            val maximumCoroutinesSemaphore = Semaphore(maxCoroutines)

            LOGGER.info("Threads: ${properties.ingestion.threads}, Max co-routines: $maxCoroutines")
            val readTime = measureTimeMillis {
                var checkpoint = System.currentTimeMillis()
                for (item in processor.parse(stream.buffered(10_000))) {
                    maximumCoroutinesSemaphore.acquire()

                    if (System.currentTimeMillis() - checkpoint > 60000) {
                        extendVisibility()
                        checkpoint = System.currentTimeMillis()
                    }

                    itemCounter++
                    launch(fixedPoolDispatcher) {
                        try {
                            // LOGGER.info("processing {}", item)
                            val (added, removed) = processor.process(item)
                            userCounter.increment()
                            addedCounter.add(added.toLong())
                            removedCounter.add(removed.toLong())
                        } catch (ex: Exception) {
                            // message already logged by processor
                            errorCounter.increment()
                            operationsErrorCounter.add(item.numberOfOperations.toLong())
                        } finally {
                            maximumCoroutinesSemaphore.release()
                        }
                    }
                }
            }
The
processor.process(item)
is the suspend function that eventually calls the http client. You can also see the semaphore that is used to limit how many coroutines are created at once.
s

Sam

10/05/2023, 9:37 AM
👍 I don't see any obvious problem with the error handling. Coroutines can sometimes hang if a cancellation exception is caught and suppressed, but I don't think that would happen here.
What's the
fixedPoolDispatcher
? Any chance it could be failing to dispatch some coroutines?
s

Szabolcs Póta

10/05/2023, 9:38 AM
Copy code
val fixedThreadPool: ExecutorService = Executors.newFixedThreadPool(defaultProperties.ingestion.threads)
val fixedPoolDispatcher = fixedThreadPool.asCoroutineDispatcher()
What I can think of is that maybe indeed something happens with the http client. E.g. under high load a response is lost so it never calls complete.
s

Sam

10/05/2023, 9:47 AM
Maybe you could guard against that by adding a timeout to each call, if you have an upper bound for how long you expect a request to reasonably take.
Based on what you've shared, I don't have any specific suggestions of where the problem could be. But I do have a suggestion for how you might alter the code. My preferred approach for limiting concurrency to some number n in a situation like this is to launch n coroutines and have them each loop over a single work queue (a Channel). By doing that you would reduce the number of moving parts and synchronization points, which should limit the number of places problems can hide.
s

Szabolcs Póta

10/05/2023, 9:53 AM
That's a good idea. I will also sniff around the http client. We have a readTimeout set so it should not wait forever, but who knows. Thanks for looking at it.
u

uli

10/05/2023, 11:58 AM
And definitely
ensureActive
in your catch block. As @Sam said, swallowing CancelationExceptions is not good, even if this might not be your issue. https://betterprogramming.pub/the-silent-killer-thats-crashing-your-coroutines-9171d1e8f79b
👍 1
>
Copy code
// message already logged by processor
Can you find out from your logs if any
CancelationExceptions
are being caught?
s

Szabolcs Póta

10/05/2023, 12:05 PM
Did a quick search. I couldn't find any.
u

uli

10/05/2023, 12:06 PM
And InterruptedException?
s

Szabolcs Póta

10/05/2023, 12:09 PM
I couldn't find any but just realised that the last hang happened so long ago that we don't have those logs any more. But I will look for them if it happens again.
d

Dmitry Khalanskiy [JB]

10/05/2023, 12:16 PM
There are no other blocked threads, everything else is idle so it looks the coroutines already finished inside the
runBlocking
block.
What about the threads in
fixedPoolDispatcher
? If they are not also blocked due to being parked, then what are they doing?
u

uli

10/05/2023, 12:17 PM
And a small race:
Copy code
launch(fixedPoolDispatcher) {
                        try {
...
                        } finally {
                            maximumCoroutinesSemaphore.release()
                        }
                    }
Here the launched coroutine can be canceled before it even starts. In that case, the try block will not even be entered and therefor the finally block won’t get executed. So maybe
launch(start=ATOMIC)
helps.
s

Sam

10/05/2023, 12:19 PM
Oh, nicely spotted!
onCompletion
(instead of
finally
) could be a solution to that as well, I think.
s

Szabolcs Póta

10/05/2023, 12:21 PM
Good point. Although if one coroutine skips the finally block it only means that instead of N only N-1 coroutines can be launched at once because one permit will never be released.
But good point on that it can be cancelled before starting. I am rewriting the semaphore part to use channels as @Sam suggested. Hopefully that will be safer.
👍 1
u

uli

10/05/2023, 12:22 PM
And over the weeks this might add up to N-N=0. Which would explain why the visible issue occures so rarely.
☝️ 1
s

Szabolcs Póta

10/05/2023, 12:23 PM
But if it was a semaphore issue I think we should see the a thread blocked in waiting for getting a permit. Anyway, hopefully with the channels it's less confusing.
@Dmitry Khalanskiy [JB] The pool threads are waiting for task. It seems to me they are idle. Here's one:
Copy code
"pool-3-thread-1" #19 prio=5 os_prio=0 cpu=80102322.38ms elapsed=7597007.90s tid=0x00007f276c8a7000 nid=0x7c4a waiting on condition  [0x00007f2718d5e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@11.0.9.1/Native Method)
	- parking to wait for  <0x00000000c0f07960> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.9.1/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.9.1/AbstractQueuedSynchronizer.java:2081)
	at java.util.concurrent.LinkedBlockingQueue.take(java.base@11.0.9.1/LinkedBlockingQueue.java:433)
	at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.9.1/ThreadPoolExecutor.java:1054)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.9.1/ThreadPoolExecutor.java:1114)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.9.1/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(java.base@11.0.9.1/Thread.java:834)
All the threads has the same stack
u

uli

10/05/2023, 12:25 PM
Are you sure? This is (hopefully) not a thread semaphore but a coroutine semaphore. As such it would supsend and not block a thread
1
s

Szabolcs Póta

10/05/2023, 12:26 PM
Hm. That could be
All good points.
u

uli

10/05/2023, 12:26 PM
where are you importing Semaphore from?
s

Szabolcs Póta

10/05/2023, 12:27 PM
java.util.concurrent
u

uli

10/05/2023, 12:28 PM
I guess you should use kotlinx.coroutines.sync.Semaphore
💯 1
s

Szabolcs Póta

10/05/2023, 12:29 PM
Good to know that such thing exists, but for now I go with the channel solution. If I am not mistaken those should exit if something is cancelled.
u

uli

10/05/2023, 12:29 PM
I would not mix blocking and suspending. But I also think, it shoud not matter here
1
s

Szabolcs Póta

10/05/2023, 12:31 PM
Yes, old habit to go to the well known java.util classes.
s

Sam

10/05/2023, 12:33 PM
The use of the blocking Semaphore could certainly account for the problem here. That's a really good spot that I didn't even think to consider. It would gradually block the threads that are responsible for dispatching coroutines, meaning that eventually no more coroutines can be dispatched and the application will hang.
d

Dmitry Khalanskiy [JB]

10/05/2023, 12:36 PM
If Java's semaphore is used, then yes, if all of its tokens are gone, we would see the main thread hanging on
acquire
. According to the thread dump, this isn't what's happening, so this doesn't seem to be the issue. Also, I don't see a way for this code to realistically get cancelled so that children can't invoke the
finally
block.
s

Szabolcs Póta

10/05/2023, 12:37 PM
In this case shouldn't I see the runBlocking thread being parked at that line where it wants a permit? I've seen that in some stack traces.
d

Dmitry Khalanskiy [JB]

10/05/2023, 12:38 PM
Exactly my point, yes, you should.
s

Szabolcs Póta

10/05/2023, 12:39 PM
Ah, sorry haven't read your message carefully, that's indeed what you said.
d

Dmitry Khalanskiy [JB]

10/05/2023, 12:43 PM
We'll need to look into this. Is it possible for you to share your code with us so that we could try to reproduce the problem?
s

Szabolcs Póta

10/05/2023, 12:48 PM
The codebase is relatively big, unfortunately I am unable to share it. However I think the crucial parts are what I've already pasted, the runBlocking part and the bridge for the async HTTP client. There are no other coroutine specific code, just a bunch of interfaces and classes with suspend functions that eventually call the http client. I also tried to write a little simple code that does the same thing but it never hanged.
d

Dmitry Khalanskiy [JB]

10/05/2023, 12:50 PM
Here's the one possible explanation I can think of right now that doesn't involve a bug in `runBlocking`: very rarely (so rarely that it can't happen more than the semaphore's number of permits during a single
runBlocking
run), the
FutureCallback
doesn't receive any callbacks: neither
completed
, nor
cancelled
, nor
failed
. This matches the symptoms you're describing precisely: all of the threads dedicated to running coroutines would have nothing to do, and
runBlocking
would be parked.
I don't know anything about the HTTP library you're using, so I can't say how likely it is that no callbacks get received.
s

Szabolcs Póta

10/05/2023, 12:53 PM
It's the standard apache commons async httpclient. But I had this idea too, and maybe the reason I cannot see anything in the stack trace is because the httpclient is parked in a generic socket reading loop while the future it's supposed to call is not visible.
d

Dmitry Khalanskiy [JB]

10/05/2023, 12:57 PM
Here's a proposal:
Copy code
val (added, removed) = withTimeoutOrNull(10.minutes) {
    processor.process(item)
} ?: return@launch log("timed out a task")
If
process
does hang indefinitely for some reason, then this will cancel the waiting.
If even after introducing a timeout,
runBlocking
still hangs occasionally, please let us know: https://github.com/Kotlin/kotlinx.coroutines/issues So far, I'd stick to the assumption that the async callback simply never comes.
s

Szabolcs Póta

10/05/2023, 1:05 PM
Oh, that timeout is very elegant. This way I don't need to go down the http client route. I will try the channel and this and will see if this comes up again in prod. Thank you for all the good ideas.
10 Views