Any idea how to merge/combine undefined number of ...
# flow
g
Any idea how to merge/combine an undefined number of flows at runtime? To give you some more context, imagine that the app opens several web socket connections while it is running, and each successful connection lets the webSocketSession expose a flow from its incoming message channel. All web socket connections will receive the same type of messages, and what I want to achieve is to combine them into a single flow. Is that possible?
b
I would say you need to write your own custom operator, something like
fun WebSocket.messages(): Flow<Message> = channelFlow {
   onNewConnection { connection ->
      launch {
         connection.messages.collect { message ->
            send(message) // suspends when the buffer is full instead of dropping the message
         }
      }
   }
   awaitClose { /* unsubscribe from onNewConnection */ }
}
Treat it as pseudocode; I don't know what your WebSocket library is, how it exposes new connections, etc. With other options (like using the built-in `merge` function) you may lose some events (while unsubscribing from the flow and subscribing to it again).
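For reference, here is a minimal runnable sketch of the same idea. `Message`, `Connection` and `ConnectionRegistry` are hypothetical stand-ins invented for this example (they are not Ktor types); the only assumption is that new connections are announced through a callback that can be unregistered.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch

// Hypothetical types, made up for this sketch.
data class Message(val text: String)

class Connection(val messages: Flow<Message>)

// Hypothetical callback-based source of connections. Not thread-safe.
class ConnectionRegistry {
    private val listeners = mutableListOf<(Connection) -> Unit>()

    // Registers a listener and returns a function that unregisters it.
    fun onNewConnection(listener: (Connection) -> Unit): () -> Unit {
        listeners.add(listener)
        return { listeners.remove(listener) }
    }

    // The app calls this whenever a connection has been established.
    fun connected(connection: Connection) {
        listeners.toList().forEach { it(connection) }
    }
}

fun ConnectionRegistry.messages(): Flow<Message> = channelFlow {
    val unsubscribe = onNewConnection { connection ->
        // One child coroutine per connection; all of them feed the same channel.
        launch {
            connection.messages.collect { send(it) }
        }
    }
    // Keeps the flow open until the collector cancels, then unregisters the listener.
    awaitClose { unsubscribe() }
}
```

Connections announced before the flow is collected are not seen in this sketch, because the listener is only registered once collection starts.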
g
I use Ktor for WebSockets, if that helps. And here is how I get the incoming messages. Thank you for your help, I will take a look and see if it helps.
Btw, how can I do this with `merge` though?
b
Actually, you might be able to do it with no issues, assuming you have `WebSocket.connections: Flow<Connection>`. If not, it's trivial to make one by wrapping Ktor's listener (I'm not very familiar with Ktor). The code would be
websocket.connections.flatMapMerge { it.messages }
and that's it
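A slightly fuller sketch of the `flatMapMerge` version, in case it helps. `Message`, `Connection`, `allMessages` and `maxConnections` are hypothetical names made up for the example. One thing to watch: `flatMapMerge` collects only a limited number of inner flows at the same time (16 by default), so with more open connections than that the extra ones wait until an earlier one completes. Passing a larger `concurrency` avoids this. The required opt-in annotation depends on the kotlinx.coroutines version, so both are listed here.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge

// Hypothetical types standing in for whatever the WebSocket layer exposes.
data class Message(val text: String)

class Connection(val messages: Flow<Message>)

// Merges the messages of every connection emitted by the upstream flow.
// maxConnections is an assumed upper bound on simultaneously open connections.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun Flow<Connection>.allMessages(maxConnections: Int = 64): Flow<Message> =
    flatMapMerge(concurrency = maxConnections) { it.messages }
```

Messages from a single connection keep their order, but there is no ordering guarantee between different connections.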