gts13
03/16/2023, 6:14 PM
bezrukov
03/16/2023, 6:33 PM
fun WebSocket.messages(): Flow<Message> = channelFlow {
    onNewConnection { connection ->
        launch {
            connection.messages.collect { message ->
                trySend(message)
            }
        }
    }
    awaitClose { /* unsubscribe from onNewConnection */ }
}
Treat it as pseudocode: I don't know which WebSocket library you use, how you obtain new connections, etc.
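A minimal runnable sketch of the pseudocode above, under stated assumptions: `Message`, `Connection`, `WebSocket`, and the shape of `onNewConnection` (a listener registration that returns an `AutoCloseable` handle) are hypothetical stand-ins, not the API of any real WebSocket library.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch

// Hypothetical types standing in for a real WebSocket library.
data class Message(val text: String)

interface Connection {
    val messages: Flow<Message>
}

interface WebSocket {
    // Hypothetical: registers a listener and returns a handle that unregisters it.
    fun onNewConnection(listener: (Connection) -> Unit): AutoCloseable
}

fun WebSocket.messages(): Flow<Message> = channelFlow {
    val subscription = onNewConnection { connection ->
        // One child coroutine per connection; all of them feed the same channel.
        launch {
            connection.messages.collect { message -> send(message) }
        }
    }
    // Keeps the flow open until the collector cancels, then unregisters the listener.
    awaitClose { subscription.close() }
}
```

This sketch uses `send` rather than `trySend`: inside `launch` it can suspend when the channel buffer is full instead of dropping the message. Keep `trySend` if dropping under backpressure is acceptable.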
With other options (like using the built-in merge function) you may lose some events (while unsubscribing from and resubscribing to the flow)
gts13
03/16/2023, 6:46 PM
merge though?
bezrukov
03/16/2023, 7:18 PM
WebSocket.connections: Flow<Connection>. If not, it's trivial to make by wrapping ktor's listener (I'm not very familiar with ktor).
the code would be
websocket.connections.flatMapMerge { it.messages }
and that's it
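A small self-contained sketch of that one-liner, assuming `WebSocket` exposes `connections: Flow<Connection>` and each `Connection` exposes `messages: Flow<Message>`. The three classes here are hypothetical stand-ins for illustration, not ktor types.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the library's types.
data class Message(val text: String)
class Connection(val messages: Flow<Message>)
class WebSocket(val connections: Flow<Connection>)

// flatMapMerge needs an opt-in; which marker applies depends on the kotlinx.coroutines version.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun WebSocket.messages(): Flow<Message> =
    connections.flatMapMerge { it.messages }

fun main() = runBlocking {
    val socket = WebSocket(
        flowOf(
            Connection(flowOf(Message("a1"), Message("a2"))),
            Connection(flowOf(Message("b1"))),
        )
    )
    // Messages from all connections arrive in one flow; ordering across connections is not guaranteed.
    val received = socket.messages().toList()
    println(received.map { it.text }.sorted())
}
```

Note that `flatMapMerge` collects a limited number of inner flows at once (its `concurrency` parameter, 16 by default), so pass a larger value if more connections can be open simultaneously.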