Simon Binder
07/02/2025, 3:56 AM
HttpStatement.execute methods for this. But while this seems to work fine in an actual server (not written with ktor, I just want to test the client), this demo appears stuck, as if the response never arrives. Does anyone know what I need to do to test long-running stream responses?
package com.powersync

import io.ktor.client.HttpClient
import io.ktor.client.HttpClientConfig
import io.ktor.client.call.body
import io.ktor.client.plugins.HttpTimeout
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.client.request.preparePost
import io.ktor.http.HttpHeaders
import io.ktor.http.content.OutgoingContent
import io.ktor.server.engine.ConnectorType
import io.ktor.server.engine.EngineConnectorBuilder
import io.ktor.server.response.header
import io.ktor.server.response.respond
import io.ktor.server.routing.post
import io.ktor.server.testing.ApplicationTestBuilder
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.awaitFreeSpace
import io.ktor.utils.io.readUTF8Line
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlin.time.Duration.Companion.seconds

public suspend fun main() {
    coroutineScope {
        val server = testServer()
        val client = server {
            install(HttpTimeout)
            install(ContentNegotiation)
        }

        launch {
            val response = client.preparePost("https://test.com/sync/stream")
            response.execute {
                println("has response") // it never gets to this point
                val channel: ByteReadChannel = it.body()
                while (!channel.isClosedForRead) {
                    val line = channel.readUTF8Line()
                    if (line != null) {
                        println("has line: $line")
                    }
                }
            }
        }
    }
}

internal fun testServer(): (HttpClientConfig<*>.() -> Unit) -> HttpClient {
    val application = ApplicationTestBuilder().apply {
        engine {
            connectors.add(EngineConnectorBuilder(ConnectorType.HTTPS).apply {
                host = "test.com"
                port = 443
            })
        }

        routing {
            post("/sync/stream") {
                val content = object : OutgoingContent.WriteChannelContent() {
                    override suspend fun writeTo(channel: ByteWriteChannel) {
                        while (true) {
                            channel.awaitFreeSpace()
                            channel.writeStringUtf8("test\n")
                            channel.flush()
                            delay(1.0.seconds)
                        }
                    }
                }

                call.response.header(HttpHeaders.ContentType, "application/x-ndjson")
                call.response.header(HttpHeaders.CacheControl, "no-store")
                call.response.header(HttpHeaders.Connection, "keep-alive")
                call.response.header("X-Accel-Buffering", "no")
                call.respond(content)
            }
        }
    }

    return application::createClient
}
I can see in the debugger that the while loop keeps running so I assume there's an internal buffer somewhere. Is there a way to turn that off?
Aleksei Tirman [JB]
07/02/2025, 8:22 AM
testApplication which prevents the response body from streaming.
Simon Binder
07/14/2025, 12:24 PM
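Editorial sketch, not part of the thread: if testApplication is what keeps the body from streaming, one possible way to exercise the client's streaming path is to point it at a real embedded server on a loopback port. The thread does not say this is the recommended workaround, so treat it as an assumption. The CIO engines, port 8080, and the three-line read limit are illustrative choices, and the sketch assumes Ktor 3.x with the ktor-server-cio and ktor-client-cio artifacts on the classpath.

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO as ClientCIO
import io.ktor.client.request.preparePost
import io.ktor.client.statement.bodyAsChannel
import io.ktor.http.ContentType
import io.ktor.server.cio.CIO as ServerCIO
import io.ktor.server.engine.embeddedServer
import io.ktor.server.response.respondBytesWriter
import io.ktor.server.routing.post
import io.ktor.server.routing.routing
import io.ktor.utils.io.readUTF8Line
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.seconds

fun main() = runBlocking {
    // Real HTTP server bound to loopback instead of the in-memory test engine.
    // Port 8080 is an arbitrary choice for this sketch.
    val server = embeddedServer(ServerCIO, port = 8080, host = "127.0.0.1") {
        routing {
            post("/sync/stream") {
                call.respondBytesWriter(contentType = ContentType.parse("application/x-ndjson")) {
                    // Emit one line per second until the client disconnects.
                    while (true) {
                        writeStringUtf8("test\n")
                        flush()
                        delay(1.seconds)
                    }
                }
            }
        }
    }
    server.start(wait = false)

    val client = HttpClient(ClientCIO)
    try {
        client.preparePost("http://127.0.0.1:8080/sync/stream").execute { response ->
            val channel = response.bodyAsChannel()
            // Read a few lines, then leave the block, which releases the connection.
            var received = 0
            while (received < 3) {
                val line = channel.readUTF8Line() ?: break
                println("has line: $line")
                received++
            }
        }
    } finally {
        client.close()
        server.stop(gracePeriodMillis = 0, timeoutMillis = 1000)
    }
}
```

Stopping after a fixed number of lines keeps the check finite even though the server loop never ends on its own. A real test would probably assert on the collected lines instead of printing them.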