Leon Linhart
04/01/2024, 8:21 PM
fun subscribe(...): Flow<...> function.
Specifically, I'm struggling with and could use some tips/opinions on how to properly close the underlying IO resource (in this case, the websocket connection). (See 🧵)
Leon Linhart
04/01/2024, 8:23 PM
ReceiveChannel that Ktor provides me with plus some custom remapping.
val webSocketSession = httpClient.webSocketSession(host = host, path = ...)
// ...
return webSocketSession.incoming.consumeAsFlow()
    .catch { e ->
        when {
            e is ClosedReceiveChannelException || e is CancellationException -> {
                val closeReason = webSocketSession.closeReason.await()
                throw SocketClosedException(closeReason, e)
            }
            else -> throw e
        }
    }
    .map { frame -> webSocketSession.converter!!.deserialize<...>(frame) }
    .onCompletion { webSocketSession.close() }
Leon Linhart
04/01/2024, 8:35 PM
Flow and AutoCloseable to make closing the resource an explicit step.
What do you think? 🙂
Sam
04/01/2024, 8:43 PM
Sam
04/01/2024, 8:44 PM
Joffrey
04/01/2024, 8:53 PM
Leon Linhart
04/01/2024, 9:29 PM
> The automatic resource management is part of the point of flows--if you want a "hot" resource that needs closing manually, you should stick to a channel instead
Right, this is a good point. My idea here was that I could do a short message exchange here to "initialize" the connection before it essentially becomes a stream of messages. In case of an error, I would've then been able to return earlier. However, it feels more idiomatic to have this happen when I start collecting the flow, now that I think about it.
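For readers following along, here is a minimal sketch of the "initialize on collection" idea from the message above. It is not the code from this thread: FakeSession, handshake(), and receiveOrNull() are hypothetical stand-ins for the real websocket session, and only the kotlinx.coroutines flow builder and operators are real API. The point it tries to show is that a cold flow can open the resource, run the initial exchange, and close the resource in a finally block, all scoped to a single collection.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a websocket session; this is NOT Ktor's API.
class FakeSession(private val messages: List<String>) {
    var closed = false
        private set
    private var index = 0

    // Pretend "initialization" exchange: fails if there is nothing to talk to.
    fun handshake() {
        check(messages.isNotEmpty()) { "handshake failed" }
    }

    // Returns the next message, or null once the peer has nothing more to send.
    fun receiveOrNull(): String? = messages.getOrNull(index++)

    fun close() {
        closed = true
    }
}

// Cold flow: the session is opened, initialized, and closed once per collection.
fun subscribe(open: () -> FakeSession): Flow<String> = flow {
    val session = open()
    try {
        session.handshake() // a failure here surfaces in the collector
        while (true) {
            val message = session.receiveOrNull() ?: break
            emit(message)
        }
    } finally {
        session.close() // runs on normal completion, failure, or cancellation
    }
}

fun main() = runBlocking {
    val sessions = mutableListOf<FakeSession>()
    val updates = subscribe { FakeSession(listOf("a", "b", "c")).also { sessions += it } }

    println(sessions.size)             // 0: nothing is opened before collection
    println(updates.take(2).toList())  // [a, b]
    println(sessions.single().closed)  // true: closed even though collection stopped early
}
```

With this shape, a handshake error is reported to whoever collects the flow rather than to the caller of subscribe, which is the trade-off discussed above.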
Joffrey
04/02/2024, 8:46 AM
SharedFlow with some options.