# coroutines
a
At Airbnb, we use Kotlin and coroutines in some of our server-side code. The workload for each request is fully async — there’s no blocking code that runs during the life of the request. For the most part, things work just fine, as you’d expect. However, as we load test these services and bring them to a breaking point, we see latency skyrocket, queuing start to occur at the web server layer (we use Dropwizard and non-blocking Jetty APIs, so under normal operation, there is absolutely no queuing), and eventually heap exhaustion as all of this work is queued and requests never complete. Now of course, some of this behavior is expected — every service has its breaking point 🙂. In our Java services, however, which have a more mature service framework, we use standard patterns such as CoDel (controlled delay, https://en.wikipedia.org/wiki/CoDel), adaptive queuing, etc., to perform load shedding and ensure that, even under heavy load, the service may degrade but never gets into an unrecoverable state. This has worked well for us, and I’d like to do similar things in our Kotlin services that use coroutines. Does anyone have any insight into how to apply a load shedding algorithm (bulkheading, controlled delay, etc.) to coroutines, while still leveraging the default work-stealing dispatcher?
CoroutineScheduler
doesn’t expose any public APIs to get its current “state”, meaning that there’s no way to really get much insight into the state of the default dispatcher. TL;DR — I want to figure out how to structure my application such that, when we’re saturating the CPU (and thus all the default dispatcher threads are super busy doing “stuff”), I can start to apply some backpressure and reject some requests, rather than continuing to accept hundreds/thousands of requests and causing the server to become completely unresponsive. And notably, I want to trigger this behavior with a mechanism that doesn’t require a ton of per-service tuning — I don’t want to do this strictly with a “request rate limit” or something like that. I’d like it to be much more adaptive, ideally.
l
Do you have any idea regarding the API surface you would want to use for your use case?
a
I think probably I’d want some insight into the queue size inside CoroutineScheduler and whether or not it is “filling up” (e.g., look at how quickly things are leaving the queue)
but honestly i’m not sure
open to ideas
l
What would be the unit of "how quickly things are leaving the queue"?
I think you could make your own
CoroutineDispatcher
wrapping
Dispatchers.Default
and adding enter/exit listeners (with `try`/`finally`). With that, you could experiment with the API and its usage and report back on how it's going, so we can see if that could be a great addition or an inspiration for first-party support in kotlinx.coroutines
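A minimal sketch of what such a wrapping dispatcher could look like, assuming plain `try`/`finally` around each dispatched `Runnable` (the class and property names are made up for illustration):

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext

// Wraps another dispatcher and counts tasks that are queued vs. running,
// using try/finally around each dispatched Runnable.
class InstrumentedDispatcher(
    private val delegate: CoroutineDispatcher = Dispatchers.Default
) : CoroutineDispatcher() {

    private val queued = AtomicInteger(0)   // dispatched but not yet started
    private val running = AtomicInteger(0)  // currently executing

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        queued.incrementAndGet()
        delegate.dispatch(context, Runnable {
            queued.decrementAndGet()
            running.incrementAndGet()
            try {
                block.run()
            } finally {
                running.decrementAndGet()
            }
        })
    }

    // Crude "pressure" signals that load-shedding code could poll or export as metrics.
    val pendingTasks: Int get() = queued.get()
    val activeTasks: Int get() = running.get()
}
```

Coroutines would then be launched on this wrapper (e.g. `launch(instrumentedDispatcher) { … }`) instead of relying on `Dispatchers.Default` directly.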
a
if we wanted to implement something similar to the CoDel mechanism I was mentioning, then you’d basically have two timeouts for how long something may live in the queue before actually running — one for the steady state, and one for the “overloaded” state. E.g., once you know you’re starting to be overloaded (because things aren’t being taken off the queue within the steady-state timeout), you make the timeout more stringent and cancel tasks that take longer than that to schedule, so as to shrink the queue and shed the load.
So I actually thought about this dispatcher approach
when you say try/finally, you mean around the runnable itself right
like wrap the runnable
to know when it started and ended
?
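For reference, wrapping the Runnable to measure queue delay is roughly what such a CoDel-style signal could hang off of. This sketch assumes shedding happens at request admission rather than by dropping already-queued runnables (names and thresholds are illustrative, not an existing kotlinx.coroutines API):

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlin.coroutines.CoroutineContext

// Measures how long each task sat in the queue before running and flips an
// "overloaded" flag once the CoDel-style target delay is being missed.
// Tasks are always run; shedding itself would happen at request admission,
// not by refusing to run already-queued runnables.
class QueueDelayTrackingDispatcher(
    private val delegate: CoroutineDispatcher = Dispatchers.Default,
    private val steadyTargetMillis: Long = 100,   // made-up steady-state target
    private val overloadedTargetMillis: Long = 20 // made-up stricter target under load
) : CoroutineDispatcher() {

    @Volatile var overloaded: Boolean = false
        private set

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        val enqueuedAt = System.nanoTime()
        delegate.dispatch(context, Runnable {
            val queueDelayMillis = (System.nanoTime() - enqueuedAt) / 1_000_000
            // Use the stricter target while overloaded, the relaxed one otherwise.
            val target = if (overloaded) overloadedTargetMillis else steadyTargetMillis
            overloaded = queueDelayMillis > target
            block.run() // always run the task; rejection happens upstream
        })
    }
}
```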
l
Yes, but now that I've read about your use case, I don't think you want to do it at the dispatcher level. That's too low level, and not running queued runnables in the default dispatcher would likely cause issues that'd prevent proper operation of the program.
How is the machine overloaded? RAM because of too many in-flight tasks to run, or CPU because of computations happening on that machine's CPU, or blocking I/O like file reads/writes (which should run on
Dispatchers.IO
), or something else?
a
not blocking I/O — there’s no blocking IO for these services. CPU happens first — we can saturate the CPU with X number of requests. As the CPU is saturated, as you would expect, we see higher latency. We run in Kubernetes, so eventually the CPU starts to get heavily throttled. As the CPU starts to get throttled, the service starts really misbehaving, and requests start taking longer and longer. The service uses Y amount of heap per request, so if you end up in a place where there’s a ton of inflight requests that are never completing, then you end up, eventually, with heap exhaustion.
this isn’t terribly unique to our services of course — I think this pattern is very common with most types of services with workloads that are more CPU bound.
the absolute simplest approach, of course, is to just basically rate limit the number of requests coming into the service to some certain level
but this has obvious downsides
and in the case of the server that I have in mind — it’s a GraphQL server, and thus each request is considerably different from the next. One query might fetch 10 fields; another might fetch 1000 fields and do some business logic.
l
I understand why you want to limit the requests dynamically instead of setting a magic number.
a
yep
(just being explicit 😛 haha)
l
Or verbose? 😜
a
that too
🙂
l
😁
I think you need a wrapper for these requests, so it's agnostic to the number of suspension points. You can have it hold a pair of these timeouts if that depends on the work requested. And you'd have a cancellable watchdog coroutine that waits for the first timeout to pass. If the work finishes first, that coroutine is cancelled while suspended on
delay(…)
; otherwise, it signals that the work is slow, so other requests can be throttled, or whatever else is needed is done to avoid making the situation worse. After signaling this, the work is still allowed to run until the second timeout (mind the subtraction of the first timeout if needed). If the second delay completes without being cancelled by the work completing, it cancels the work (and possibly also sends a signal somewhere, if needed).
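A minimal sketch of that two-timeout wrapper using plain kotlinx.coroutines primitives (the `soft`/`hard` names and the `onSlow` hook are placeholders, not part of any existing API):

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration

// A watchdog coroutine waits for the soft timeout and signals slowness if the
// work is still running; the hard timeout cancels the work outright.
suspend fun <T> withSoftAndHardTimeout(
    soft: Duration,
    hard: Duration,
    onSlow: () -> Unit,          // e.g. bump a metric or flip an overload flag
    block: suspend () -> T
): T = coroutineScope {
    val watchdog = launch {
        delay(soft)
        onSlow() // the work exceeded its steady-state budget
    }
    try {
        withTimeout(hard) { block() } // hard limit: cancels the work and throws
    } finally {
        // If the work finished first, the watchdog is cancelled while suspended in delay().
        watchdog.cancel()
    }
}
```

Here both timeouts are measured from the start of the work, so no subtraction between them is needed.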
a
yah, this makes sense. I think one worry I have with this is that it’s possible a request could take a long time not because the service is overloaded, but because some downstream service has elevated latency
l
Here's an example if you were to launch these in a fire-and-forget fashion (except for knowing the work took longer than expected). You can make a suspending version that wraps only the parts involving CPU work and no external service calls. You could also wrap CPU operations differently from external service operations. And you could throw or return whatever you want in the coroutine racing for the expected execution time window + allowed extra time.
BTW,
raceOf
is from my open source library #splitties, you can find it documented here: https://github.com/LouisCAD/Splitties/tree/main/modules/coroutines#racing-coroutines
e
Please drop an issue at http://github.com/kotlin/kotlinx.coroutines/issues with your use case. Internally, the Default dispatcher implementation actually knows how long requests have been waiting in the queue (because it keeps a timestamp of each task for its own needs), so it could expose this information in some sort of API. In the near future we plan to work on the design of an API for “sliceable” dispatchers, and this new API might be a good entry point to also expose additional “dispatcher load” information to the outside world.
j
I'd recommend exposing some telemetry from your server and starting to look at the metrics, instead of making assumptions and drawing conclusions about where you think the bottlenecks are. It might be as simple as spinning up more servers. If you are not maxing out the CPU, there are usually two explanations: 1) you are blocking on something. I once profiled a server and found our DNS lookups were taking hundreds of ms and weren't cached. Solution: use a cloud-provided DNS. 2) you are saturating disk or network I/O somehow. Bottom line: profile and measure where it is spending time before you start tinkering with your architecture. Maybe do some load and stress testing, and autoscale when you hit certain connection thresholds.
k
I'd also ask whether you should be using k8s or some service mesh health monitoring, so traffic isn't sent to services that are slowing down. You can expose queue length or response time as a health metric. I'd echo what Jilles said as well.
a
@Jilles van Gurp @kenkyee thanks for your thoughts here. Rest assured that a) we’ve been doing a bunch of load and stress testing to figure out where the limits are and b) we do use Kubernetes and a service mesh with Envoy to be able to do some of this in the mesh layer. We’re moving to Istio at Airbnb over the course of the next year or so, and as we do that it unlocks a few more advanced envoy features that can help us with this problem and move it out of the application for sure.
@louiscad thanks for your thoughts! This approach inspired me to do more research on this topic, specifically around measuring latency and inflight requests, and I stumbled upon this Netflix blog post: https://medium.com/@NetflixTechBlog/performance-under-load-3e6fa9a60581, and this repo: https://github.com/Netflix/concurrency-limits. I implemented this in a service last night, and it’s pretty successful. At a high level, after thinking about it, I think I like this approach better than anything that ties deeper into the dispatcher. While I do think it would be good to expose an API from the dispatcher to be able to tell if it’s healthy (and perhaps use that as input to load shedding), I think leveraging the latency of a particular workload as a primary input to a resilience mechanism makes a ton of sense.
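For illustration, here is a toy version of the latency-feedback idea behind that concurrency-limits approach, written as a simple additive-increase/multiplicative-decrease limiter rather than the library's actual API (all names and thresholds are made up):

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.math.max
import kotlin.math.min

// The concurrency limit grows slowly while latency stays under target and
// backs off multiplicatively when it doesn't.
class AdaptiveConcurrencyLimiter(
    private val latencyTargetMillis: Long = 250,
    private val minLimit: Int = 4,
    private val maxLimit: Int = 512
) {
    @Volatile private var limit = 32
    private val inFlight = AtomicInteger(0)

    /** Returns false when the request should be shed immediately. */
    fun tryAcquire(): Boolean {
        while (true) {
            val current = inFlight.get()
            if (current >= limit) return false
            if (inFlight.compareAndSet(current, current + 1)) return true
        }
    }

    /** Call when a request finishes, passing its observed latency. */
    fun release(latencyMillis: Long) {
        inFlight.decrementAndGet()
        // Adjustment is approximate (not atomic); a real limiter is more careful.
        limit = if (latencyMillis > latencyTargetMillis) {
            max(minLimit, (limit * 0.9).toInt())   // back off under elevated latency
        } else {
            min(maxLimit, limit + 1)               // probe for more capacity
        }
    }
}
```

A request handler would call tryAcquire() before doing any work, return a 429/503 when it fails, and call release() with the measured latency in a finally block.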
👌 1
@elizarov i’d love to know more about this “sliceable” dispatchers concept! Is there anything written publicly about this yet?