# coroutines
l
I'm currently tinkering with a wrapper for a websocket API that's practically unidirectional, and I'm wondering if it's a good idea to expose this via a `fun subscribe(...): Flow<...>` function. Specifically, I'm struggling with how to properly close the underlying IO resource (in this case, the websocket connection) and could use some tips/opinions. (See 🧵)
My initial attempt is to just reuse the `ReceiveChannel` that Ktor provides me with, plus some custom remapping.
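// The session is opened eagerly, so the connection is live before anyone collects.
val webSocketSession = httpClient.webSocketSession(host = host, path = ...)
// ...

return webSocketSession.incoming.consumeAsFlow()
    // catch only sees failures from upstream (the channel), not from map or the collector.
    .catch { e -> when {
        e is ClosedReceiveChannelException || e is CancellationException -> {
            val closeReason = webSocketSession.closeReason.await()
            throw SocketClosedException(closeReason, e)
        }
        else -> throw e
    }}
    .map { frame -> webSocketSession.converter!!.deserialize<...>(frame) }
    // onCompletion runs on normal completion, failure, and cancellation alike.
    .onCompletion { webSocketSession.close() }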
However, even if I continue refining the implementation, the API will be tricky to use, because releasing the IO resource becomes a side effect of the flow's "control flow" (i.e. terminal operations on the flow, and exceptions thrown while collecting, close the socket). An alternative that I'm currently considering is introducing a type which extends both `Flow` and `AutoCloseable` to make closing the resource an explicit step. What do you think? 🙂
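For reference, the `Flow` + `AutoCloseable` idea could look roughly like the sketch below. `CloseableFlow` and `onClose` are made-up names for illustration, not anything Ktor or kotlinx.coroutines provides, and the sketch simply delegates `collect` to an existing flow.

```kotlin
import kotlinx.coroutines.flow.Flow

// Hypothetical type: a Flow that also exposes an explicit close() step.
// Collection is delegated to an existing flow; closing runs the supplied callback.
class CloseableFlow<T>(
    private val delegate: Flow<T>,
    private val onClose: () -> Unit, // assumed to release the underlying IO resource
) : Flow<T> by delegate, AutoCloseable {
    override fun close() = onClose()
}
```

A caller would then wrap usage in `use { ... }` so that the resource is released even if collection throws. One wrinkle to keep in mind: `AutoCloseable.close()` is not a suspending function, so a suspending close on the session would have to be launched or bridged somehow.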
s
Ideally, make it so the socket only opens when you start collecting the flow, and closes when you stop collecting.
The automatic resource management is part of the point of flows--if you want a "hot" resource that needs closing manually, you should stick to a channel instead
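A minimal sketch of that "open on collect, close when collection stops" shape, assuming a hypothetical `Connection` interface as a stand-in for the real websocket session (it is not a Ktor type, and `open` is whatever factory you have for creating the session):

```kotlin
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a websocket session.
interface Connection {
    suspend fun receive(): String? // null means the peer closed the stream
    suspend fun close()
}

// Cold flow: nothing is opened until someone collects,
// and each collection gets its own connection.
fun subscribe(open: suspend () -> Connection): Flow<String> = flow {
    val connection = open()
    try {
        while (true) {
            val message = connection.receive() ?: break
            emit(message)
        }
    } finally {
        // Runs on normal completion, on failure, and when the collector stops early.
        // NonCancellable lets the suspending close() run even if we were cancelled.
        withContext(NonCancellable) { connection.close() }
    }
}
```

With this shape, something like `subscribe { ... }.take(2).collect { ... }` opens one connection and closes it as soon as the second message has been delivered.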
j
I'm facing a similar problem with my Krossbow STOMP client (which includes a web socket abstraction too). It used to expose the incoming web socket frames as a channel, like Ktor does, but that meant that every transform operation or "wrapper" implementation of the web socket interface had to manage coroutines to read from the initial source channel and write to the newly exposed transformed channel. So I moved to a flow API. But this is kinda moot, because the flow is "hot", and the semantics of being able to collect it multiple times are broken unless the wrapper implementations also use a coroutine to transfer and buffer elements. Now I'm reconsidering this approach because, as Sam said, ideally a flow should open and close the resource when collecting, which is not really possible for a web socket that allows sending and receiving at the same time. Maybe I should go back to channels for this use case.
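To make the "collect it multiple times" problem concrete, here is a small sketch using a plain `Channel` in place of the socket's incoming frames (the function name is made up for illustration). A flow obtained via `consumeAsFlow()` can only be collected once; a second collection fails with an `IllegalStateException`.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns true if collecting a channel-backed flow a second time fails.
fun secondCollectionFails(): Boolean = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    channel.trySend(1)
    channel.trySend(2)
    channel.close()

    val frames = channel.consumeAsFlow()
    frames.toList() // first collection drains and consumes the channel

    try {
        frames.toList() // second collection is rejected
        false
    } catch (e: IllegalStateException) {
        true
    }
}
```

`receiveAsFlow()` does allow several collectors, but then the elements are fanned out between them rather than delivered to each one, which is usually not what a "subscribe" API should do either.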
l
> The automatic resource management is part of the point of flows--if you want a "hot" resource that needs closing manually, you should stick to a channel instead
Right, this is a good point. My idea here was that I could do a short message exchange here to "initialize" the connection before it essentially becomes a stream of messages. In case of an error, I would've then been able to return earlier. However, it feels more idiomatic to have this happen when I start collecting the flow now that I think about it.
j
True, since you don't need other handles to the live connection to send messages (you only listen to a single stream of messages), this approach should work fine. Note that, in that case, if multiple consumers collect the flow, you'll open several connections. And even with a single consumer at a time, you might open and close the connection many times, which is not ideal. But both of these problems should be solvable by using a `SharedFlow` with some options.
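One possible set of "options", sketched with `shareIn` and `SharingStarted.WhileSubscribed`. The helper name `sharedConnection` and the `lingerMillis` default are assumptions for illustration, and the right values depend on how expensive reconnecting is for your API.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn

// Hypothetical helper: share one upstream collection (i.e. one connection)
// between all subscribers, and keep it alive for a short while after the last
// subscriber leaves so that quick re-subscriptions do not reconnect.
fun <T> Flow<T>.sharedConnection(
    scope: CoroutineScope,
    lingerMillis: Long = 5_000, // assumed grace period, tune as needed
): SharedFlow<T> = shareIn(
    scope = scope,
    started = SharingStarted.WhileSubscribed(stopTimeoutMillis = lingerMillis),
    replay = 0, // late subscribers only see messages that arrive after they subscribe
)
```

The trade-off is that a `SharedFlow` never completes on its own, and upstream failures surface in the sharing `scope` rather than in each collector, so close reasons and errors would need to be modelled as values if subscribers should see them.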