# ktor
d
Also, if doing blocking processing in an `ApplicationCall` context, what's the better practice, to use suspending functions, `withContext(CommonPool)`, or `async { }` (since some processing can be done in parallel..)?
c
the first one is better if you have a single blocking task that you need to wait for
with `async` you can launch multiple tasks
also, `async` can be called from a non-suspend function
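As a rough illustration of the difference, a minimal sketch using the pre-structured-concurrency `kotlinx.coroutines.experimental` API this thread assumes; `loadFromDatabase` and `callLegacyService` are made-up blocking calls:
```kotlin
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.withContext

// Hypothetical blocking calls standing in for real work.
fun loadFromDatabase(): String = TODO("blocking JDBC call")
fun callLegacyService(): String = TODO("blocking HTTP call")

// One blocking task: move it off the request handler and wait for it.
suspend fun singleBlockingTask(): String =
    withContext(CommonPool) { loadFromDatabase() }

// Several independent blocking tasks: launch them concurrently with async, then await both.
suspend fun parallelBlockingTasks(): Pair<String, String> {
    val a = async(CommonPool) { loadFromDatabase() }
    val b = async(CommonPool) { callLegacyService() }
    return a.await() to b.await()
}
```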
d
Is it safe to use `CommonPool` for this? Also, does each request have its own coroutine running in a thread pool, so that if one request is held up by a blocking process, the other ones would still be handled?
c
Yes, no coroutines are reused, so using `CommonPool` for blocking tasks is safe: other non-blocking requests will be handled properly
d
If I just use a `suspend fun` doing a single blocking process in a request, the other requests won't be blocked (or are they all running on the same thread... and I will need to do `withContext(CommonPool)` for it)?
c
`withContext` launches a new child coroutine on a specified context (separate thread pool), while the original one is suspended until the child completes or crashes
so child coroutines will contend with each other, but not with request handler coroutines
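To make that concrete, a minimal sketch (Ktor 0.9.x-era APIs assumed; the sleep just simulates a blocking call): while `/slow` is suspended waiting for its `withContext(CommonPool)` child, `/fast` keeps being served.
```kotlin
import io.ktor.application.call
import io.ktor.response.respondText
import io.ktor.routing.get
import io.ktor.routing.routing
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.withContext

fun main(args: Array<String>) {
    embeddedServer(Netty, port = 8080) {
        routing {
            get("/slow") {
                // The blocking sleep occupies a CommonPool thread, not a request handler thread.
                val result = withContext(CommonPool) { Thread.sleep(5_000); "done" }
                call.respondText(result)
            }
            get("/fast") {
                call.respondText("ok") // still answers while /slow is in flight
            }
        }
    }.start(wait = true)
}
```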
d
Right, but if I just use a `suspend fun` without `withContext`, then no other requests are handled meanwhile? I haven't found anything in the docs about these points (in Vert.x they write DON'T BLOCK THE EVENT LOOP a few times... same in Ktor?)
c
Yes, but there are plans to fix it to allow users to block in handlers
that's why we've left it unspecified in the docs
d
Meanwhile, it's very important to document...! Since people can make such mistakes (I almost did 🤕)
c
^^ @Deactivated User
d
A couple of questions: Is this blocking? https://github.com/ktorio/ktor-samples/blob/e9bd44f53dab4b0a45bf7b538d32af022325c2f9/app/youkube/src/Upload.kt#L58 Why is PartData.FileItem using InputStream instead of an Asynchronous channel? Other than that, going to update the documentation to reflect this. Thanks!
d
Ooops! I'm also doing that! Thanks for pointing it out @Deactivated User... It's also like that in the upload docs https://ktor.io/servers/uploads.html
How should I do it then?
c
Because for now it is not critical, as the input stream provided by `FileItem` is always a `ByteArrayInputStream` or a `FileInputStream`, and never comes from the socket
d
But a very large file upload won't stop other requests from being processed?
c
yes, it could, but there is no way to make reading a file asynchronous on the JVM
the only thing we can do is hide it, as is done in `AsynchronousFileChannel`
d
Maybe internally it uses another thread, I don't know, but there is a signature using a completion handler for reading: https://docs.oracle.com/javase/7/docs/api/java/nio/channels/AsynchronousFileChannel.html#read(java.nio.ByteBuffer,%20long,%20A,%20java.nio.channels.CompletionHandler) And in case it doesn't work, is it possible to hide the usage of the IO pool from the final user by wrapping it in an asynchronous stream that uses the IO pool under the hood? That way people won't have to worry about blocking when handling uploads
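For reference, a completion-handler read like the one linked above can be wrapped into a suspending call roughly like this (a sketch only; package names assume the pre-1.3 `kotlin.coroutines.experimental` API of that era, and whether the JDK truly offloads the work is exactly the open question here):
```kotlin
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.suspendCoroutine

// Suspends the caller until the read completes instead of blocking a thread in the handler.
suspend fun AsynchronousFileChannel.readSuspend(buffer: ByteBuffer, position: Long): Int =
    suspendCoroutine { cont ->
        read(buffer, position, Unit, object : CompletionHandler<Int, Unit> {
            override fun completed(result: Int, attachment: Unit) = cont.resume(result)
            override fun failed(exc: Throwable, attachment: Unit) = cont.resumeWithException(exc)
        })
    }
```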
d
What's the simplest way I could do it now in the meantime @Deactivated User?
c
@Deactivated User yes, JDK's implementation does IO on a separate thread pool that is quite slow
👍 1
d
I'm not completely sure if this would work as expected @cy:
```kotlin
async(ioCoroutineDispatcher) {
    part.streamProvider().use { its -> file.outputStream().buffered().use { its.copyTo(it) } }
}.await()
```
Maybe it can be reworked to multiplex reading instead of reading the whole file in that thread at once.
Maybe increasing the read chunk size or doing buffering would improve performance and reduce the thread pool overhead? I was hoping that the JVM would call the OS's asynchronous APIs to do that, especially on SSDs or systems with several HDDs, which could benefit from it.
Untested and unoptimized, with less throughput, but it doesn't block dispatchers with big files, only temporarily with small chunk sizes:
```kotlin
//part.streamProvider().use { its -> file.outputStream().buffered().use { its.copyToSuspend(it) } }

suspend fun InputStream.copyToSuspend(out: OutputStream, bufferSize: Int = DEFAULT_BUFFER_SIZE, dispatcher: CoroutineDispatcher = ioCoroutineDispatcher): Long {
    var bytesCopied: Long = 0
    val buffer = ByteArray(bufferSize)
    while (true) {
        val bytes = withContext(dispatcher) { read(buffer) }
        if (bytes < 0) break
        withContext(dispatcher) { out.write(buffer, 0, bytes) }
        bytesCopied += bytes
    }
    return bytesCopied
}
```
d
What about using `withContext() {}` instead of `async { }.await()`, since you're in a `suspend fun`? Since you await on each loop iteration anyway...
c
launching a new coroutine for every block would be very slow
d
Or a `produce { }` might be better?
c
well, you can try to launch a reading loop on a separate pool and use a channel or a byte channel to transfer bytes to the main handler coroutine
```kotlin
val channel = writer(CommonPool) {
    val buffer = ByteArray(4096)
    file.inputStream().use { stream ->
        while (true) {
            val rc = stream.read(buffer) // blocking read, but it runs on CommonPool
            if (rc == -1) break
            channel.writeFully(buffer, 0, rc)
        }
    }
}.channel

// here we have a channel that is asynchronous
```
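A sketch of how the consuming side might look (`readAvailable` is from the coroutines IO channel API used here; the exact package has moved between versions, so treat the import as an assumption):
```kotlin
import kotlinx.coroutines.experimental.io.ByteReadChannel

// Drains the byte channel without ever blocking the caller's thread:
// readAvailable suspends until data arrives and returns -1 once the channel is closed.
suspend fun consume(channel: ByteReadChannel) {
    val buffer = ByteArray(4096)
    while (true) {
        val rc = channel.readAvailable(buffer, 0, buffer.size)
        if (rc == -1) break
        // process buffer[0 until rc] here
    }
}
```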
d
Btw, I think `withContext` doesn't start a new coroutine, it just switches the coroutine context... so it could be used to surround the whole function, if you don't need to switch back and forth...
c
`writer` is similar to `produce`, but for a byte channel
d
I'm not sure what the best solution is here. If it was asynchronous without thread pools in the first place, the overhead would probably be smaller. What I tried to do (though maybe I did it wrong) is this: say you have an IO thread pool of 4 threads, and you have 8 uploads of 4 terabytes (which would take some time). Instead of blocking the thread pool with 4 of those tasks, I tried to process them all in parts. Maybe to reduce the switching overhead, I could process several parts and reuse the reading/writing coroutine. @cy In your snippet, the stream reading is still synchronous, right?
c
Yes, but it is running on a separate thread pool, so the request handler pool is not affected
👍 1
and consuming bytes from a byte channel is safe
d
So @cy, how would writing the uploaded file look using this (the `copyToSuspend` part)?
I need to be able to use something that doesn't block in my current project in the meantime.. I won't get to terabytes, but this is a microservice that MUST keep responding to other requests while uploading... maybe I should just put the original code in a `withContext(CommonPool)` until this is fixed?
Or maybe in `ioCoroutineDispatcher`?
Also, in the meantime, I think the docs also need to have some kind of temporary solution for others not to have surprises...
d
I’m interested in it too. I will update youkube sample (and uploads.html) too with the recommended way for doing this
👍🏼 1
d
@Deactivated User Also: https://ktor.io/servers/uploads.html, that's where I looked... I wouldn't have gone to Youkube unless there was nothing there...
👌 1
Please let me know what you did, I need to release this microservice soon... Thanks!
c
One should never block on `ioCoroutineDispatcher`! Unlike blocking in a request handler coroutine, blocking on `ioCoroutineDispatcher` could cause an infinite deadlock
d
Thanks for the warning @cy! If there are dispatchers that may be used by Ktor end-users, it might be nice to have some docs on them too... instead of creating pools when some might already have been made for the purpose, or mistakenly using pools not intended for that purpose..
c
other functions you can use to create your pool: `newFixedThreadPoolContext` and `ExecutorService.asCoroutineDispatcher()`
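For example, a dedicated pool for blocking work could be set up like this (a sketch with made-up sizes and names, using the `kotlinx.coroutines.experimental` builders mentioned above):
```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.newFixedThreadPoolContext
import kotlinx.coroutines.experimental.withContext

// A pool sized independently of the request handler pool, reserved for blocking IO.
val blockingIoDispatcher = newFixedThreadPoolContext(4, "blocking-io")

// Equivalent approach if you already manage an ExecutorService yourself.
val executorBackedDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()

// Helper so handlers can run blockingIo { file.readBytes() } without touching CommonPool.
suspend fun <T> blockingIo(block: () -> T): T = withContext(blockingIoDispatcher) { block() }
```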
d
Ok, so there aren't any interesting dispatchers to reuse.. I'll do that, I suppose that for IO it's probably better than just using `CommonPool`...
d
For the copyTo part:
```kotlin
//part.streamProvider().use { its -> file.outputStream().buffered().use { its.copyToSuspend(it) } }

suspend fun InputStream.copyToSuspend(
    out: OutputStream,
    bufferSize: Int = DEFAULT_BUFFER_SIZE,
    yieldSize: Int = 4 * 1024 * 1024,
    dispatcher: CoroutineDispatcher = ioCoroutineDispatcher
): Long {
    return withContext(dispatcher) {
        val buffer = ByteArray(bufferSize)
        var bytesCopied = 0L
        var bytesAfterYield = 0L
        while (true) {
            val bytes = read(buffer).takeIf { it >= 0 } ?: break
            out.write(buffer, 0, bytes)
            if (bytesAfterYield >= yieldSize) {
                yield()
                bytesAfterYield %= yieldSize
            }
            bytesCopied += bytes
            bytesAfterYield += bytes
        }
        return@withContext bytesCopied
    }
}
```
👌🏼 1
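For completeness, here is roughly how that `copyToSuspend` could be wired into an upload route (a sketch only; the route path, `uploadDir`, the file naming, and the Ktor 1.x-style package layout are illustrative, not taken from this thread):
```kotlin
import io.ktor.application.call
import io.ktor.http.content.PartData
import io.ktor.http.content.forEachPart
import io.ktor.http.content.streamProvider
import io.ktor.request.receiveMultipart
import io.ktor.response.respondText
import io.ktor.routing.Route
import io.ktor.routing.post
import java.io.File

fun Route.uploadRoute(uploadDir: File) {
    post("/upload") {
        val multipart = call.receiveMultipart()
        multipart.forEachPart { part ->
            if (part is PartData.FileItem) {
                val target = File(uploadDir, part.originalFileName ?: "upload.bin")
                part.streamProvider().use { input ->
                    target.outputStream().buffered().use { output ->
                        input.copyToSuspend(output) // suspends; the copy runs on the IO dispatcher
                    }
                }
            }
            part.dispose()
        }
        call.respondText("Upload complete")
    }
}
```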