minivac
07/20/2025, 1:15 PM
val request = httpClient.preparePost {
    //... stuff
}
request.execute { response ->
    val eventStream = response.bodyAsChannel()
    // this works, the stream is not buffered
}
When I run similar code from a test that consumes a stream from my own Ktor server, the response gets buffered instead, so identical code "works" but doesn't let me stream values during an integration test.
val request = httpClient.preparePost {
    //... stuff
}
request.execute { response ->
    // test code never reaches here until response is fully complete
}
It's not a problem in the server: the events are actually streamed, and running the application normally works fine.
It seems to be related to the createClient in the ktor.server.testing library, but I can't tell for sure.
Aleksei Tirman [JB]
07/23/2025, 10:52 AM
testApplication is implemented so that the test client reads the entire response body. Recently, KTOR-7910 was fixed to allow streaming of WriteChannelContent responses. Can you describe how the server responds to the client's request?
minivac
07/23/2025, 12:25 PM
Aleksei Tirman [JB]
07/23/2025, 12:53 PM
minivac
07/24/2025, 1:28 PM
suspend fun ApplicationCall.respondSSEStream(
    writer: suspend ServerSentEventWriter.() -> Unit,
) {
    respondTextWriter {
        val sseWriter = ServerSentEventWriter(this)
        sseWriter.writer()
    }
}
class ServerSentEventWriter(private val delegate: Writer) : Closeable by delegate,
    Flushable by delegate {
    fun write(event: ServerSentEvent) {
        delegate.write(event.toString() + "\n\n")
        flush()
    }
}
used like
call.respondSSEStream {
    write(ServerSentEvent("a"))
    write(ServerSentEvent("b"))
    write(ServerSentEvent("c"))
    // in the real app this is something along the lines of flow.collect { writer.write(it) }
}
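For readers unfamiliar with what such a writer puts on the wire, here is a minimal sketch of SSE framing using only the Kotlin standard library. formatSseEvent is a hypothetical helper, not a Ktor API, and it is not claimed to match the exact output of ServerSentEvent.toString(). It only illustrates the general format: one "field: value" line per field, comment lines starting with a colon, and a blank line terminating each event.

```kotlin
// Hypothetical helper (not part of Ktor): frames a single event in the SSE wire format.
fun formatSseEvent(
    data: String? = null,
    event: String? = null,
    comment: String? = null,
): String = buildString {
    // Comment lines start with a colon and are ignored by SSE clients.
    comment?.lines()?.forEach { append(": ").append(it).append('\n') }
    // Optional event type.
    event?.let { append("event: ").append(it).append('\n') }
    // Multi-line payloads are sent as one "data:" line per line of text.
    data?.lines()?.forEach { append("data: ").append(it).append('\n') }
    // A blank line terminates the event.
    append('\n')
}

fun main() {
    print(formatSseEvent(data = "a"))
    print(formatSseEvent(comment = "keep-alive"))
}
```

Flushing after each event, as the writer above does, is what lets a client see events one at a time instead of in a single batch.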
minivac
07/23/2025, 1:29 PM
respondTextWriter and writing the SSE directly as text
Aleksei Tirman [JB]
07/23/2025, 1:43 PM
minivac
07/24/2025, 12:43 PM
The sse { ... } block in routes doesn't inherit the coroutine context of a regular call. It uses its own context, which means logging traces are lost and plugins that modify the context do not apply to these routes.
For example, this is the coroutine context:
[CoroutineId(208), kotlinx.coroutines.UndispatchedMarker@3cae17ec, "coroutine#208":ScopeCoroutine{Active}@406c8039, Dispatchers.IO]
as opposed to a regular call:
[io.ktor.client.engine.KtorCallContextElement@47a45251, CoroutineId(202), CoroutineName(request), kotlinx.coroutines.UndispatchedMarker@17f78b74, io.ktor.callid.KtorCallIdContextElement@351e9b71, kotlinx.coroutines.slf4j.MDCContext@e03279f,
...
there isn't an obvious way to get the original call context back, or at least I can't see it
post("...") {
    call // <- ok call, has context
    sse {
        // this doesn't get called
    }
}
route("...", HttpMethod.Post) {
    // there is no call in this context
    sse {
        // this works, but no original call context
    }
}
On the same note, the old call.respondTextWriter also does the same now: it doesn't inherit the call context.
minivac
07/24/2025, 1:13 PM
suspend fun ApplicationCall.respondSSEStream(
    contentType: String = "text/event-stream",
    keepAliveFrequency: Duration = 15.seconds,
    writer: suspend ServerSentEventWriter.() -> Unit,
) {
    response.header("Content-Type", "${contentType};charset=UTF-8")
    val call = this
    val originalCallContext = currentCoroutineContext()
    respondTextWriter {
        val sseWriter = ServerSentEventWriter(this)
        val keepAlive = call.launch {
            while (originalCallContext.isActive) {
                delay(keepAliveFrequency)
                runCatchingCancellable {
                    sseWriter.write(ServerSentEvent(comments = "keep-alive"))
                }
            }
        }
        try {
            // Carry the original call context
            // and overwrite with the dispatcher used by the writer.
            // This breaking change was introduced in Ktor 3.2.2
            withContext(originalCallContext + currentCoroutineContext()) {
                sseWriter.writer()
            }
        } finally {
            keepAlive.cancel()
        }
    }
}
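The workaround above relies on how the + operator merges coroutine contexts: elements on the right replace elements with the same key on the left, and everything else is kept. A minimal sketch of that rule, using only the Kotlin standard library. TraceId and DispatcherTag are hypothetical stand-ins for real elements such as an MDC context and a dispatcher; they are not Ktor or kotlinx.coroutines types.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical element standing in for call-scoped data (e.g. logging/trace info).
class TraceId(val value: String) : AbstractCoroutineContextElement(TraceId) {
    companion object Key : CoroutineContext.Key<TraceId>
}

// Hypothetical element standing in for a dispatcher.
class DispatcherTag(val value: String) : AbstractCoroutineContextElement(DispatcherTag) {
    companion object Key : CoroutineContext.Key<DispatcherTag>
}

// Same shape as `originalCallContext + currentCoroutineContext()` in the workaround:
// keep everything from the original, let the current context win on key conflicts.
fun mergeContexts(original: CoroutineContext, current: CoroutineContext): CoroutineContext =
    original + current

fun main() {
    val original = TraceId("req-1") + DispatcherTag("call-handler")
    val current = DispatcherTag("io")
    val merged = mergeContexts(original, current)
    println(merged[TraceId]?.value)       // prints "req-1": kept from the original context
    println(merged[DispatcherTag]?.value) // prints "io": overridden by the current context
}
```

Swapping the operands would instead let the original call's elements win, which in the workaround would presumably replace the writer's dispatcher with the call's.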
Aleksei Tirman [JB]
07/24/2025, 1:15 PM
minivac
07/24/2025, 1:22 PM
minivac
07/24/2025, 1:23 PM
sse APIs due to the loss of context
Aleksei Tirman [JB]
07/24/2025, 1:25 PM
minivac
07/24/2025, 1:25 PM
minivac
07/24/2025, 1:34 PM
Aleksei Tirman [JB]
07/24/2025, 1:44 PM
embeddedServer(Netty, port = 3333) {
    install(SSE)
    routing {
        route("/route") {
            get {
                println(coroutineContext)
            }
        }
        sse("/sse") {
            println(coroutineContext)
            send(ServerSentEvent("data"))
        }
    }
}.start(wait = true)
Both handlers print the same context. The full context can be obtained from the call property:
route("/route") {
    get {
        println(call.coroutineContext)
    }
}
minivac
07/24/2025, 1:54 PM
minivac
07/24/2025, 1:54 PM
minivac
07/24/2025, 1:57 PM
route("/v1/app/project/{projectId}/interaction", HttpMethod.Post) {
    sse {
        println(currentCoroutineContext())
    }
}
15:57:12 TRACE i.ktor.server.plugins.sse.SSE - Starting sse session for /v1/app/project/01983cb9-46e4-7e4f-8007-be8c3bb30bdb/interaction
[CoroutineId(207), kotlinx.coroutines.UndispatchedMarker@5235a78c, "coroutine#207":ScopeCoroutine{Active}@7c622f0d, Dispatchers.IO]
minivac
07/24/2025, 1:57 PM
Aleksei Tirman [JB]
07/24/2025, 2:02 PM
route("test", HttpMethod.Post) {
    sse {
        println(coroutineContext)
        send(ServerSentEvent("data"))
    }
}
Context:
[io.ktor.server.engine.DefaultUncaughtExceptionHandler@2bd283fe, CoroutineName(call-handler), io.ktor.server.netty.NettyDispatcher$CurrentContext@7c8301ac, ScopeCoroutine{Active}@dafd7d6, Dispatchers.IO]
minivac
07/24/2025, 2:04 PM
minivac
07/24/2025, 2:07 PM
routing {
    route("sse-test", HttpMethod.Post) {
        sse {
            println(currentCoroutineContext())
            println("SSE done")
        }
    }
}
16:06:27 TRACE i.ktor.server.plugins.sse.SSE - Starting sse session for /sse-test
[CoroutineId(185), kotlinx.coroutines.UndispatchedMarker@349e9ffa, "coroutine#185":ScopeCoroutine{Active}@28e4205c, Dispatchers.IO]
minivac
07/24/2025, 2:08 PM
minivac
07/24/2025, 2:15 PM
fun Application.module() {
    install(SSE)
    routing {
        route("sse-test", HttpMethod.Post) {
            sse {
                println(currentCoroutineContext())
                println("SSE done")
            }
        }
    }
}
the test is just
val testClient = createClient {}
testClient.post("sse-test") {}
16:13:20 TRACE i.ktor.server.plugins.sse.SSE - Starting sse session for /sse-test
[CoroutineId(119), kotlinx.coroutines.UndispatchedMarker@7dab5b5, "coroutine#119":ScopeCoroutine{Active}@3b58bc3e, Dispatchers.IO]
Aleksei Tirman [JB]
07/24/2025, 2:16 PM
minivac
07/24/2025, 2:17 PM
minivac
07/24/2025, 2:20 PM
[io.ktor.server.engine.DefaultUncaughtExceptionHandler@7699a714, CoroutineName(call-handler), io.ktor.server.netty.NettyDispatcher$CurrentContext@421b6144, kotlinx.coroutines.UndispatchedMarker@16ad9d41, PluginsTrace(), ScopeCoroutine{Active}@1689919c, Dispatchers.IO]
minivac
07/24/2025, 2:21 PM