Hi, I want to test an endpoint that is supposed to...
# ktor
s
Hi, I want to test an endpoint that is supposed to return an infinite stream of data (until cancelled by the client). I'm using the test host and
HttpStatement.execute
methods for this. But while this seems to work fine in an actual server (not written with ktor, I just want to test the client), this demo appears stuck, as if the response never arrives. Does anyone know what I need to do to test long-running stream responses?
Copy code
package com.powersync

import io.ktor.client.HttpClient
import io.ktor.client.HttpClientConfig
import io.ktor.client.call.body
import io.ktor.client.plugins.HttpTimeout
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.client.request.preparePost
import io.ktor.http.HttpHeaders
import io.ktor.http.content.OutgoingContent
import io.ktor.server.engine.ConnectorType
import io.ktor.server.engine.EngineConnectorBuilder
import io.ktor.server.response.header
import io.ktor.server.response.respond
import <http://io.ktor.server.routing.post|io.ktor.server.routing.post>
import io.ktor.server.testing.ApplicationTestBuilder
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.awaitFreeSpace
import io.ktor.utils.io.readUTF8Line
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlin.time.Duration.Companion.seconds

public suspend fun main() {
    coroutineScope {
        val server = testServer()
        val client = server {
            install(HttpTimeout)
            install(ContentNegotiation)
        }

        launch {
            val response = client.preparePost("<https://test.com/sync/stream>")
            response.execute {
                println("has response") // it never gets to this point

                val channel: ByteReadChannel = it.body()

                while (!channel.isClosedForRead) {
                    val line = channel.readUTF8Line()
                    if (line != null) {
                        println("has line: $line")
                    }
                }
            }
        }
    }
}

internal fun testServer(): (HttpClientConfig<*>.() -> Unit) -> HttpClient {
    val application = ApplicationTestBuilder().apply {
        engine {
            connectors.add(EngineConnectorBuilder(ConnectorType.HTTPS).apply {
                host = "<http://test.com|test.com>"
                port = 443
            })
        }

        routing {
            post("/sync/stream") {
                val content = object : OutgoingContent.WriteChannelContent() {
                    override suspend fun writeTo(channel: ByteWriteChannel) {
                         while (true) {
                             channel.awaitFreeSpace()
                             channel.writeStringUtf8("test\n")
                             channel.flush()
                             delay(1.0.seconds)
                        }
                    }
                }

                call.response.header(HttpHeaders.ContentType, "application/x-ndjson")
                call.response.header(HttpHeaders.CacheControl, "no-store")
                call.response.header(HttpHeaders.Connection, "keep-alive")
                call.response.header("X-Accel-Buffering", "no")
                call.respond(content)
            }
        }
    }

    return application::createClient
}
I can see in the debugger that the
while
loop keeps running so I assume there's an internal buffer somewhere. Is there a way to turn that off?
a
There is an issue in the
testApplication
which prevents the response body from streaming.
s
Thank you for fixing this! gratitude thank you