is there any special case for ktor buffering reque...
# ktor
m
is there any special case for ktor buffering requests in tests? during normal execution, this seems to work as expected:
Copy code
val request = httpClient.preparePost {
            //... stuff
}

request.execute { response ->
   val eventStream = response.bodyAsChannel()
   // this works, the stream is not buffered 
}
when I run similar code from a test trying to consume a stream of my own ktor server the response gets buffered instead, so identical code "works" but doesn't allow me to stream values during a integration test.
Copy code
val request = httpClient.preparePost {
            //... stuff
}
request.execute { response ->
   // test code never reaches here until response is fully complete
}
it's not a problem in the server, the events are actually streamed, running the application normally works fine it seems to be related to the
createClient
in ktor.server.testing library but I can't tell for sure
a
The
testApplication
is implemented to read the entire response body by the test client. Recently, the KTOR-7910 has been fixed to allow streaming of the
WriteChannelContent
responses. Can you describe how the server responds to the client's request?
m
it's a SSE stream, custom made, I wrote my implementation before it was added to ktor, but functionally is the same
a
Can you please share a code snippet? What version of Ktor do you use?
m
I am using ktor 3.1.2, and here is a stripped snippet
Copy code
suspend fun ApplicationCall.respondSSEStream(
    writer: suspend ServerSentEventWriter.() -> Unit,
) {
    respondTextWriter {
        val sseWriter = ServerSentEventWriter(this)
        sseWriter.writer()
    }
}

class ServerSentEventWriter(private val delegate: Writer) : Closeable by delegate,
    Flushable by delegate {
    fun write(event: ServerSentEvent) {
        delegate.write(event.toString() + "\n\n")
        flush()
    }
}
used like
Copy code
call.respondSSEStream {
                write(ServerSentEvent("a"))
                write(ServerSentEvent("b"))
                write(ServerSentEvent("c"))
                // in the real app this is something a long the lines of flow.collect { writer.write(it) }
            }
basically I am using
respondTextWriter
and writing the SSE directly as text
a
The fix is delivered within the 3.2.2 release.
blob ty sign 1
m
hey @Aleksei Tirman [JB], doing the migration to ktor SSE and ktor 3.2.2, some things don't work as I expected the
sse { ... }
block in routes doesn't inherint the coroutine context of a regular call, it's using it's own context which means logging traces are lost and plugins that modify the context do not apply to these routes ex, this is the coroutine context:
Copy code
[CoroutineId(208), kotlinx.coroutines.UndispatchedMarker@3cae17ec, "coroutine#208":ScopeCoroutine{Active}@406c8039, Dispatchers.IO]
as oppossed to a a regular call
Copy code
[io.ktor.client.engine.KtorCallContextElement@47a45251, CoroutineId(202), CoroutineName(request), kotlinx.coroutines.UndispatchedMarker@17f78b74, io.ktor.callid.KtorCallIdContextElement@351e9b71, kotlinx.coroutines.slf4j.MDCContext@e03279f,
...
there isn't an obvious way to get the original call context back, or at least I can't see it
Copy code
post("...") {
   call // <- ok call, has context
   see {  
      // this doesn't get called
   }
}
Copy code
route("...", HttpMethod.Post) {
   // there is no call in this context
   sse {
         // this works, but no original call context
    }
}
in the same note, the old
call.respondTextWriter
also does the same now, doesn't inherit the call context
I went back to my original implementation for now
Copy code
suspend fun ApplicationCall.respondSSEStream(
    contentType: String = "text/event-stream",
    keepAliveFrequency: Duration = 15.seconds,
    writer: suspend ServerSentEventWriter.() -> Unit,
) {
    response.header("Content-Type", "${contentType};charset=UTF-8")

    val call = this
    val originalCallContext = currentCoroutineContext()

    respondTextWriter {
        val sseWriter = ServerSentEventWriter(this)
        val keepAlive = call.launch {
            while (originalCallContext.isActive) {
                delay(keepAliveFrequency)
                runCatchingCancellable {
                    sseWriter.write(ServerSentEvent(comments = "keep-alive"))
                }
            }
        }
        try {
            // Carry the original call context
            // and overwrite with the dispatcher used by the writer.
            // This breaking change was introduced in Ktor 3.2.2
            withContext(originalCallContext + currentCoroutineContext()) {
                sseWriter.writer()
            }
        } finally {
            keepAlive.cancel()
        }
    }
}
a
Does the streaming work while testing the SSE endpoint?
m
yes that works now
but I can't use the
sse
apis due to the loss of context
a
Can you please file an issue about the coroutine context problem?
m
sure
a
Unfortunately, I cannot reproduce the problem with the following code:
Copy code
embeddedServer(Netty, port = 3333) {
    install(SSE)
    routing {
        route("/route") {
            get {
                println(coroutineContext)
            }
        }

        sse("/sse") {
            println(coroutineContext)
            send(ServerSentEvent("data"))
        }
    }
}.start(wait = true)
Both handlers print the same context. The full context can be obtained from the
call
property:
Copy code
route("/route") {
    get {
        println(call.coroutineContext)
    }
}
m
I am not doing anything differently AFAIK, just did a print of the context
but I am using a post endpoint, not sure if that would matter
Copy code
route("/v1/app/project/{projectId}/interaction", HttpMethod.Post) {
            sse {
                println(currentCoroutineContext())
            }
        }
Copy code
15:57:12 TRACE i.ktor.server.plugins.sse.SSE -  Starting sse session for /v1/app/project/01983cb9-46e4-7e4f-8007-be8c3bb30bdb/interaction
[CoroutineId(207), kotlinx.coroutines.UndispatchedMarker@5235a78c, "coroutine#207":ScopeCoroutine{Active}@7c622f0d, Dispatchers.IO]
that is failing for me
a
I have a fine-looking context with the following code:
Copy code
route("test", <http://HttpMethod.Post|HttpMethod.Post>) {
    sse {
        println(coroutineContext)
        send(ServerSentEvent("data"))
    }
}
Context:
Copy code
[io.ktor.server.engine.DefaultUncaughtExceptionHandler@2bd283fe, CoroutineName(call-handler), io.ktor.server.netty.NettyDispatcher$CurrentContext@7c8301ac, ScopeCoroutine{Active}@dafd7d6, <http://Dispatchers.IO]|Dispatchers.IO]>
m
well, that's confusing, I can't see how my implementation could have less stuff, let me try it from the root, maybe it's a weird interaction with other plugins
same at the root
Copy code
routing {
        route("sse-test", <http://HttpMethod.Post|HttpMethod.Post>) {
            sse {
                println(currentCoroutineContext())
                println("SSE done")
            }
        }
   }
Copy code
16:06:27 TRACE i.ktor.server.plugins.sse.SSE -  Starting sse session for /sse-test
[CoroutineId(185), kotlinx.coroutines.UndispatchedMarker@349e9ffa, "coroutine#185":ScopeCoroutine{Active}@28e4205c, <http://Dispatchers.IO]|Dispatchers.IO]>
let see if I disable all plugins except SSE
clean now, still missing context
Copy code
fun Application.module() {
    install(SSE)
    routing {
        route("sse-test", <http://HttpMethod.Post|HttpMethod.Post>) {
            sse {
                println(currentCoroutineContext())
                println("SSE done")
            }
        }
    }
}
the test is just
Copy code
val testClient = createClient {}
        <http://testClient.post|testClient.post>("sse-test") {}
Copy code
16:13:20 TRACE i.ktor.server.plugins.sse.SSE -  Starting sse session for /sse-test
[CoroutineId(119), kotlinx.coroutines.UndispatchedMarker@7dab5b5, "coroutine#119":ScopeCoroutine{Active}@3b58bc3e, <http://Dispatchers.IO]|Dispatchers.IO]>
a
So the problem occurs only when testing the SSE endpoint?
m
I found running the integration tests, let me spin the regular app to check
correct, only in tests, regular app works
Copy code
[io.ktor.server.engine.DefaultUncaughtExceptionHandler@7699a714, CoroutineName(call-handler), io.ktor.server.netty.NettyDispatcher$CurrentContext@421b6144, kotlinx.coroutines.UndispatchedMarker@16ad9d41, PluginsTrace(), ScopeCoroutine{Active}@1689919c, <http://Dispatchers.IO]|Dispatchers.IO]>
I will update the issue
🙏 1