dave08
02/24/2025, 2:01 PMAleksei Tirman [JB]
02/24/2025, 2:05 PMFlow
as the type of a response object. How big is the JSON array?dave08
02/24/2025, 2:06 PMAleksei Tirman [JB]
02/24/2025, 2:11 PMOutputStream
to avoid loading the entire JSON into memory:
call.respondOutputStream(contentType = ContentType.Application.Json) {
Json.encodeToStream(123, this)
}
dave08
02/24/2025, 2:14 PMAleksei Tirman [JB]
02/24/2025, 2:15 PMContentConverter
?dave08
02/24/2025, 2:15 PMdave08
02/24/2025, 2:16 PMdave08
02/24/2025, 2:17 PMAleksei Tirman [JB]
02/24/2025, 2:17 PMContentNegotiation
plugin.dave08
02/24/2025, 4:06 PMcall.respondOutputStream(contentType = ContentType.Application.Json) {
repository.getFoo(lastSync).collect { foo -> json.encodeToStream(foo, this) }
}
But I'm getting reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response from Intellij's http scratch file when trying out that endpoint...dave08
02/24/2025, 4:09 PMdave08
02/24/2025, 4:49 PMsuspend inline fun <reified T : Any> RoutingCall.respondFlowToJsonArray(flow: Flow<T>) {
respondOutputStream(contentType = ContentType.Application.Json) {
sink().buffer().use { sink ->
sink.writeUtf8("[")
var isFirst = true
flow.collect { obj ->
if (!isFirst)
sink.writeUtf8(",")
else isFirst = false
Json.encodeToBufferedSink(obj, sink)
}
sink.writeUtf8("]")
}
}
}
It might be nice to add this to Ktor though...dave08
02/25/2025, 3:48 AMAleksei Tirman [JB]
02/25/2025, 8:45 AMrespondOutputStream
's block?dave08
02/25/2025, 10:07 AMAleksei Tirman [JB]
02/25/2025, 10:33 AMByteChannel
and stream the response by returning the channel. Here is an example:
embeddedServer(Netty, port = 8080) {
val f = flow {
emit("Hello world")
delay(1000)
emit("Hello world")
delay(1000)
emit("Hello world")
delay(1000)
}
routing {
get {
val channel = ByteChannel()
launch {
f.collect { foo ->
channel.writeStringUtf8(Json.encodeToString(foo))
}
channel.close()
}
call.respond(channel)
}
}
}.start(wait = true)
dave08
02/25/2025, 10:35 AMAleksei Tirman [JB]
02/25/2025, 10:40 AMPrematureCloseException
and in theory, it should be more performant because reading from the OutputStream
blocks the thread.dave08
02/25/2025, 11:36 AMsuspend inline fun <reified T : Any> RoutingCall.respondFlowToJsonArray2(flow: Flow<T>) {
val channel = ByteChannel()
launch {
val logger = application.log
<http://logger.info|logger.info>("Starting to respond flow to json array")
channel.writeStringUtf8("[")
var isFirst = true
flow.collectIndexed { index, obj ->
if (index % 15 == 0) <http://logger.info|logger.info>("Responding flow item $index")
if (!isFirst)
channel.writeStringUtf8(",")
else isFirst = false
channel.writeStringUtf8(Json.encodeToString(obj))
}
channel.writeStringUtf8("]")
<http://logger.info|logger.info>("Finished responding flow to json array")
channel.close()
}
respond(channel)
}
dave08
02/25/2025, 11:37 AM