https://kotlinlang.org logo
#coroutines
Title
# coroutines
n

nilTheDev

09/03/2021, 12:22 PM
This is a talk Roman Elizarov gave back in 2017 that explains coroutine's internals. In this particular snippet, apparently, that
await
extension function is wrapping the inherently blocking call inside it to make it work with coroutines. As far as my understanding goes the
enqueue
method submit the call into some kind of thread pool that waits in the queue until executed. Question 1: If I throw
100_000
requests asynchronously what would be the mechanisms that would prevent the program from being crashed? Would that only be the queue that the
enqueue
method submit to? Or there are some other mechanisms that
suspendCoroutine
provides? Question 2: What exactly is the role that
suspendCoroutine
plays here? Since the extension function
Call<T>.await()
is a suspending function why can't it just use the continuation that this function would receive to write the callback? There must be some extra functionality that
suspendCoroutine
provides other than access to the continuation. What exactly is that?
z

Zach Klippenstein (he/him) [MOD]

09/03/2021, 4:11 PM
The best way to answer that is probably for you to just look at the source and see what it's actually doing.
n

nilTheDev

09/03/2021, 4:13 PM
@Zach Klippenstein (he/him) [MOD] Tried doing that but it is super challenging to understand what's going on. So trying to find out if somebody has already gone through the pain to decipher the source. But no luck.
e

elizarov

09/04/2021, 10:41 AM
“is wrapping the inherently blocking call”. No. That’s is not what’s going going on here. There is a non-blocking callback-based API and this code is converting this non-blocking callback-based API into a suspending function.
Q1. is not related to coroutine. What exactly
enqueue
API does under load is up to the authors of that API. I assume it just puts them into the queue (as the name suggests) and does not crash even if you do 100k requests.
Q2.
suspendCoroutine
is bridge between callback-based API and suspending API. That’s how you turn a suspending function call into a call-back based one.
I actually do recommend watching a different talk of mine that goes over different ways of doing asynchronous (non-blocking) programming and how they relate to each other. It will help you if you had some prior experience with any kind of non-blocking programming before.

https://www.youtube.com/watch?v=hb0hfHVWCS0

💙 4
n

nilTheDev

09/05/2021, 7:35 PM
@elizarov Watched the talk that you recommended. As I have already watched two of your talks namely "Introduction to coroutines" and "Deep dive into coroutines", this talk(Fresh Async...) didn't introduced a lot of new concepts that has not already been covered in those previous talks. In
Q1
when I said "inherently blocking" what I meant is not that it would block the main thread of execution. But it would still be sitting in some background thread until the response arrives. As far as my understanding goes there would probably be no way to do a network call without blocking any threads unless we can pass a callback to the operating system itself. But still as there is a queue, in the retrofit API, presumably, 100k calls won't crash the program. I have had a few more question related to coroutines. Since you are the one who is answering those I should probably ask those right now because who knows whether I would get another opportunity ever. First, consider this code,
Copy code
class MyNaiveFuture<T>(val block: () -> T) {
    fun execute(callback: (T) -> Unit) {
        
        // simply spawning a new thread instead of 
        // using a thread pool for the sake of simplicity
        
        thread {
            val value = block()
        
            // invoking the callback that was
            // passed to the function
            callback(value)
        }
    }
}


// a cpu-intensive code that returns the future type
fun inEfficientPrimeCount(limit: Int): MyNaiveFuture<Int> {
    val block = {
        var primeCount = 0

        for (i in 3..limit) {
            var flag = true
            for (j in 2..sqrt(i.toDouble()).toInt()) {
                if (i % j == 0) {
                    flag = false
                    break
                }
            }
        }
        primeCount
    }
    return MyNaiveFuture(block)
}
To convert the above code into suspend functions we can do something like this,
Copy code
suspend fun suspendablePrimeCount(limit: Int): Int = inEfficientPrimeCount(limit).await()

suspend fun <T> MyNaiveFuture<T>.await(): T = suspendCoroutine { cont ->
    execute { result ->
        cont.resume(result)
    }
}
Once compiled the
MyNaiveFuture<T>await()
function signature would be converted to something like this,
Copy code
fun <T> MyNaiveFuture<T>.await(cont: Continuation<T>)
In my second question I was asking that why can't we just use this
Continuation<T>
instead of using the one that
suspendCoroutine
would provide us. Something like that,
Copy code
fun <T> MyNaiveFuture<T>.await(cont: Continuation<T>){
    execute { result ->
        cont.resume(result)
    }
}
I think we have to use
suspendCoroutine
because the
Continuation<T>
in the
MyNaiveFuture<T>.await()
function won't be available to us until the code is compiled. So, the compiler won't allow us to use
Continuation<T>
directly in the function. And
suspendCoroutine
does some trick under the hood to provide us with the
Continuation
in the runtime. With continuation to the previous snippets consider this code,
Copy code
fun main() = runBlocking {

    val deferred = async {
        suspendablePrimeCount(10000000)
    }
    
    Thread.sleep(10000)

    println("the result of the heavy computation ${deferred.await()}")
    
}
There would be four four functions call involved in the
async
block before the heavy computation get fired in a different thread.
suspendablePrimeCount()
->
MyNaiveFuture<T>.await()
->
suspendCoroutine()
->
MyNaiveFuture<T>.execute()
. Parameters are omitted for simplicity. However, in the code, the
async
block won't even get a chance to fire the process before the
Thread.sleep()
finishes its blocking execution. Unless the chain hits the
MyNaiveFuture<T>.execute()
no other thread will be spawned. However, if we write the code slightly differently the
async
block would be able to fire the process before the main thread hits the
Thread.sleep()
call.
Copy code
fun main() = runBlocking {

    val deferred = async {
        suspendablePrimeCount(10000000)
    }

    yield() // this will allow the async block to fire the process 
    
    Thread.sleep(10000)

    println("the result of the heavy computation ${deferred.await()}")
    
}
In this way, the heavy computation will be fired on a different thread before the the blocking sleep (
Thread.sleep()
) starts. So, the process will be run concurrently on a different thread while the main thread is blocked by
Thread.sleep()
. That brings me to the next question,
Question 3
: There must be some mechanism that oversees the execution of the main thread, Some static place where the code in
launch
or
async
block gets hooked. And whenever the main thread gets free, mostly because some other execution has been suspended or it's idle, the overseer fires the execution of
launch
or
async
. Can you shed some more light on whether this type of overseer truly exist or there are some other mechanism(s)? The last question would be on the
delay
function. Its internal workings has always been a mystery to me. Consider this code,
Copy code
fun main() = runBlocking {
    println("delay started")
    delay(5000)
    println("delay ended")
}
Question 4:
How does the
delay
function actually work? Neither it would block the thread nor would it spawn a new thread. It would just suspend the execution in a non-blocking way and resume after the delay is completed. How it manages to do so? A sneak peak on the source tells us that it uses
suspendCancellableCoroutine
to start a new coroutine in which it calls the
scheduleResumeAfterDelay
function. How it schedules stuffs on the main thread? Who is actually responsible to notify that delay has been completed? Is it the
JVM
? Or some overseer that might exist? Does Kotlin turn the main thread into a queue with a scheduler? Or it has always been a queue? I didn't have any prior experience with asynchronous programming. Kotlin coroutines is the implementation through which I am being introduced with it. That has probably made the learning curve much steeper. Picking up coroutines would probably be easy for those who have gone through the agony of writing callbacks for years. But for beginners it is still somewhat difficult. Threads in Java may not be a robust solution. But understanding Java threads is simple because it maps to a
OS
level thread. Kotlin coroutine has some complex abstraction on the user level. That's why understanding its mapping with the
OS
level thread or the thread pool pattern is challenging.
g

gildor

09/06/2021, 8:33 AM
How does the 
delay
 function actually work?
It depends on dispatcher implementation. Usually you have some mechanism to start some action using with delay, for example on Android Main dispatcher it Handler delay feature, on JVM it has own event loop in default dispatcher
8 Views