Axel
03/30/2023, 7:49 AMprivate fun NormalOpenAPIRoute.getAssetById(articleResource: ArticleResource) {
route("/content/v1/assets/{id}") {
get<GetAssetByIdParams, OutputStream>(info("Get an Asset (Media) by its id"))
{
val src = articleResource.getAssetById(it.id)
this.pipeline.call.respondOutputStream(
contentType = src.contentType,
status = HttpStatusCode.OK,
contentLength = src.contentLength
) {
src.content.copyTo(this)
}
}
}
}
The Http client that invokes the downstream service:
class ArticleClient(
private val httpClientSupplier: HttpClientSupplier,
) : ArticleRepository {
override suspend fun getAssetById(id: UUID): Either<ArticleRepository.ArticleRepositoryError, Asset> = either {
val response = httpClientSupplier.executeGetCall( // This wraps HttpClient.get() in a Either.catch
AssetExternalV1.AssetById(id = id),
log,
) { ArticleRepository.ArticleRepositoryError.UnexpectedError }.bind()
when (response.status) {
OK -> response.toAsset().right()
else -> ArticleRepository.ArticleRepositoryError.UnexpectedError.left()
}.bind()
}
}
private suspend fun HttpResponse.toAsset() = Asset(
contentType = this.contentType()!!,
contentLength = this.contentLength()!!,
content = this.bodyAsChannel()
)
Can someone see what I’m doing wrong?Riccardo Lippolis
04/03/2023, 7:34 AMInputStream.copyTo
function is a blocking IO call (not suspending). So when the src.content.copyTo(this)
statement is called, it will block until the complete asset has been written to the outputstream. The fact that it blocks there is probably causing Ktor to wait with flushing the outputstream until the operation is completed.Axel
04/03/2023, 7:58 AMsrc.content
in this case is a io.ktor.utils.io.ByteReadChannel
. That class is documented as:
Channel for asynchronous reading of sequences of bytes. This is a single-reader channel.
Operations on this channel cannot be invoked concurrently.
But the method is documented:
Reads bytes from receiver channel and writes them to dst channel. Closes dst channel if fails to read or write with cause exception.
Returns:
a number of copied bytes
Bit confusing if its async or notRiccardo Lippolis
04/03/2023, 8:01 AMRiccardo Lippolis
04/03/2023, 8:06 AMByteReadChannel
as a server response?Axel
04/03/2023, 8:06 AMAleksei Tirman [JB]
04/03/2023, 9:09 AMHttpClient.get
method call saves the whole response body into memory. Here is a simple example of how to stream a response body:
val url = "<https://releases.ubuntu.com/22.04.2/ubuntu-22.04.2-desktop-amd64.iso>"
val client = HttpClient(CIO)
embeddedServer(Netty, port = 3333, host = "0.0.0.0") {
routing {
get("/") {
client.prepareGet(url).execute { response ->
val inputChannel = response.bodyAsChannel()
call.respondBytesWriter {
inputChannel.copyTo(this)
}
}
}
}
}.start(wait = true)
Axel
04/03/2023, 9:11 AMprepareGet().execute()
instead of get()
on the HttpClient
because it is non-blocking? 🙂Aleksei Tirman [JB]
04/03/2023, 9:25 AMprepareGet().execute()
because in that case a response body won’t be saved into a memory.