hellman
04/03/2025, 12:30 PMhellman
04/03/2025, 12:30 PMprivate fun listenForEvents(query: String): Flow<EventData> = callbackFlow {
httpClient.sse(
urlString = EVENT_URL,
request = {
contentType(ContentType.Application.Json)
parameter("query", query)
},
) {
incoming.collect { event ->
event.data?.let {
Json.decodeFromString<EventData>(it)?.let { eventData ->
send(eventData)
}
}
}
}
}
hellman
04/03/2025, 12:31 PMcollect()
inside the callbackFlow {}
, so I'm wondering if there's a way to pass the incoming
flow back to the caller instead?simon.vergauwen
04/03/2025, 1:12 PMserverSentEventsSession
API which returns a ClientSSESession
or ClientSSESessionWithDeserialization
and you can access incoming
on the returned value.hellman
04/03/2025, 1:57 PMhttpClient.sse()
gives me a ClientSSESession
and that's where I do incoming.collect
.
Not sure if this is an issue, but right now I'm doing a collect()
inside a callbackFlow
, and I feel there should be a more straightforward way to return ClientSSESession.incoming
to the caller of listenForEvents()
.simon.vergauwen
04/03/2025, 2:01 PMclient.sse
takes a lambda for ClientSSESession
but client.serverSentEventsSession
returns ClientSSESession
.
Here a complete snippet:
val session = httpClient.serverSentEventsSession(urlString = EVENT_URL) {
contentType(ContentType.Application.Json)
parameter("query", "...")
}
try {
session.incoming.collect { }
} finally {
session.cancel()
}
I'm not sure if the try/finally
and session.cancel()
is actually necessary since a failure in collect
should cancel the flow, and cancelling the outer Job
should also cancel the incoming
but I'd need to test/verify/double-check that 😅hellman
04/03/2025, 2:12 PM