ursus
04/24/2025, 9:33 AM
coroutineScope {
    launch {
        outgoingMessages
            .collect {
                webSocket.send(...)
            }
    }
    for (frame in webSocket.incoming) {
        ...
    }
}
I have a websocket where there is an incoming messages loop and I need to send messages as well. outgoingMessages is currently a MutableSharedFlow<OutgoingMessage> to which I pipe in messages, and then they get collected here and sent
trouble is that the MutableSharedFlow never completes, so this coroutineScope will never return, which is a problem when the server closes the connection
what would be the cleanest and most idiomatic way of solving this?
or more generally, in a coroutineScope, how can one child cancel the other?
Joffrey
04/24/2025, 9:35 AM
MutableSharedFlow is part of the same scope as the coroutine that runs this piece of code, so that both are canceled together.
Joffrey
04/24/2025, 9:36 AM
outgoingMessages
    .takeWhile { it !is MyEndToken }
    .collect { webSocket.send(...) }
ursus
04/24/2025, 9:38 AM
ursus
04/24/2025, 9:39 AM
Joffrey
04/24/2025, 9:40 AM
launch after your for loop (assuming the server closing the connection means webSocket.incoming ends)
ursus
04/24/2025, 9:40 AM
val outgoingJob = launch { outgoingMessages.collect { ... } }?
Joffrey
04/24/2025, 9:41 AM
outgoingJob.cancel()) or just cancel() the coroutineScope itself before the closing brace. Not sure which is cleaner tbh.
ursus
04/24/2025, 9:42 AM
Joffrey
04/24/2025, 9:43 AM
webSocket.incoming throws a cancellation exception if the channel ends
ursus
04/24/2025, 9:43 AM
while (true) { incoming.receive() .. manually then it throws
Joffrey
04/24/2025, 9:44 AM
coroutineScope block, but it would also fail, which you probably don't want
ursus
04/24/2025, 9:45 AM
ursus
04/24/2025, 9:45 AM
ursus
04/24/2025, 9:47 AM
class Chat {
    private val scope = CoroutineScope(SupervisorJob())
    private val outgoingMessages = MutableSharedFlow<OutgoingMessage>()
    init {
        scope.launch {
            coroutineScope {
                launch {
                    outgoingMessages
                        .collect {
                            webSocket.send(...)
                        }
                }
                for (frame in webSocket.incoming) {
                    ...
                }
            }
        }
    }
    fun sendMessage(message: OutgoingMessage) {
        scope.launch {
            outgoingMessages.emit(message)
        }
    }
}
simplified but the gist is there
Joffrey
04/24/2025, 9:47 AM
ursus
04/24/2025, 9:49 AM
Joffrey
04/24/2025, 9:50 AM
sendMessage suspend and directly call channel.send or flow.emit?
2. scope.launch already provides a scope, no need for a nested coroutineScope
ursus
04/24/2025, 9:50 AM
ursus
04/24/2025, 9:51 AM
Joffrey
04/24/2025, 9:51 AM
Joffrey
04/24/2025, 9:54 AM
sendMessage launching coroutines, you also lose the order
Joffrey
04/24/2025, 9:55 AM
ursus
04/24/2025, 9:56 AM
class Chat {
    private val scope = CoroutineScope(SupervisorJob())
    private val outgoingMessages = Channel<OutgoingMessage>(capacity = Channel.UNLIMITED)
    init {
        scope.launch {
            coroutineScope {
                launch {
                    for (outgoingMessage in outgoingMessages) {
                        webSocket.send(...)
                    }
                }
                for (frame in webSocket.incoming) {
                    ...
                }
            }
        }
    }
    fun sendMessage(message: OutgoingMessage) {
        scope.launch {
            outgoingMessages.send(message)
        }
    }
}
will this lose the order?
Joffrey
04/24/2025, 9:57 AM
sendMessage(a) then sendMessage(b) will launch 2 coroutines in the Default dispatcher. No guarantees as to their ordering. You could be sending b before a.
Joffrey
04/24/2025, 9:58 AM
fun sendMessage(message: OutgoingMessage) {
    outgoingMessages.trySendBlocking(message) // never blocks because the channel has unlimited capacity
}
Or ideally
suspend fun sendMessage(message: OutgoingMessage) {
    outgoingMessages.send(message)
}
ursus
04/24/2025, 9:59 AM
Joffrey
04/24/2025, 10:00 AM
trySendBlocking would block in that case, while trySend would just return a result that represents the failure due to full buffer.
ursus
04/24/2025, 10:01 AM
coroutineScope returns and after I restart it back again (I omitted this part for simplicity), the queue is obviously there and replays everything pushed to it while the websocket wasn't there
Joffrey
04/24/2025, 10:01 AM
ursus
04/24/2025, 10:02 AM
Joffrey
04/24/2025, 10:02 AM
"after I restart it back again"
That changes some things regarding cancellation, so it's hard to reason about it without this part
ursus
04/24/2025, 10:04 AM
class Chat {
    private val scope = CoroutineScope(SupervisorJob())
    private val outgoingMessages = Channel<OutgoingMessage>(capacity = Channel.UNLIMITED)
    init {
        scope.launch {
            appIsInForeground
                .collectLatest { inForeground ->
                    if (inForeground) {
                        connect()
                    }
                }
        }
    }
    private suspend fun connect() {
        coroutineScope {
            launch {
                for (outgoingMessage in outgoingMessages) {
                    webSocket.send(...)
                }
            }
            for (frame in webSocket.incoming) {
                ...
            }
        }
    }
    fun sendMessage(message: OutgoingMessage) {
        scope.launch {
            outgoingMessages.send(message)
        }
    }
}
so yea technically it's like this
ursus
04/24/2025, 10:04 AM
appIsInForeground emits false, it cancels the connect
ursus
04/24/2025, 10:05 AM
coroutineScope, right?
Joffrey
04/24/2025, 10:11 AM
sendMessage. It really depends on what you actually want to do with the messages. How do you want to deal with messages when the connection stops and resumes, especially messages that are sent during the pause? You could keep a single channel, clear it when you disconnect (maybe with a try-finally around connect()), but also prevent sendMessage from enqueueing anything while the connection is down (checking appIsInForeground there too).
ursus
04/24/2025, 10:13 AM
Joffrey
04/24/2025, 10:15 AM
var and replace it with a new instance, another way is just to consume everything from it without sending (something like while(channel.tryReceive().getOrNull() != null) {} - I haven't really thought about it).
ursus
04/24/2025, 10:23 AM
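
A minimal sketch of two suggestions from this conversation, put together: the outgoing collector is kept as a Job and cancelled once the incoming loop ends, and messages are enqueued with trySend on an unlimited channel so that call order is preserved. FakeSocket, connect and demo are hypothetical names made up for illustration, with String standing in for OutgoingMessage; the fake socket closes its incoming channel when it is sent "bye", which stands in for the server closing the connection. This is only one way it could look, not a tested production setup.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the real web socket (an assumption, not a real API).
class FakeSocket {
    val incoming = Channel<String>(Channel.UNLIMITED)
    val sent = mutableListOf<String>()
    val received = mutableListOf<String>()

    fun send(text: String) {
        sent += text
        // Simulates the server closing the connection after a "bye" message.
        if (text == "bye") incoming.close()
    }
}

suspend fun connect(socket: FakeSocket, outgoing: Channel<String>) {
    coroutineScope {
        // Sibling coroutine that forwards queued messages to the socket.
        val outgoingJob = launch {
            for (message in outgoing) socket.send(message)
        }
        // This loop ends normally when `incoming` is closed.
        for (frame in socket.incoming) {
            socket.received += frame
        }
        // Cancel the sender so coroutineScope can return. Cancelling a child
        // does not fail the parent scope.
        outgoingJob.cancel()
    }
}

fun demo(): List<String> = runBlocking {
    val socket = FakeSocket()
    val outgoing = Channel<String>(Channel.UNLIMITED)
    // trySend never fails for capacity reasons on an unlimited channel,
    // and it enqueues in call order because no coroutine is launched.
    outgoing.trySend("a")
    outgoing.trySend("b")
    outgoing.trySend("bye")
    socket.incoming.trySend("welcome")
    connect(socket, outgoing) // returns once the fake server closes `incoming`
    socket.sent.toList()
}
```

Calling demo() from a main function returns the messages in the order they were enqueued. The outgoing channel itself is left open here, so anything enqueued after connect returns stays queued for the next connection unless it is drained or the channel is replaced, as discussed above.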