https://kotlinlang.org logo
#ktor
Title
# ktor
n

Nikky

03/13/2024, 11:21 AM
i am trying to stream application/x-ndjson with ktor client and process it as a flow following this example: https://ktor.io/docs/response.html#streaming i am using a a prepared http statement and
execute
block with this code
Copy code
val channel: ByteReadChannel = response.body()
flow {
    logger.warn { "closed for read: " + channel.isClosedForRead }
    while (!channel.isClosedForRead) {
        val line = channel.readUTF8Line()
        if(line != null) {
            emit(line)
        }
    }
}
but it seems that
channel.isClosedForRead
is always true maybe because there is no content-length header ? i don't really know curl is able to stream the data just fine
a

Aleksei Tirman [JB]

03/13/2024, 11:47 AM
Are you able to stream the response without using the flow?
n

Nikky

03/13/2024, 1:06 PM
i am able to receive the response in a blocking call, not sure if the flow has any impact on the channel returned from ktor via
response.body()
or
response.bodyAsChannel()
i suspect this is because the channel closes if there is no more bytes to read.. and this api does not have a content-length header..
response.contentLength()
returns `null`as there is a unknown amount of data (ndjjson stream) is there a header to indicate that?
i was able to run the sample code that streams the ktor homepage just fine
a

Aleksei Tirman [JB]

03/13/2024, 1:49 PM
Unfortunately, I cannot reproduce the problem with the following code:
Copy code
val client = HttpClient(CIO) {
}

client.prepareGet("<http://localhost:8082/>").execute { response ->
    val channel: ByteReadChannel = response.body()
    while (!channel.isClosedForRead) {
        val line = channel.readUTF8Line()
        if (line != null) {
            println(line)
        }
    }
}
Server:
Copy code
embeddedServer(Netty, port = 8082) {
    routing {
        get {
            call.respondBytesWriter {
                repeat(10) {
                    writeStringUtf8("hello $it\n")
                    flush()
                    delay(500)
                }
            }
        }
    }
}.start(true)
As you can see, the server doesn't respond with the
Content-Length
header.
n

Nikky

03/13/2024, 1:54 PM
we are using a spring-boot-starter but i will see if i can reproduce it.. and maybe figure out the differences to the simple embedded server
so yeah.. i was able to verify and confirm that the moment i wrap a flow around the loop it somehow closes the channel, no idea why yet.. maybe it needs to have the correct coroutine context ?
a

Aleksei Tirman [JB]

03/13/2024, 3:11 PM
To make it work, you can wrap the whole request code inside the flow's block:
Copy code
val client = HttpClient(CIO) {
}

val f = flow {
    client.prepareGet("<http://localhost:8082/>").execute { response ->
        val channel: ByteReadChannel = response.body()
        while (!channel.isClosedForRead) {
            val line = channel.readUTF8Line()
            if (line != null) {
                emit(line)
            }
        }
    }
}

f.collect {
    println(it)
}
n

Nikky

03/13/2024, 3:53 PM
this is the solution i ended up going with, adds only minor refactoring work, so its probably going to be fine
11 Views