# coroutines
e
Can anyone recommend any resources that cover the internal implementation of coroutines? I've been using suspendCoroutineUninterceptedOrReturn in my networking library. When I wrote it I looked into what suspending functions compile down to, so I know that, at least at some point in the past, a suspend function was basically a function that takes a continuation and has a big switch statement over continuation.label, which it uses to determine where to resume from each time it gets called. Even if this is still true, I don't fully understand how the continuation itself is created, how confined vs unconfined works, etc. The actual issue I'm having is understanding where exceptions end up going, since it seems to depend on which scope is being used or something along those lines.
e
1. a suspend function gets compiled into a state machine: so yes, a switch statement plus additional fields for local state, except in cases where that can be optimized out, such as when there are no suspend points or they are only in tail position. see https://github.com/JetBrains/kotlin/blob/master/compiler/backend/src/org/jetbrains/kotlin/codegen/coroutines/coroutines-codegen.md for details
2. a coroutine can be dispatched (run by the dispatcher) or undispatched (executed in place, at least until the first suspension point). unconfined is just a special dispatcher that doesn't actually dispatch, and runs in place.
3. if you're talking about kotlinx.coroutines (structured concurrency) specifically and not just kotlin.coroutines (low-level infrastructure for any kind of coroutine), then exceptions always bubble up to the root coroutine, which is created by launch(), async(), supervisorScope()
4. don't use suspendCoroutineUninterceptedOrReturn or other intrinsics unless you're trying to build your own coroutine system
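As a rough illustration of point 1, here is a hand-written approximation of the state machine for a hypothetical `suspend fun sumTwoReads(): Int { val a = read(); val b = read(); return a + b }`. The class name, the `resume` method, and the use of `null` to mean "suspended" are assumptions made for this sketch. The code the compiler actually generates works with `Continuation` objects and `COROUTINE_SUSPENDED` and differs in detail.

```kotlin
// Hand-written approximation of a compiled suspend function (not real compiler output).
class SumTwoReadsMachine {
    private var label = 0 // which suspension point to resume from
    private var a = 0     // local `a`, spilled to a field so it survives suspension

    // Returns null while "suspended", or the final result once the function is done.
    fun resume(value: Int): Int? = when (label) {
        0 -> { label = 1; null }            // start: request the first read, then suspend
        1 -> { a = value; label = 2; null } // first read arrived: store it, request the second
        2 -> { label = 3; a + value }       // second read arrived: compute the result
        else -> error("already completed")
    }
}

fun main() {
    val machine = SumTwoReadsMachine()
    println(machine.resume(0))  // initial call, the argument is ignored: prints null
    println(machine.resume(20)) // resumed with the first read: prints null
    println(machine.resume(22)) // resumed with the second read: prints 42
}
```

Each call to `resume` re-enters the same function body and jumps to the branch selected by `label`, which is the "big switch statement" described in the question.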
e
So when it's dispatched, both the initial call and the resumes are dispatched? When it's undispatched, only the resumes are dispatched? And with Unconfined there is no dispatching at all?
"then exceptions always bubble up to the root coroutine" meaning any child coroutines that use the parent's scope? E.g. in GlobalScope.launch { GlobalScope.launch { } } both would be roots, but in GlobalScope.launch { launch {} } only the outer would be a root?
Why not use suspendCoroutineUninterceptedOrReturn? I know the code that will be resuming it won't resume twice, or return a value and then resume, etc. I would also like to stay as close to a constant memory profile as possible.
e
resumes are always* dispatched to avoid blowing up the stack. no, the inner launch is also a root.
`suspendCoroutine` is type-safe, unlike `suspendCoroutineUninterceptedOrReturn`.
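To make the type-safety point concrete, here is a minimal sketch that wraps a callback API with `suspendCoroutine`, using only the standard library. `readIntAsync` and `readInt` are hypothetical names standing in for a socket read. The block passed to `suspendCoroutine` returns `Unit` and the value is delivered through a typed `Continuation<Int>`, whereas the block of `suspendCoroutineUninterceptedOrReturn` returns `Any?` (either the value or `COROUTINE_SUSPENDED`), which the compiler cannot check.

```kotlin
import kotlin.coroutines.*

// Hypothetical callback-style API; it invokes the callback synchronously.
fun readIntAsync(callback: (Int) -> Unit) = callback(42)

// The compiler checks that only an Int can be passed to cont.resume.
suspend fun readInt(): Int = suspendCoroutine { cont ->
    readIntAsync { value -> cont.resume(value) }
}

fun main() {
    var outcome: Result<Int>? = null
    // Start the coroutine with an empty context, so no dispatcher is involved.
    suspend { readInt() }.startCoroutine(
        Continuation<Int>(EmptyCoroutineContext) { outcome = it }
    )
    println(outcome?.getOrNull()) // prints 42
}
```

Note that the callback here fires before the `suspendCoroutine` block returns. `suspendCoroutine` handles that case itself, while with the intrinsic the implementer has to decide by hand whether to return the value or `COROUTINE_SUSPENDED`.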
e
Hmm, why would the stack blow up unless you would otherwise have a StackOverflow? Can't you do something like:
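import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlinx.coroutines.runBlocking

suspend fun switchThreads() = suspendCoroutineUninterceptedOrReturn<Unit> {
    // Resume from a fresh thread, bypassing the caller's interceptor.
    Thread({
        it.resumeWith(Result.success(Unit))
    }, "Example Thread").start()
    COROUTINE_SUSPENDED
}

fun main() = runBlocking {
    println(Thread.currentThread())
    switchThreads()
    println(Thread.currentThread())
}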
To print:
Thread[main,...]
Thread[Example Thread,...]
e
suspend functions do not StackOverflow, even in situations where non-suspending functions would StackOverflow
to give a quick example,
val fib = DeepRecursiveFunction<Int, Int> { if (it < 2) it else callRecursive(it - 1) + callRecursive(it - 2) }
uses kotlin.coroutines (not concurrency nor kotlinx.coroutines) and does not StackOverflow
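A small runnable sketch of the same idea, with a recursion depth that an ordinary recursive function would be likely to overflow on at default JVM stack sizes. `sumTo` is a hypothetical name, and this assumes a Kotlin version where `DeepRecursiveFunction` is stable (1.7 or later); older versions need an opt-in.

```kotlin
// Sums 1..n recursively; the recursion depth equals n.
val sumTo = DeepRecursiveFunction<Int, Long> { n ->
    if (n == 0) 0L else n + callRecursive(n - 1)
}

fun main() {
    // 100_000 nested calls, kept on the heap instead of the thread's call stack.
    println(sumTo(100_000)) // prints 5000050000
}
```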
e
But would that functionality not work when dealing with the Unconfined dispatcher and/or bypassing intercepted resumes?
e
there's no use of any dispatchers in that example
e
"resumes are always* dispatched to avoid blowing up the stack." Then I don't really understand this. Ultimately, what I want is to take an ExecutorService and pass it to an async thread group; I will then resume a continuation from a CompletionHandler (which will be invoked on an executor service thread). Then when I write something like this:
val provider = SocketProvider(executor)
val dispatcher = executor.asCoroutineDispatcher()
GlobalScope.launch(dispatcher) {
  while (provider.isOpen) {
    val client = provider.accept() //suspends
    launch(dispatcher) {
      while (client.isOpen) {
        val length = client.read.int() //suspends
        val bytes = client.read.bytes(length)
        println(bytes.toString(UTF_8))
      }
    }
  }
}
I want it to for example resume from int() on some executor thread... and call bytes on the same thread... rather than scheduling a new task when I resume from int() b/c I'm already on a thread pool thread. And I want to minimize overhead to the greatest possible degree since ofc the number of calls to something like int or bytes might be incredibly high. I've gone to great lengths to make sure that it has an almost constant memory overhead already.
n
"resumes are always* dispatched to avoid blowing up the stack"
`DeepRecursiveFunction` avoids stack overflows because its implementation trampolines. There's nothing inherent to continuations that avoids stack overflow.
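For readers unfamiliar with the term, here is a minimal single-threaded sketch of trampolining. `Trampoline`, `dispatch`, and `countDown` are hypothetical names, and this is not how `DeepRecursiveFunction` or any dispatcher is actually implemented. It only shows the general idea: a nested request to run more work is queued and picked up by the loop already running further up the stack, so the stack depth stays constant.

```kotlin
// Minimal trampoline: not thread-safe, intended for a single thread only.
class Trampoline {
    private val queue = ArrayDeque<() -> Unit>()
    private var running = false

    fun dispatch(task: () -> Unit) {
        queue.addLast(task)
        if (running) return // nested call: leave the task for the outer loop
        running = true
        try {
            while (true) {
                val next = queue.removeFirstOrNull() ?: break
                next()
            }
        } finally {
            running = false
        }
    }
}

// Each step schedules the next one through the trampoline instead of calling it directly.
fun countDown(from: Int): Int {
    val trampoline = Trampoline()
    var steps = 0
    fun step(n: Int) {
        steps++
        if (n > 0) trampoline.dispatch { step(n - 1) }
    }
    trampoline.dispatch { step(from) }
    return steps
}

fun main() {
    println(countDown(1_000_000)) // prints 1000001, without deep recursion
}
```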
"I want it to for example resume from int() on some executor thread... and call bytes on the same thread... rather than scheduling a new task when I resume from int() b/c I'm already on a thread pool thread."
All the built-in dispatchers either schedule the task or trampoline (even Unconfined may not resume directly). If you want to guarantee you don't schedule, then `Executor { it.run() }.asCoroutineDispatcher()` is an option, but it's a very dangerous option. Personally, I'd prefer it over `suspendCoroutineUninterceptedOrReturn` since it's the caller aiming at their own foot rather than the implementation ignoring the caller's interceptor.
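A small sketch of what a direct executor such as `Executor { it.run() }.asCoroutineDispatcher()` does in practice, assuming kotlinx.coroutines is on the classpath. `direct` and `launchOrder` are hypothetical names. Because `dispatch` runs the task in place, the body of `launch` executes on the calling thread before `launch` even returns.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executor

// "Dispatches" by running the task immediately on whichever thread asks.
val direct = Executor { it.run() }.asCoroutineDispatcher()

fun launchOrder(): List<String> = runBlocking {
    val order = mutableListOf<String>()
    val job = launch(direct) { order += "child body" }
    order += "after launch returned"
    job.join()
    order
}

fun main() {
    println(launchOrder()) // prints [child body, after launch returned]
}
```

With a regular dispatcher the child would be queued and `launch` would return first, so this ordering is one way to see that nothing is being scheduled.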
Kotlin's Coroutines have an implied contract that your code is always running where you told it to run.
I would expect this to work well enough:
val provider = SocketProvider(executor)
val dispatcher = executor.asCoroutineDispatcher()
GlobalScope.launch(dispatcher) {
  while (provider.isOpen) {
    val client = provider.accept() //suspends
    withContext(Dispatchers.Unconfined) {
      while (client.isOpen) {
        //Assuming int always resumes from executor
        val length = client.read.int() //suspends
        val bytes = client.read.bytes(length)
        println(bytes.toString(UTF_8))
      }
    }
  }
}
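To check the assumption in that snippet, here is a small sketch comparing where code continues after a resume under an executor-backed dispatcher versus `Dispatchers.Unconfined`. `awaitCallback`, `threadAfterResume`, `namedPool`, and the thread names are all hypothetical; `awaitCallback` stands in for a CompletionHandler firing on some I/O thread.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.concurrent.thread
import kotlin.coroutines.resume

// Stand-in for a CompletionHandler: resumes the coroutine from a thread named "io-callback".
suspend fun awaitCallback(): Unit = suspendCancellableCoroutine { cont ->
    thread(name = "io-callback") { cont.resume(Unit) }
}

// Reports which thread the coroutine is running on after it has been resumed.
suspend fun threadAfterResume(): String {
    awaitCallback()
    return Thread.currentThread().name
}

// Single-threaded executor whose only thread is named "pool-thread".
fun namedPool(): ExecutorCoroutineDispatcher =
    Executors.newSingleThreadExecutor { r ->
        Thread(r, "pool-thread").apply { isDaemon = true }
    }.asCoroutineDispatcher()

fun main(): Unit = runBlocking {
    val pool = namedPool()
    try {
        // Executor-backed dispatcher: the resume is rescheduled onto the pool.
        println(withContext(pool) { threadAfterResume() })
        // Unconfined: the coroutine continues on the thread that resumed it.
        println(withContext(Dispatchers.Unconfined) { threadAfterResume() })
    } finally {
        pool.close()
    }
}
```

In this setup the first line should report `pool-thread` and the second `io-callback`, which is the behavior the `withContext(Dispatchers.Unconfined)` suggestion relies on.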
e
The problem with this: Executor { it.run() }.asCoroutineDispatcher() is that it wouldn't be parallel anymore. Let me see if withContext(Unconfined) works well. I'll also try benching it vs unintercepted.