# ktor
d
I'm trying to return a large JSON array using kotlinx.serialization, but I'm getting Java heap errors... Is there a way to just return a Flow (coroutines) and have Ktor gather the result into a streamed JSON response?
a
Unfortunately, Ktor doesn't support `Flow` as the type of a response object. How big is the JSON array?
d
More than 3 thousand objects...
a
You can try to respond with an `OutputStream` to avoid loading the entire JSON into memory:
```kotlin
call.respondOutputStream(contentType = ContentType.Application.Json) {
    Json.encodeToStream(123, this)
}
```
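The `123` above is only a placeholder. Here is a minimal sketch of the same idea with a real collection. It assumes kotlinx-serialization-json on the JVM, where `encodeToStream` is marked `@ExperimentalSerializationApi`. The encoder writes straight into the stream instead of first building the whole JSON `String`. The list itself still has to fit in memory; only the intermediate text is avoided. `writeItems` is a hypothetical name.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.encodeToStream

// Hypothetical helper: serializes `items` directly into `out`, so the complete JSON
// text never exists as a single String (the list itself is still fully in memory).
@OptIn(ExperimentalSerializationApi::class)
fun writeItems(items: List<Map<String, Int>>, out: OutputStream) {
    Json.encodeToStream(items, out)
}

fun main() {
    val buffer = ByteArrayOutputStream()
    writeItems(listOf(mapOf("id" to 1), mapOf("id" to 2)), buffer)
    println(String(buffer.toByteArray(), Charsets.UTF_8)) // [{"id":1},{"id":2}]
}
```

Inside `respondOutputStream`, `this` is the response `OutputStream`, so the call becomes `Json.encodeToStream(items, this)`.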
d
Is there a way to access my content resolver's Json object from a route or do I need to create a new one?
a
Do you mean a `ContentConverter`?
d
I meant ContentNegotiation's kotlinx.serialization `Json` instance...
I could try to save it elsewhere and pass it with DI, but it would have been convenient to have it available in Ktor.
I usually don't need it, but this is a funny case...
a
Unfortunately, you cannot extract the one passed to the `ContentNegotiation` plugin.
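The instance can't be read back from the plugin. One workaround is to create the `Json` once and hand the same object to both `ContentNegotiation` and any code that encodes by hand, which is close to the DI idea above. This sketch assumes Ktor 3 with the `ktor-serialization-kotlinx-json` artifact. `AppJson`, `configureSerialization` and `encodeForResponse` are hypothetical names, and the configuration flags are only examples.

```kotlin
import io.ktor.serialization.kotlinx.json.json
import io.ktor.server.application.Application
import io.ktor.server.application.install
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json

// Hypothetical app-wide instance; the flags are illustrative, not required.
val AppJson: Json = Json {
    ignoreUnknownKeys = true
    encodeDefaults = true
}

// Register the same instance with ContentNegotiation...
fun Application.configureSerialization() {
    install(ContentNegotiation) {
        json(AppJson)
    }
}

// ...and reuse it wherever JSON is encoded manually (e.g. inside respondOutputStream),
// so both paths produce JSON with identical settings.
fun encodeForResponse(values: List<Int>): String = AppJson.encodeToString(values)
```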
d
@Aleksei Tirman [JB] For some funny reason, I tried:
```kotlin
call.respondOutputStream(contentType = ContentType.Application.Json) {
    repository.getFoo(lastSync).collect { foo -> json.encodeToStream(foo, this) }
}
```
But I'm getting `reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response` from IntelliJ's HTTP scratch file when trying out that endpoint...
Maybe I shouldn't be doing the collection inside that block, but it seems to be a suspend lambda, so why not? Unless I need to call something when I'm finished collecting? Or maybe I'm supposed to add `[`, `]` and `,` myself to build the complete JSON?
Ok, it seems like this did the trick:
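```kotlin
import io.ktor.http.ContentType
import io.ktor.server.response.respondOutputStream
import io.ktor.server.routing.RoutingCall
import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.okio.encodeToBufferedSink
import okio.buffer
import okio.sink

// Streams the flow as a JSON array through an Okio buffered sink wrapped
// around the response OutputStream (needs kotlinx-serialization-json-okio).
@OptIn(ExperimentalSerializationApi::class)
suspend inline fun <reified T : Any> RoutingCall.respondFlowToJsonArray(flow: Flow<T>) {
    respondOutputStream(contentType = ContentType.Application.Json) {
        sink().buffer().use { sink ->
            sink.writeUtf8("[")
            var isFirst = true
            flow.collect { obj ->
                // Comma before every element except the first
                if (!isFirst) sink.writeUtf8(",")
                isFirst = false
                Json.encodeToBufferedSink(obj, sink)
            }
            sink.writeUtf8("]")
        }
    }
}
```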
It might be nice to add this to Ktor though...
Nope, it worked in my unit test with a bit of data, but not in production, where I got the same error...
a
Does the flow collection work outside of the `respondOutputStream` block?
d
How would I do that without first saving all those objects into a List, which would be much less efficient for what I'm trying to achieve, @Aleksei Tirman [JB]? Unless I didn't understand your question?
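One way to check this without a `List` is to collect the flow with a counter and periodic log lines instead of storing the elements, for example from a test or a temporary endpoint. This is a sketch that assumes only kotlinx.coroutines. `probeFlow` and its parameters are hypothetical names.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical diagnostic: consumes the flow one element at a time and keeps only a
// counter, so memory use does not grow with the number of emitted elements.
suspend fun <T> probeFlow(
    source: Flow<T>,
    logEvery: Int = 1000,
    log: (String) -> Unit = { println(it) },
): Int {
    var count = 0
    source.collect {
        count++
        if (count % logEvery == 0) log("collected $count items")
    }
    log("finished after $count items")
    return count
}

fun main() {
    val numbers = flow { for (i in 1..2500) emit(i) }
    val total = runBlocking { probeFlow(numbers) }
    println(total)
}
```

In the route from the thread, something like `probeFlow(repository.getFoo(lastSync)) { call.application.log.info(it) }`, followed by a short plain-text response, would show whether the flow itself completes. If the count also stops early here, the problem is upstream of the response writing.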
a
Actually, you can write JSON objects directly to a `ByteChannel` and stream the response by responding with the channel. Here is an example:
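```kotlin
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.ktor.server.response.respond
import io.ktor.server.routing.get
import io.ktor.server.routing.routing
import io.ktor.utils.io.ByteChannel
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json

fun main() {
    embeddedServer(Netty, port = 8080) {
        val f = flow {
            emit("Hello world")
            delay(1000)
            emit("Hello world")
            delay(1000)
            emit("Hello world")
            delay(1000)
        }

        routing {
            get {
                val channel = ByteChannel()

                // Producer coroutine: writes each encoded element into the channel, then
                // closes it so the response body ends. Elements are written back to back
                // here, without the [ , ] framing a JSON array would need.
                launch {
                    f.collect { foo ->
                        channel.writeStringUtf8(Json.encodeToString(foo))
                    }

                    channel.close()
                }

                // The body is read from the channel while the producer is still writing
                call.respond(channel)
            }
        }
    }.start(wait = true)
}
```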
d
Is that much better than the code I posted? Could it solve the problem? I have no clue what the cause could be, since I don't get ANY logs on the server side: no crash, not even a log of the request itself...
a
The above code should solve the problem with the `PrematureCloseException`, and in theory it should be more performant, because writing to the `OutputStream` blocks the thread.
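For comparison, here is a sketch that lets Ktor manage the channel. `respondBytesWriter` (from `io.ktor.server.response`) gives the block a `ByteWriteChannel` and closes it automatically once the block is done, so no separate `launch` or manual `close()` is needed. The sketch assumes Ktor 3 and kotlinx.serialization, and `encodeJsonArray` and `respondJsonArray` are hypothetical names. The framing is the same `[` / `,` / `]` scheme as `respondFlowToJsonArray`. Nothing in this thread confirms that it avoids the `PrematureCloseException`.

```kotlin
import io.ktor.http.ContentType
import io.ktor.server.application.ApplicationCall
import io.ktor.server.response.respondBytesWriter
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json

// Hypothetical helper: emits "[", each encoded element separated by ",", then "]",
// encoding a single element at a time and handing each piece of text to `write`.
suspend inline fun <reified T> encodeJsonArray(
    flow: Flow<T>,
    json: Json,
    crossinline write: suspend (String) -> Unit,
) {
    write("[")
    var isFirst = true
    flow.collect { item ->
        if (!isFirst) write(",")
        isFirst = false
        write(json.encodeToString(item))
    }
    write("]")
}

// Hypothetical route helper: respondBytesWriter supplies the ByteWriteChannel and
// closes it after the block returns, ending the response body.
suspend inline fun <reified T> ApplicationCall.respondJsonArray(flow: Flow<T>, json: Json = Json) {
    respondBytesWriter(contentType = ContentType.Application.Json) {
        encodeJsonArray(flow, json) { chunk -> writeStringUtf8(chunk) }
    }
}
```

In a route this would be `call.respondJsonArray(repository.getFoo(lastSync))`, assuming the element type is `@Serializable`. Keeping the framing separate from the transport also lets it be unit-tested against a `StringBuilder`.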
d
Well, I put in some logs: it got to 8310 processed items but never printed the last log... It just prints the first log 3 times and then seems to give that error... Here's what I did:
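```kotlin
import io.ktor.server.application.log
import io.ktor.server.response.respond
import io.ktor.server.routing.RoutingCall
import io.ktor.utils.io.ByteChannel
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json

suspend inline fun <reified T : Any> RoutingCall.respondFlowToJsonArray2(flow: Flow<T>) {
    val channel = ByteChannel()
    launch {
        val logger = application.log

        logger.info("Starting to respond flow to json array")
        channel.writeStringUtf8("[")
        var isFirst = true
        flow.collectIndexed { index, obj ->
            if (index % 15 == 0) logger.info("Responding flow item $index")
            if (!isFirst)
                channel.writeStringUtf8(",")
            else isFirst = false
            channel.writeStringUtf8(Json.encodeToString(obj))
        }
        channel.writeStringUtf8("]")
        logger.info("Finished responding flow to json array")

        channel.close()
    }

    respond(channel)
}
```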
When I look at the metrics in Prometheus, I see that it doesn't reach its limit (2 GB); it gets to about 1.2 GB while processing this...