How can I pass a `Flow<ByteArray>` to the Ktor client ...
# ktor
a
How can I pass a `Flow<ByteArray>` to the Ktor client using `MultiPartFormDataContent` so that the data is sent progressively, without allocating a lot of memory? I tried the following. It works if I use `runBlocking` inside `append`'s `bodyBuilder`, but then the data is not sent progressively. If I use `launch` instead, the flow is not sent at all.
Copy code
import io.ktor.client.*
import io.ktor.client.request.forms.*
import io.ktor.http.*
import io.ktor.http.content.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.testing.*
import io.ktor.utils.io.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.io.readByteArray
import kotlin.test.Test
import kotlin.test.assertEquals

class UploadTest {
    val BUFFER_SIZE: Long = 64 * 1024

    /**
     * End-to-end streaming check.
     *
     * Starts an in-process server with a multipart `upload` route, then streams [dataFlow]
     * to it through the test client and expects `200 OK`.
     */
    @Test
    fun testUpload() = testApplication {
        application {
            routing {
                contentType(ContentType.MultiPart.FormData) {
                    post("upload") {
                        printUpload(
                            call.receiveMultipart()
                        )
                        call.respond(HttpStatusCode.OK)
                    }
                }
            }
        }
        val response = upload(client, dataFlow())
        assertEquals(HttpStatusCode.OK, response.status)
    }

    /**
     * Cold flow that emits the same [BUFFER_SIZE]-byte chunk 9 times, logging the running total.
     */
    fun dataFlow(): Flow<ByteArray> {
        val chunk = "01".repeat(BUFFER_SIZE.toInt() / 2).toByteArray()
        return flow {
            // The running total lives inside the builder so every collection starts from zero.
            var bytes = 0
            repeat(9) { index ->
                bytes += chunk.size
                println("${index + 1}: $bytes bytes sent")
                emit(chunk)
            }
        }
    }

    /**
     * Streams [data] as the `content` part of a multipart/form-data POST to `/upload`.
     *
     * A `writer` coroutine collects the flow into a channel, and the request body reads from
     * that channel through a [ChannelProvider]. Chunks are therefore sent as they are produced.
     *
     * The `bodyBuilder` overload of `append` cannot do this. Its lambda is not suspending, so a
     * `launch` inside it returns before anything is written, and `runBlocking` does not stream.
     *
     * @return the server's response.
     */
    suspend fun upload(client: HttpClient, data: Flow<ByteArray>) = coroutineScope {
        // The writer is a child of this coroutineScope, so upload() returns only after
        // the flow has been fully collected.
        val content = writer(autoFlush = false) {
            data.collect { chunk -> channel.writeFully(chunk) }
        }.channel
        client.submitFormWithBinaryData(
            url = "/upload",
            formData = formData {
                append(
                    "content",
                    ChannelProvider { content },
                    Headers.build {
                        append(HttpHeaders.ContentType, "application/octet-stream")
                        append(HttpHeaders.ContentDisposition, "filename=\"data.dat\"")
                    },
                )
            },
        )
    }

    /**
     * Drains every part of [multipartData] and logs the progress of each file part as it is read.
     *
     * Non-file parts are skipped. Every part is disposed, even if reading it fails.
     */
    suspend fun printUpload(
        multipartData: MultiPartData
    ) {
        multipartData.forEachPart { part ->
            try {
                if (part is PartData.FileItem) {
                    val fileName = part.originalFileName ?: "data.dat"
                    println("$fileName: ")
                    var counter = 0
                    var bytes = 0
                    part.provider().asByteArrayFlow().collect {
                        bytes += it.size
                        println("${++counter}: $bytes bytes uploaded")
                    }
                }
            } finally {
                part.dispose()
            }
        }
    }

    /**
     * Exposes this channel as a cold flow of byte chunks.
     *
     * Each chunk is at most [BUFFER_SIZE] bytes. The running total is logged as chunks arrive.
     */
    fun ByteReadChannel.asByteArrayFlow(): Flow<ByteArray> = flow {
        var counter = 0L
        var bytes = 0L
        while (!isClosedForRead) {
            val packet = readRemaining(BUFFER_SIZE)
            while (!packet.exhausted()) {
                val chunk = packet.readByteArray()
                bytes += chunk.size
                println("${++counter}: $bytes bytes received")
                emit(chunk)
            }
        }
    }

}
🧵 4
a
So, do you need to stream the multipart/form-data content in the request? Can you describe the more general problem in more detail?
a
I have an interface like this:
Copy code
/**
 * Storage-backend-agnostic file operations.
 *
 * File content is exchanged as flows of byte chunks rather than whole arrays. This lets
 * implementations stream files that are larger than the memory available to the service.
 */
interface FileService {

    /**
     * Stores [content] under [name] within [type] and describes the stored file.
     *
     * @param type category/namespace of the file (NOTE(review): exact meaning is backend-defined — confirm).
     * @param name file name within [type].
     * @param content the file body as a flow of byte chunks.
     * @param fileSize optional total size, presumably in bytes; `null` when unknown.
     * @param mediaType optional MIME type of the content; `null` when unknown.
     * @param metadata optional extra key/value metadata; `null` for none.
     * @return information about the stored file.
     */
    suspend fun uploadFile(
        type: String,
        name: String,
        content: Flow<ByteArray>,
        fileSize: Long? = null,
        mediaType: String? = null,
        metadata: Map<String, String>? = null,
    ): FileInfo

    /**
     * Reads the file [name] within [type] back as a flow of byte chunks.
     */
    fun downloadFile(
        type: String,
        name: String
    ): Flow<ByteArray>

    // etc

}
And I have separate libraries with different implementations: one uses the Azure Blob Storage SDK, another uses the Amazon S3 SDK, and I also need one that talks to a microservice to achieve the same functionality. In that library I use the Ktor client. I want to stream uploads and downloads because files can be very large, and I want to run the service with less memory than a single file can take. Is it possible to achieve streaming with multipart/form-data?
For downloads I use `bodyAsChannel` and convert the result to a `Flow<ByteArray>` with the following:
Copy code
/**
 * Exposes this channel as a cold flow of byte chunks, each at most `BUFFER_SIZE` bytes.
 *
 * @param length optional expected total; used only to render "received/total" in trace logs.
 */
fun ByteReadChannel.asByteArrayFlow(length: Long? = null): Flow<ByteArray> = flow {
    val totalSuffix = length?.let { "/$it" } ?: ""
    var received = 0L
    while (!isClosedForRead) {
        val source = readRemaining(BUFFER_SIZE.toLong())
        while (!source.exhausted()) {
            val piece = source.readByteArray()
            received += piece.size
            logger.trace { "$received$totalSuffix bytes received" }
            emit(piece)
        }
    }
}
a
You can use a coroutines `Channel` to pass the `ByteArray` chunks of a form part to the client:
Copy code
val dataChannel = Channel<ByteArray>()
// ...
// Pump the form part into dataChannel chunk by chunk. The channel is rendezvous (no
// capacity given), so `send` suspends until the consumer takes each chunk.
var counter = 0L
var bytes = 0L
val channel = part.provider()
try {
    while (!channel.isClosedForRead) {
        val packet = channel.readRemaining(BUFFER_SIZE)
        while (!packet.exhausted()) {
            val chunk = packet.readByteArray()
            bytes += chunk.size
            println("${++counter}: $bytes bytes received")
            dataChannel.send(chunk)
        }
    }
} catch (cause: Throwable) {
    // Close with the cause so the consumer's `for (chunk in dataChannel)` fails. Otherwise it
    // would wait forever, or treat a truncated part as complete.
    dataChannel.close(cause)
    throw cause
}
// Normal end of the part: lets the consumer's loop finish.
dataChannel.close()
To stream the part's body, you can use the `ChannelProvider` class:
Copy code
// Bridges the chunks arriving on dataChannel into a ByteReadChannel for the request body.
val fileChannel = ByteChannel(autoFlush = true)

val writeJob = launch {
    try {
        for (chunk in dataChannel) { // receiving chunks
            ensureActive()
            fileChannel.writeFully(chunk)
        }
        // Signal end-of-stream. Without this the multipart body never finishes
        // and the request cannot complete.
        fileChannel.flushAndClose()
    } catch (cause: Throwable) {
        // Fail the request body instead of leaving its reader suspended.
        fileChannel.cancel(cause)
        throw cause
    }
}

// size = null: the total length is not known up front.
val provider = ChannelProvider(size = null) {
    fileChannel
}
client.post("/final-upload") {
    setBody(MultiPartFormDataContent(
        formData {
            append("content", provider, Headers.build {
                append(HttpHeaders.ContentType, "application/octet-stream")
                append(HttpHeaders.ContentDisposition, "filename=\"data.dat\"")
            })
        }
    ))
}
a
Thanks. Is this also a correct way to send it? If so, which approach is better?
Copy code
/**
 * Uploads [content] as the `content` part of a multipart/form-data POST to `/upload`.
 *
 * The flow is streamed rather than buffered: a `writer` coroutine collects it into a
 * channel, and the request body reads from that channel through a [ChannelProvider].
 * Upload progress is printed as it happens.
 *
 * @return the server's response.
 */
suspend fun uploadFlowWithWriter(client: HttpClient, content: Flow<ByteArray>) = coroutineScope {
    // `writer` is called on this coroutineScope, so the collector is its child and the
    // function returns only after the flow has been written. Pass only the name: in
    // `CoroutineName(...) + coroutineContext` a name already in the parent context
    // would win and replace "flow-reader".
    val contentChannel = writer(CoroutineName("flow-reader"), autoFlush = false) {
        content.collect { chunk ->
            channel.writeFully(chunk)
        }
    }.channel
    client.submitFormWithBinaryData(
        url = "/upload",
        formData = formData {
            append(
                "content",
                ChannelProvider { contentChannel },
                headers = Headers.build {
                    append(HttpHeaders.ContentType, "application/octet-stream")
                    append(HttpHeaders.ContentDisposition, "filename=\"data.dat\"")
                },
            )
        },
    ) {
        // Progress callback: bytes sent so far, plus the total when it is known.
        onUpload { bytes, size ->
            println("$bytes${size?.let { "/$it" } ?: ""} bytes sent")
        }
    }
}
a
It seems to be a correct way. In my opinion, the better one is whichever is more convenient and readable for you.