hellman
04/03/2025, 12:30 PMhellman
04/03/2025, 12:30 PMprivate fun listenForEvents(query: String): Flow<EventData> = callbackFlow {
httpClient.sse(
urlString = EVENT_URL,
request = {
contentType(ContentType.Application.Json)
parameter("query", query)
},
) {
incoming.collect { event ->
event.data?.let {
Json.decodeFromString<EventData>(it)?.let { eventData ->
send(eventData)
}
}
}
}
}hellman
04/03/2025, 12:31 PMcollect() inside the callbackFlow {}, so I'm wondering if there's a way to pass the incoming flow back to the caller instead?simon.vergauwen
04/03/2025, 1:12 PMserverSentEventsSession API which returns a ClientSSESession or ClientSSESessionWithDeserialization and you can access incoming on the returned value.hellman
04/03/2025, 1:57 PMhttpClient.sse() gives me a ClientSSESession and that's where I do incoming.collect.
Not sure if this is an issue, but right now I'm doing a collect() inside a callbackFlow , and I feel there should be a more straightforward way to return ClientSSESession.incoming to the caller of listenForEvents().simon.vergauwen
04/03/2025, 2:01 PMclient.sse takes a lambda for ClientSSESession but client.serverSentEventsSession returns ClientSSESession.
Here a complete snippet:
val session = httpClient.serverSentEventsSession(urlString = EVENT_URL) {
contentType(ContentType.Application.Json)
parameter("query", "...")
}
try {
session.incoming.collect { }
} finally {
session.cancel()
}
I'm not sure if the try/finally and session.cancel() is actually necessary since a failure in collect should cancel the flow, and cancelling the outer Job should also cancel the incoming but I'd need to test/verify/double-check that 😅hellman
04/03/2025, 2:12 PM