# ktor
s
The endpoint is “hot”, so you have to use a channel to push to and then convert the channel to flow.
l
Is this a limitation, or something that is not yet supported on the Ktor side? From what I understand from https://youtrack.jetbrains.com/issue/KTOR-3788, it should work as it does on Spring, or am I missing something?
s
I’m not sure about the referenced ticket, but here is the sample I had in mind, I hope I understood your question correctly:
fun Application.configureRouting() {

    val rootGets = Channel<Int>()
    val rootFlow = rootGets.receiveAsFlow()
    var counter = 0

    launch {
        rootFlow.collect {
            println(it)
        }
    }

    routing {
        get("/") {
            rootGets.send(counter++)
            call.respondText("Hello World!")
        }
    }
}
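If you have multiple subscribers to your flow, keep in mind that receiveAsFlow() fans elements out (each value reaches only one collector) and consumeAsFlow() can be collected only once; to deliver every value to every subscriber, use a MutableSharedFlow instead of a channel.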
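A minimal sketch of the multi-subscriber case, assuming kotlinx.coroutines is on the classpath; broadcastDemo and events are hypothetical names, not part of the sample above. It uses a MutableSharedFlow so that both collectors receive every emitted value:

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: two collectors subscribe to the same shared flow
// and each of them collects the first three values.
fun broadcastDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val events = MutableSharedFlow<Int>()

    // UNDISPATCHED runs each collector up to its first suspension point,
    // so both are subscribed before anything is emitted.
    val first = async(start = CoroutineStart.UNDISPATCHED) { events.take(3).toList() }
    val second = async(start = CoroutineStart.UNDISPATCHED) { events.take(3).toList() }

    // With the default (no buffer) configuration, emit suspends until
    // every current subscriber has received the value.
    for (i in 1..3) events.emit(i)

    first.await() to second.await()
}

fun main() {
    val (a, b) = broadcastDemo()
    println("first=$a second=$b")
}
```

In a Ktor handler the same idea would mean calling events.emit(...) where the sample calls rootGets.send(...); values emitted while nobody is subscribed are dropped unless a replay or buffer is configured.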
l
@Sergey Aldoukhov I’m talking about something more like:
fun Application.configureRouting() {
    routing {
        get("/messages") {
            call.response.headers.append(HttpHeaders.ContentType, "text/event-stream")
            call.respond(produceMessages())
        }
    }
}

@Serializable
data class Message(
    val number: Int
)

fun produceMessages() = flow<Message> {
    (1..PRODUCTION_SIZE).forEach {
        emit(Message(it))
    }
}
The same behaviour that you have with Spring Webflux or Quarkus, the simplest SSE possible. cc @Aleksei Tirman [JB], maybe you have some hints to share 🙏
s
This is what Websockets are for…
l
So is Coroutines Flow (https://youtrack.jetbrains.com/issue/KTOR-3788), but it’s either not working as expected or call.respond is not enough.
TRACE i.k.s.p.c.ContentNegotiation - No suitable content converter found for response type class kotlinx.coroutines.flow.Flow and body kotlinx.coroutines.flow.SafeFlow
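The trace suggests that ContentNegotiation has no converter for Flow, so call.respond cannot stream it directly. One possible workaround, sketched here assuming Ktor 2.x package names, is to collect the flow manually and write each item in the text/event-stream format. The names sseEvent, numbers, and configureSse are hypothetical, and the JSON payload is hand-written rather than produced by kotlinx.serialization:

```kotlin
import io.ktor.http.ContentType
import io.ktor.server.application.Application
import io.ktor.server.application.call
import io.ktor.server.response.respondTextWriter
import io.ktor.server.routing.get
import io.ktor.server.routing.routing
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Formats one payload as a Server-Sent Event: every line of the payload
// gets a "data: " prefix, and a blank line terminates the event.
fun sseEvent(data: String): String =
    data.split("\n").joinToString(separator = "") { "data: $it\n" } + "\n"

// Hypothetical stand-in for produceMessages() from the question.
fun numbers(count: Int): Flow<Int> = flow {
    for (i in 1..count) emit(i)
}

fun Application.configureSse() {
    routing {
        get("/messages") {
            // respondTextWriter keeps the response open while the lambda runs.
            call.respondTextWriter(contentType = ContentType.Text.EventStream) {
                numbers(10).collect { n ->
                    write(sseEvent("""{"number":$n}"""))
                    flush() // push each event to the client right away
                }
            }
        }
    }
}
```

The response ends when the flow completes, so an endless or hot source would keep the connection open until the client disconnects.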
s
This one seems to be an OK approximation of what you’re trying to achieve. Does this work for you? https://github.com/ktorio/ktor-samples/blob/main/sse/src/main/kotlin/io/ktor/samples/sse/SseApplication.kt I’d still use websockets; they are better supported in Ktor, which is important.
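routing {
    // Holds the most recent WebSocket session; this sketch supports a single client only.
    var socketSession: DefaultWebSocketServerSession? = null

    get("/") {
        // rootGets and counter come from the previous sample.
        rootGets.send(counter++)
        call.respondText("Catch it on socket")
        listOf(1, 2, 3).asFlow().collect {
            socketSession?.outgoing?.send(Frame.Text(it.toString()))
        }
    }

    webSocket("/stream") {
        socketSession = this
        try {
            // Keep the session open: it is closed as soon as this handler returns.
            for (frame in incoming) { /* ignore frames sent by the client */ }
        } finally {
            socketSession = null
        }
    }
}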