Anastasios Georgousakis
03/31/2025, 7:41 AM
Flow<ByteArray>
using MultiPartFormDataContent so that data are sent progressively without allocating a lot of memory?
I tried the following, but it only works if I use runBlocking inside append's bodyBuilder, which does not send the data progressively. If I use launch, it does not send the flow at all.
import io.ktor.client.*
import io.ktor.client.request.forms.*
import io.ktor.http.*
import io.ktor.http.content.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.testing.*
import io.ktor.utils.io.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.io.readByteArray
import kotlin.test.Test
class UploadTest {
    // Chunk size used both when producing test data and when draining channels.
    val BUFFER_SIZE: Long = 64 * 1024

    @Test
    fun testUpload() = testApplication {
        application {
            routing {
                contentType(ContentType.MultiPart.FormData) {
                    post("upload") {
                        printUpload(call.receiveMultipart())
                        call.respond(HttpStatusCode.OK)
                    }
                }
            }
        }
        upload(client, dataFlow())
    }

    // Produces nine BUFFER_SIZE-byte chunks, logging the cumulative byte count
    // as each chunk is emitted. The flow is cold: nothing runs until collected.
    fun dataFlow(): Flow<ByteArray> {
        var bytes = 0
        var counter = 0
        val chunk = "01".repeat(BUFFER_SIZE.toInt() / 2).toByteArray()
        return flow {
            repeat(9) {
                bytes += chunk.size
                println("${++counter}: $bytes bytes sent")
                emit(chunk)
            }
        }
    }

    // FIX: the original launched a coroutine inside append's bodyBuilder; the
    // bodyBuilder returns immediately, so the part body was finalized before
    // the launched collector ever wrote a byte and the flow was never sent.
    // Instead, bridge the Flow into a ByteReadChannel with `writer` (this is
    // a CoroutineScope extension, available inside coroutineScope) and hand
    // the channel to Ktor via ChannelProvider, which is read progressively.
    suspend fun upload(client: HttpClient, data: Flow<ByteArray>) = coroutineScope {
        val contentChannel = writer {
            data.collect { chunk -> channel.writeFully(chunk) }
        }.channel
        client.submitFormWithBinaryData(
            url = "/upload",
            formData = formData {
                append(
                    "content",
                    ChannelProvider { contentChannel },
                    headers = Headers.build {
                        append(HttpHeaders.ContentType, "application/octet-stream")
                        append(HttpHeaders.ContentDisposition, "filename=\"data.dat\"")
                    }
                )
            }
        )
    }

    // Server side: logs each received file part chunk-by-chunk so the test
    // output shows whether upload and receipt interleave (i.e. streaming).
    suspend fun printUpload(
        multipartData: MultiPartData
    ) {
        multipartData.forEachPart { part ->
            when (part) {
                is PartData.FileItem -> {
                    val fileName = part.originalFileName ?: "data.dat"
                    println("$fileName: ")
                    var counter = 0
                    var bytes = 0
                    part.provider().asByteArrayFlow().collect {
                        bytes += it.size
                        println("${++counter}: $bytes bytes uploaded")
                    }
                }
                else -> null
            }
            part.dispose()
        }
    }

    // Adapts a ByteReadChannel into a cold Flow of byte-array chunks,
    // reading at most BUFFER_SIZE bytes per readRemaining call.
    fun ByteReadChannel.asByteArrayFlow(): Flow<ByteArray> = flow {
        var counter = 0L
        var bytes = 0L
        while (!isClosedForRead) {
            val packet = readRemaining(BUFFER_SIZE)
            while (!packet.exhausted()) {
                val chunk = packet.readByteArray()
                bytes += chunk.size
                println("${++counter}: $bytes bytes received")
                emit(chunk)
            }
        }
    }
}
Aleksei Tirman [JB]
04/02/2025, 8:24 AM
Anastasios Georgousakis
04/02/2025, 12:47 PMinterface FileService {
// Storage-agnostic file API. Per the message below, implementations exist
// for Azure Blob Storage, Amazon S3, and a Ktor-client-backed microservice.
// Uploads a file, streaming its body as a Flow<ByteArray> so large files
// need not be held fully in memory. fileSize, mediaType and metadata are
// optional hints for the backing store; returns a FileInfo describing the
// stored object.
suspend fun uploadFile(
type: String,
name: String,
content: Flow<ByteArray>,
fileSize: Long? = null,
mediaType: String? = null,
metadata: Map<String, String>? = null,
): FileInfo
// Returns the named file's contents as a stream of chunks — presumably a
// cold flow whose collection triggers the download; confirm against the
// concrete implementations.
fun downloadFile(
type: String,
name: String
): Flow<ByteArray>
// etc
}
and I have separate libraries with different implementations. One uses the Azure Blob Storage SDK, another the Amazon S3 SDK, and I also need one that talks to a microservice to achieve the same functionality. In this library I use the Ktor client, and I want to stream uploads and downloads because files can be very large and the service must run with less memory than a single file's size.
Is it possible to achieve streaming with multipart/form-data?
Anastasios Georgousakis
04/02/2025, 12:57 PMfun ByteReadChannel.asByteArrayFlow(length: Long? = null): Flow<ByteArray> = flow {
// Adapts a Ktor ByteReadChannel into a cold Flow of byte-array chunks.
// `length`, when supplied, is used only to enrich the trace log with a
// "received/total" progress suffix; it does not limit reading.
// NOTE(review): `logger` and `BUFFER_SIZE` are defined elsewhere in the file.
var count = 0L
while (!isClosedForRead) {
// readRemaining suspends until up to BUFFER_SIZE bytes are available
// or the channel is closed.
val packet = readRemaining(BUFFER_SIZE.toLong())
while (!packet.exhausted()) {
val bytes = packet.readByteArray()
count += bytes.size
logger.trace {
"$count${length?.let {"/$it"} ?: ""} bytes received"
}
emit(bytes)
}
}
}
Aleksei Tirman [JB]
04/02/2025, 1:11 PMval dataChannel = Channel<ByteArray>()
// Bridge pattern: drain the received multipart part's ByteReadChannel into
// a kotlinx Channel<ByteArray> so another coroutine can re-stream it.
// ...
var counter = 0L
var bytes = 0L
val channel = part.provider()
// Read the part chunk-by-chunk (at most BUFFER_SIZE bytes per read) and
// forward each chunk; Channel.send suspends, giving natural backpressure.
while (!channel.isClosedForRead) {
val packet = channel.readRemaining(BUFFER_SIZE)
while (!packet.exhausted()) {
val chunk = packet.readByteArray()
bytes += chunk.size
println("${++counter}: $bytes bytes received")
dataChannel.send(chunk)
}
}
// Close so a downstream `for (chunk in dataChannel)` loop terminates.
dataChannel.close()
To stream the part's body, you can use the ChannelProvider class:
// Re-stream the received chunks to a second server without buffering:
// a pump coroutine copies chunks from dataChannel into a ByteChannel, and
// ChannelProvider exposes that channel to Ktor so MultiPartFormDataContent
// reads the body progressively while it is still being produced.
val fileChannel = ByteChannel(autoFlush = true)
val writeJob = launch {
    for (chunk in dataChannel) { // receiving chunks
        ensureActive()
        fileChannel.writeFully(chunk)
    }
    // FIX: close the write side once dataChannel completes; otherwise the
    // outgoing request body never terminates and the POST hangs.
    fileChannel.close()
}
// size = null: total length unknown, so the body is sent chunked.
val provider = ChannelProvider(size = null) {
    fileChannel
}
// FIX: de-mangled the Slack-formatted link `<http://client.post|client.post>`
// back into the intended call.
client.post("/final-upload") {
    setBody(MultiPartFormDataContent(
        formData {
            append("content", provider, Headers.build {
                append(HttpHeaders.ContentType, "application/octet-stream")
                append(HttpHeaders.ContentDisposition, "filename=\"data.dat\"")
            })
        }
    ))
}
Anastasios Georgousakis
04/02/2025, 3:59 PMsuspend fun uploadFlowWithWriter(client: HttpClient, content: Flow<ByteArray>) = coroutineScope {
// Streams a Flow<ByteArray> as a multipart file part: a `writer` coroutine
// pumps the flow into a ByteWriteChannel, and ChannelProvider exposes the
// read side to Ktor so the body is sent progressively rather than being
// buffered whole in memory. The writer completes (closing the channel)
// when the flow finishes, which terminates the request body.
val contentChannel = CoroutineScope(coroutineContext).writer(
// NOTE(review): coroutineContext is already this scope's context, so
// adding it again to the writer's context looks redundant — confirm it
// is intentional (only CoroutineName("flow-reader") appears necessary).
CoroutineName("flow-reader") + coroutineContext,
autoFlush = false
) {
content.collect { chunk ->
channel.writeFully(chunk)
}
}.channel
client.submitFormWithBinaryData(
url = "/upload",
formData = formData {
append(
"content",
// ChannelProvider lets Ktor pull bytes from the channel on demand.
ChannelProvider { contentChannel },
headers = Headers.build {
append(HttpHeaders.ContentType, "application/octet-stream")
append(HttpHeaders.ContentDisposition, "filename=\"data.dat\"")
}
)
},
) {
// Progress callback; `size` is null when the total length is unknown.
onUpload { bytes, size ->
println("$bytes${size?.let { "/$it" } ?: ""} bytes sent")
}
}
}
Aleksei Tirman [JB]
04/03/2025, 7:14 AM