ursus
04/24/2025, 9:33 AM
coroutineScope {
    launch {
        outgoingMessages.collect {
            webSocket.send(...)
        }
    }
    for (frame in webSocket.incoming) {
        ...
    }
}
I have a websocket where there is an incoming messages loop and I need to send messages as well. outgoingMessages is currently a MutableSharedFlow<OutgoingMessage> to which I pipe in messages, and then they get collected here and sent.
Trouble is that MutableSharedFlow never completes, so this coroutineScope will never return, which is a problem when the server closes the connection.
What would be the cleanest and most idiomatic way of solving this?
Or more generally, in a coroutineScope, how can one child cancel the other?
Joffrey
04/24/2025, 9:35 AM
MutableSharedFlow is part of the same scope as the coroutine that runs this piece of code, so that both are canceled together.
Joffrey
04/24/2025, 9:36 AM
outgoingMessages
    .takeWhile { it !is MyEndToken }
    .collect { webSocket.send(...) }
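A minimal sketch of the end-token idea above, assuming a hypothetical OutgoingMessage hierarchy in which MyEndToken is a sentinel object and Text is an ordinary message. The names Text, collectUntilEnd and demo are made up for illustration; only takeWhile and MyEndToken come from the snippet.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical message types; MyEndToken is the sentinel that ends collection.
sealed interface OutgoingMessage
data class Text(val body: String) : OutgoingMessage
object MyEndToken : OutgoingMessage

// Collects until the sentinel arrives, then returns normally,
// even though the underlying SharedFlow itself never completes.
suspend fun collectUntilEnd(
    outgoing: Flow<OutgoingMessage>,
    send: suspend (OutgoingMessage) -> Unit,
) {
    outgoing
        .takeWhile { it !is MyEndToken }
        .collect { send(it) }
}

fun demo(): List<String> = runBlocking {
    val outgoing = MutableSharedFlow<OutgoingMessage>(extraBufferCapacity = 16)
    val sent = mutableListOf<String>()
    val collector = launch {
        collectUntilEnd(outgoing) { sent.add((it as Text).body) }
    }
    // Wait for the collector to subscribe; a SharedFlow without replay
    // drops values emitted before anyone is listening.
    outgoing.subscriptionCount.first { it > 0 }
    outgoing.emit(Text("hello"))
    outgoing.emit(MyEndToken)
    collector.join() // returns because takeWhile ended the collection
    sent
}
```

Whoever notices that the connection is gone still has to emit the token, so this shifts the problem rather than removing it.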
ursus
04/24/2025, 9:38 AM
ursus
04/24/2025, 9:39 AM
Joffrey
04/24/2025, 9:40 AM
launch after your for loop (assuming the server closing the connection means webSocket.incoming ends
ursus
04/24/2025, 9:40 AM
val outgoingJob = launch { outgoingmessages collect }?
Joffrey
04/24/2025, 9:41 AM
outgoingJob.cancel()) or just cancel() the coroutineScope itself before the closing brace. Not sure which is cleaner tbh.
ursus
04/24/2025, 9:42 AM
Joffrey
04/24/2025, 9:43 AM
webSocket.incoming throws a cancellation exception if the channel ends
ursus
04/24/2025, 9:43 AM
while (true) { incoming.receive() .. manually then it throws
Joffrey
04/24/2025, 9:44 AM
coroutineScope block, but it would also fail, which you probably don't want
ursus
04/24/2025, 9:45 AM
ursus
04/24/2025, 9:45 AM
ursus
04/24/2025, 9:47 AM
class Chat {
    private val scope = CoroutineScope(SupervisorJob())
    private val outgoingMessages = MutableSharedFlow<OutgoingMessage>()

    init {
        scope.launch {
            coroutineScope {
                launch {
                    outgoingMessages.collect {
                        webSocket.send(...)
                    }
                }
                for (frame in webSocket.incoming) {
                    ...
                }
            }
        }
    }

    fun sendMessage(message: OutgoingMessage) {
        scope.launch {
            outgoingMessages.emit(message)
        }
    }
}
simplified but the gist is there
Joffrey
04/24/2025, 9:47 AM
ursus
04/24/2025, 9:49 AM
Joffrey
04/24/2025, 9:50 AM
sendMessage suspend and directly call channel.send or flow.emit?
2. scope.launch already provides a scope, no need for a nested coroutineScope
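A rough sketch of the "cancel the sender job after the for loop" suggestion from earlier in the thread. It assumes, as Joffrey did, that the incoming channel is closed normally when the server closes the connection. FakeWebSocket, runConnection and demo are hypothetical stand-ins, not a real WebSocket API.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for the real session: frames arrive on `incoming`,
// and `send` only records what would have been sent.
class FakeWebSocket {
    val incoming = Channel<String>(Channel.UNLIMITED)
    val sent = mutableListOf<String>()
    suspend fun send(message: String) { sent.add(message) }
}

// Returns the number of frames handled once the incoming channel is closed.
suspend fun runConnection(
    webSocket: FakeWebSocket,
    outgoingMessages: SharedFlow<String>,
): Int = coroutineScope {
    // Keep a handle on the sender so the receive loop can stop it.
    val outgoingJob = launch {
        outgoingMessages.collect { webSocket.send(it) }
    }
    var handled = 0
    for (frame in webSocket.incoming) {
        handled++ // real frame handling would go here
    }
    // The loop ended, so the connection is gone: cancel the never-ending collector.
    outgoingJob.cancel()
    handled
}

fun demo(): Int? = runBlocking {
    val webSocket = FakeWebSocket()
    webSocket.incoming.send("frame 1")
    webSocket.incoming.send("frame 2")
    webSocket.incoming.close() // simulates the server closing the connection
    // The timeout only guards the demo; null would mean runConnection never returned.
    withTimeoutOrNull(5_000) { runConnection(webSocket, MutableSharedFlow<String>()) }
}
```

Inside scope.launch { ... } the same body works without the coroutineScope wrapper, since the launch block already has a CoroutineScope receiver.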
ursus
04/24/2025, 9:50 AM
ursus
04/24/2025, 9:51 AM
Joffrey
04/24/2025, 9:51 AM
Joffrey
04/24/2025, 9:54 AM
sendMessage launching coroutines, you also lose the order
Joffrey
04/24/2025, 9:55 AM
ursus
04/24/2025, 9:56 AM
class Chat {
    private val scope = CoroutineScope(SupervisorJob())
    private val outgoingMessages = Channel<OutgoingMessage>(capacity = UNLIMITED)

    init {
        scope.launch {
            coroutineScope {
                launch {
                    for (outgoingMessage in outgoingMessages) {
                        webSocket.send(...)
                    }
                }
                for (frame in webSocket.incoming) {
                    ...
                }
            }
        }
    }

    fun sendMessage(message: OutgoingMessage) {
        scope.launch {
            outgoingMessages.send(message)
        }
    }
}
will this lose the order?
Joffrey
04/24/2025, 9:57 AM
sendMessage(a) then sendMessage(b) will launch 2 coroutines in the Default dispatcher. No guarantees as to their ordering. You could be sending b before a.
Joffrey
04/24/2025, 9:58 AM
fun sendMessage(message: OutgoingMessage) {
    outgoingMessages.trySendBlocking(message) // never blocks because the channel has unlimited capacity
}
Or ideally
suspend fun sendMessage(message: OutgoingMessage) {
    outgoingMessages.send(message)
}
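To illustrate the ordering point, here is a small sketch that enqueues from the calling thread instead of launching a coroutine per message. It swaps in trySend rather than trySendBlocking, which should behave the same on an unlimited channel. Outbox, next and demo are hypothetical names, and String stands in for OutgoingMessage.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

class Outbox {
    private val outgoingMessages = Channel<String>(Channel.UNLIMITED)

    // No coroutine is launched: the element is enqueued synchronously,
    // so messages keep the order of the sendMessage calls.
    // On an UNLIMITED channel trySend only fails if the channel is closed.
    fun sendMessage(message: String): Boolean =
        outgoingMessages.trySend(message).isSuccess

    // Stand-in for the sender loop that would forward to the websocket.
    suspend fun next(): String = outgoingMessages.receive()
}

fun demo(): List<String> = runBlocking {
    val outbox = Outbox()
    outbox.sendMessage("a")
    outbox.sendMessage("b")
    listOf(outbox.next(), outbox.next())
}
```

Returning the Boolean lets the caller notice a closed channel instead of silently dropping the message.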
ursus
04/24/2025, 9:59 AM
Joffrey
04/24/2025, 10:00 AM
trySendBlocking would block in that case, while trySend would just return a result that represents the failure due to full buffer.
ursus
04/24/2025, 10:01 AM
coroutineScope returns and after I restart it back again (I omitted this part for simplicity), the queue is obviously there and replays everything pushed to it while the websocket wasn't there
Joffrey
04/24/2025, 10:01 AM
ursus
04/24/2025, 10:02 AM
Joffrey
04/24/2025, 10:02 AM
"after I restart it back again"
That changes some things regarding cancellation, so it's hard to reason about it without this part
ursus
04/24/2025, 10:04 AM
class Chat {
    private val scope = CoroutineScope(SupervisorJob())
    private val outgoingMessages = Channel<OutgoingMessage>(capacity = UNLIMITED)

    init {
        scope.launch {
            appIsInForeground.collectLatest { inForeground ->
                if (inForeground) {
                    connect()
                }
            }
        }
    }

    private suspend fun connect() {
        coroutineScope {
            launch {
                for (outgoingMessage in outgoingMessages) {
                    webSocket.send(...)
                }
            }
            for (frame in webSocket.incoming) {
                ...
            }
        }
    }

    fun sendMessage(message: OutgoingMessage) {
        scope.launch {
            outgoingMessages.send(message)
        }
    }
}
so yea technically it's like this
ursus
04/24/2025, 10:04 AM
appIsInForeground emits false, it cancels the connect
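A small sketch of that collectLatest behaviour, with awaitCancellation() standing in for a connect() that never returns on its own. The log and the two CompletableDeferred signals exist only to make the demo observable and are not part of the Chat class above.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun demo(): List<String> = runBlocking {
    val appIsInForeground = MutableStateFlow(true)
    val log = mutableListOf<String>()
    val connected = CompletableDeferred<Unit>()
    val disconnected = CompletableDeferred<Unit>()

    val job = launch {
        appIsInForeground.collectLatest { inForeground ->
            if (inForeground) {
                log.add("connect started")
                connected.complete(Unit)
                try {
                    awaitCancellation() // placeholder for connect()
                } finally {
                    // Runs when collectLatest cancels this block for the next value.
                    log.add("connect cancelled")
                    disconnected.complete(Unit)
                }
            }
        }
    }

    connected.await()
    appIsInForeground.value = false // new value cancels the running block
    disconnected.await()
    job.cancelAndJoin() // a StateFlow never completes, so stop the collector explicitly
    log
}
```

Because cancelling the block cancels everything launched inside connect()'s coroutineScope, both loops stop together in this case.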
ursus
04/24/2025, 10:05 AM
coroutineScope, right?
Joffrey
04/24/2025, 10:11 AM
sendMessage. It really depends on what you actually want to do with the messages. How do you want to deal with messages when the connection stops and resumes, especially messages that are sent during the pause? You could keep a single channel, clear it when you disconnect (maybe with a try-finally around connect()), but also prevent sendMessage from enqueueing anything while the connection is down (checking appIsInForeground there too).
ursus
04/24/2025, 10:13 AM
Joffrey
04/24/2025, 10:15 AM
var and replace it with a new instance, another way is just to consume everything from it without sending (something like while(channel.tryReceive().getOrNull() != null) {} - I haven't really thought about it).
ursus
04/24/2025, 10:23 AM
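The "consume everything without sending" idea Joffrey mentions could be sketched roughly as below. drain and demo are hypothetical names; the loop checks isSuccess instead of getOrNull() != null so it would also work for nullable element types.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Removes everything currently buffered and returns how many elements were dropped.
// tryReceive never suspends; it fails as soon as the buffer is empty or the channel is closed.
fun <T> ReceiveChannel<T>.drain(): Int {
    var dropped = 0
    while (tryReceive().isSuccess) dropped++
    return dropped
}

fun demo(): Pair<Int, Boolean> {
    val outgoingMessages = Channel<String>(Channel.UNLIMITED)
    outgoingMessages.trySend("stale 1")
    outgoingMessages.trySend("stale 2")
    val dropped = outgoingMessages.drain()
    // A further tryReceive fails, which shows the buffer is now empty.
    val emptyAfterwards = outgoingMessages.tryReceive().isFailure
    return dropped to emptyAfterwards
}
```

Since drain does not suspend, it can be called from a finally block around connect() even when that block runs because of cancellation. Messages enqueued concurrently with the drain may or may not be dropped, so this is only a starting point.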