nilTheDev
09/03/2021, 12:22 PMawait
extension function is wrapping the inherently blocking call inside it to make it work with coroutines. As far as my understanding goes the enqueue
method submit the call into some kind of thread pool that waits in the queue until executed.
Question 1: If I throw 100_000
requests asynchronously what would be the mechanisms that would prevent the program from being crashed? Would that only be the queue that the enqueue
method submit to? Or there are some other mechanisms that suspendCoroutine
provides?
Question 2: What exactly is the role that suspendCoroutine
plays here? Since the extension function Call<T>.await()
is a suspending function why can't it just use the continuation that this function would receive to write the callback? There must be some extra functionality that suspendCoroutine
provides other than access to the continuation. What exactly is that?Zach Klippenstein (he/him) [MOD]
09/03/2021, 4:11 PMnilTheDev
09/03/2021, 4:13 PMelizarov
09/04/2021, 10:41 AMelizarov
09/04/2021, 10:43 AMenqueue
API does under load is up to the authors of that API. I assume it just puts them into the queue (as the name suggests) and does not crash even if you do 100k requests.elizarov
09/04/2021, 10:44 AMsuspendCoroutine
is bridge between callback-based API and suspending API. That’s how you turn a suspending function call into a call-back based one.elizarov
09/04/2021, 10:45 AMnilTheDev
09/05/2021, 7:35 PMQ1
when I said "inherently blocking" what I meant is not that it would block the main thread of execution. But it would still be sitting in some background thread until the response arrives. As far as my understanding goes there would probably be no way to do a network call without blocking any threads unless we can pass a callback to the operating system itself. But still as there is a queue, in the retrofit API, presumably, 100k calls won't crash the program.
I have had a few more question related to coroutines. Since you are the one who is answering those I should probably ask those right now because who knows whether I would get another opportunity ever.
First, consider this code,
class MyNaiveFuture<T>(val block: () -> T) {
fun execute(callback: (T) -> Unit) {
// simply spawning a new thread instead of
// using a thread pool for the sake of simplicity
thread {
val value = block()
// invoking the callback that was
// passed to the function
callback(value)
}
}
}
// a cpu-intensive code that returns the future type
fun inEfficientPrimeCount(limit: Int): MyNaiveFuture<Int> {
val block = {
var primeCount = 0
for (i in 3..limit) {
var flag = true
for (j in 2..sqrt(i.toDouble()).toInt()) {
if (i % j == 0) {
flag = false
break
}
}
}
primeCount
}
return MyNaiveFuture(block)
}
To convert the above code into suspend functions we can do something like this,
suspend fun suspendablePrimeCount(limit: Int): Int = inEfficientPrimeCount(limit).await()
suspend fun <T> MyNaiveFuture<T>.await(): T = suspendCoroutine { cont ->
execute { result ->
cont.resume(result)
}
}
Once compiled the MyNaiveFuture<T>await()
function signature would be converted to something like this,
fun <T> MyNaiveFuture<T>.await(cont: Continuation<T>)
In my second question I was asking that why can't we just use this Continuation<T>
instead of using the one that suspendCoroutine
would provide us.
Something like that,
fun <T> MyNaiveFuture<T>.await(cont: Continuation<T>){
execute { result ->
cont.resume(result)
}
}
I think we have to use suspendCoroutine
because the Continuation<T>
in the MyNaiveFuture<T>.await()
function won't be available to us until the code is compiled. So, the compiler won't allow us to use Continuation<T>
directly in the function. And suspendCoroutine
does some trick under the hood to provide us with the Continuation
in the runtime.
With continuation to the previous snippets consider this code,
fun main() = runBlocking {
val deferred = async {
suspendablePrimeCount(10000000)
}
Thread.sleep(10000)
println("the result of the heavy computation ${deferred.await()}")
}
There would be four four functions call involved in the async
block before the heavy computation get fired in a different thread. suspendablePrimeCount()
-> MyNaiveFuture<T>.await()
-> suspendCoroutine()
-> MyNaiveFuture<T>.execute()
. Parameters are omitted for simplicity.
However, in the code, the async
block won't even get a chance to fire the process before the Thread.sleep()
finishes its blocking execution. Unless the chain hits the MyNaiveFuture<T>.execute()
no other thread will be spawned.
However, if we write the code slightly differently the async
block would be able to fire the process before the main thread hits the Thread.sleep()
call.nilTheDev
09/05/2021, 7:35 PMfun main() = runBlocking {
val deferred = async {
suspendablePrimeCount(10000000)
}
yield() // this will allow the async block to fire the process
Thread.sleep(10000)
println("the result of the heavy computation ${deferred.await()}")
}
In this way, the heavy computation will be fired on a different thread before the the blocking sleep (Thread.sleep()
) starts. So, the process will be run concurrently on a different thread while the main thread is blocked by Thread.sleep()
.
That brings me to the next question,
Question 3
: There must be some mechanism that oversees the execution of the main thread, Some static place where the code in launch
or async
block gets hooked. And whenever the main thread gets free, mostly because some other execution has been suspended or it's idle, the overseer fires the execution of launch
or async
. Can you shed some more light on whether this type of overseer truly exist or there are some other mechanism(s)?
The last question would be on the delay
function. Its internal workings has always been a mystery to me. Consider this code,
fun main() = runBlocking {
println("delay started")
delay(5000)
println("delay ended")
}
Question 4:
How does the delay
function actually work? Neither it would block the thread nor would it spawn a new thread. It would just suspend the execution in a non-blocking way and resume after the delay is completed. How it manages to do so? A sneak peak on the source tells us that it uses suspendCancellableCoroutine
to start a new coroutine in which it calls the scheduleResumeAfterDelay
function. How it schedules stuffs on the main thread? Who is actually responsible to notify that delay has been completed? Is it the JVM
? Or some overseer that might exist? Does Kotlin turn the main thread into a queue with a scheduler? Or it has always been a queue?
I didn't have any prior experience with asynchronous programming. Kotlin coroutines is the implementation through which I am being introduced with it. That has probably made the learning curve much steeper. Picking up coroutines would probably be easy for those who have gone through the agony of writing callbacks for years. But for beginners it is still somewhat difficult.
Threads in Java may not be a robust solution. But understanding Java threads is simple because it maps to a OS
level thread. Kotlin coroutine has some complex abstraction on the user level. That's why understanding its mapping with the OS
level thread or the thread pool pattern is challenging.gildor
09/06/2021, 8:33 AMHow does theIt depends on dispatcher implementation. Usually you have some mechanism to start some action using with delay, for example on Android Main dispatcher it Handler delay feature, on JVM it has own event loop in default dispatcherfunction actually work?delay