Hello! I have a Ktor application that will expose...
# getting-started
a
Hello! I have a Ktor application that will expose an endpoint to fetch assets (images). When a client requests an image (by ID) ktor makes a http request to a downstream service (behind VPC) to fetch the asset, and then stream the binary result back to the client. I think my implementation is not working properly. If I request the asset in Google Chrome directly from the downstream service, it takes ~300ms. If I make the same request via my ktor application it takes ~1sec. This makes me think it’s not streaming the result, but rather downloading the entire asset in ktor before streaming it to the client. The added latency is too much. This is my code: The Ktor endpoint:
Copy code
private fun NormalOpenAPIRoute.getAssetById(articleResource: ArticleResource) {
    route("/content/v1/assets/{id}") {
        get<GetAssetByIdParams, OutputStream>(info("Get an Asset (Media) by its id"))
        {
            val src = articleResource.getAssetById(it.id)

            this.pipeline.call.respondOutputStream(
                contentType = src.contentType,
                status = HttpStatusCode.OK,
                contentLength = src.contentLength
            ) {
                src.content.copyTo(this)
            }
        }
    }
}
The Http client that invokes the downstream service:
Copy code
class ArticleClient(
    private val httpClientSupplier: HttpClientSupplier,
) : ArticleRepository {
    override suspend fun getAssetById(id: UUID): Either<ArticleRepository.ArticleRepositoryError, Asset> = either {
        val response = httpClientSupplier.executeGetCall( // This wraps HttpClient.get() in a Either.catch
            AssetExternalV1.AssetById(id = id),
            log,
        ) { ArticleRepository.ArticleRepositoryError.UnexpectedError }.bind()

        when (response.status) {
            OK -> response.toAsset().right()
            else -> ArticleRepository.ArticleRepositoryError.UnexpectedError.left()
        }.bind()
    }
}

private suspend fun HttpResponse.toAsset() = Asset(
    contentType = this.contentType()!!,
    contentLength = this.contentLength()!!,
    content = this.bodyAsChannel()
)
Can someone see what I’m doing wrong?
r
I'm not a Ktor expert, but I do know that the
InputStream.copyTo
function is a blocking IO call (not suspending). So when the
src.content.copyTo(this)
statement is called, it will block until the complete asset has been written to the outputstream. The fact that it blocks there is probably causing Ktor to wait with flushing the outputstream until the operation is completed.
a
Ah interesting.
src.content
in this case is a
io.ktor.utils.io.ByteReadChannel
. That class is documented as:
Copy code
Channel for asynchronous reading of sequences of bytes. This is a single-reader channel.
Operations on this channel cannot be invoked concurrently.
But the method is documented:
Copy code
Reads bytes from receiver channel and writes them to dst channel. Closes dst channel if fails to read or write with cause exception.
Returns:
a number of copied bytes
Bit confusing if its async or not
r
ah apologies, if it's not an inputstream, disregard what I said. It does appear to be a suspending function: https://api.ktor.io/ktor-io/io.ktor.utils.io.jvm.nio/index.html#-484599876%2FFunctions%2F1740649208
you could try asking for assistance in #ktor with how to stream a
ByteReadChannel
as a server response?
a
Yes 🙂
a
Besides the problem described by @Riccardo Lippolis about blocking IO, you have another one where the
HttpClient.get
method call saves the whole response body into memory. Here is a simple example of how to stream a response body:
Copy code
val url = "<https://releases.ubuntu.com/22.04.2/ubuntu-22.04.2-desktop-amd64.iso>"
val client = HttpClient(CIO)

embeddedServer(Netty, port = 3333, host = "0.0.0.0") {
    routing {
        get("/") {
            client.prepareGet(url).execute { response ->
                val inputChannel = response.bodyAsChannel()

                call.respondBytesWriter {
                    inputChannel.copyTo(this)
                }
            }
        }
    }
}.start(wait = true)
a
Nice, so to summarise I should be using
prepareGet().execute()
instead of
get()
on the
HttpClient
because it is non-blocking? 🙂
a
You need to use
prepareGet().execute()
because in that case a response body won’t be saved into a memory.
💡 1