minivac
06/03/2025, 2:08 PMconfigure = ...
in this case to detect when the client dropped the channel, it no longer exists in ktor 3. Is there an equivalent? was it moved?
embeddedServer(Netty, port = 8080, host = "0.0.0.0", configure = {
channelPipelineConfig = {
addLast("cancellationDetector", AbortableRequestHandler())
}
}) {
configureRouting()
}.start(wait = true)
minivac
06/03/2025, 2:16 PMminivac
06/03/2025, 3:36 PMminivac
06/03/2025, 3:39 PMcall.respondText {
GlobalScope.launch { sendPingOrCloseScope() }
someFlow.collect { sendStuff() }
}
It's not very elegant to depend on a ChannelClosedException
from the ping when the flow is busy, but there seems no way to actually check for call "liveness"Aleksei Tirman [JB]
06/10/2025, 8:56 AMminivac
06/10/2025, 9:33 AMminivac
06/10/2025, 9:36 AMoverride fun channelInactive(ctx: ChannelHandlerContext?)
, but from that ChannelHandlerContext it's not possible (AFAIK, maybe it is) to get back to the original request / scope handling the request to cancel itminivac
06/10/2025, 9:44 AMcall.respondText {
onClientDisconnected {
cancel the job / whatever is required
}
{ ... do some slow work... }
}
Aleksei Tirman [JB]
06/10/2025, 12:06 PMembeddedServer(Netty, serverConfig {
module {
install(SSE)
routing {
sse {
while (true) {
send(ServerSentEvent("Sending"))
delay(1.seconds)
}
}
}
}
}) {
connector {
port = 8060
}
channelPipelineConfig = {
addLast("cancellationDetector", object : ChannelInboundHandlerAdapter() {
override fun channelInactive(ctx: ChannelHandlerContext?) {
super.channelInactive(ctx)
}
})
}
}.start(wait = true)
minivac
06/10/2025, 12:50 PMimport io.ktor.server.application.ApplicationCall
import io.ktor.util.AttributeKey
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import java.util.concurrent.ConcurrentHashMap
private val ChannelInactiveKey = AttributeKey<() -> Unit>("OnChannelInactiveCallback")
fun ApplicationCall.onChannelInactive(fn: () -> Unit) {
attributes.put(ChannelInactiveKey, fn)
}
class AbortableRequestHandler : ChannelInboundHandlerAdapter() {
private val activeChannels = ConcurrentHashMap<ChannelHandlerContext, ApplicationCall>()
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
super.channelRead(ctx, msg)
if (msg is ApplicationCall) {
activeChannels[ctx] = msg
}
}
override fun channelInactive(ctx: ChannelHandlerContext) {
super.channelInactive(ctx)
val call = activeChannels.remove(ctx)
if (call != null) {
call.attributes.getOrNull(ChannelInactiveKey)?.invoke()
}
}
}
minivac
06/10/2025, 1:28 PM