# coroutines
u
coroutineScope {
    launch {
        outgoingMessages
            .collect {
                webSocket.send(...)
            }
    }

    for (frame in webSocket.incoming) {
        ...
    }
}
I have a websocket with an incoming-messages loop, and I need to send messages as well. `outgoingMessages` is currently a `MutableSharedFlow<OutgoingMessage>` that I pipe messages into; they then get collected here and sent. The trouble is that a `MutableSharedFlow` never completes, so this `coroutineScope` will never return, which is a problem when the server closes the connection. What would be the cleanest and most idiomatic way of solving this? Or more generally, in a `coroutineScope`, how can one child cancel the other?
j
You could ensure that the coroutine that sends messages to the `MutableSharedFlow` is part of the same scope as the coroutine that runs this piece of code, so that both are canceled together.
Another option would be to materialize an "end" token in the flow's elements, and do:
outgoingMessages
    .takeWhile { it !is MyEndToken }
    .collect { webSocket.send(...) }
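A minimal sketch of the end-token idea, under stated assumptions: `Outgoing`, `Text`, `End`, and `takeWhileDemo` are hypothetical names (with `End` standing in for `MyEndToken`), and appending to a list stands in for `webSocket.send(...)`. The buffer and the `UNDISPATCHED` start are only there so this single-threaded demo subscribes before it emits.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical message types; End plays the role of MyEndToken.
sealed interface Outgoing
data class Text(val body: String) : Outgoing
object End : Outgoing

fun takeWhileDemo(): List<String> = runBlocking {
    // The extra buffer lets this demo emit without suspending.
    val outgoing = MutableSharedFlow<Outgoing>(extraBufferCapacity = 16)
    val sent = mutableListOf<String>()

    // UNDISPATCHED runs the collector up to its first suspension,
    // so it is subscribed before anything is emitted.
    val sender = launch(start = CoroutineStart.UNDISPATCHED) {
        outgoing
            .takeWhile { it !is End }
            .collect { if (it is Text) sent += it.body } // stand-in for webSocket.send(...)
    }

    outgoing.emit(Text("a"))
    outgoing.emit(Text("b"))
    outgoing.emit(End)       // takeWhile stops the collection here
    outgoing.emit(Text("c")) // ignored: collection ends at End

    sender.join() // returns because the collector completed on its own
    sent
}
```

The point of the sketch is that `collect` returns normally once the token arrives, so an enclosing `coroutineScope` would no longer be held open by the sender.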
u
Yeah, `takeWhile` is my plan B. The trouble is that with this API you only send an END token, then await END-ACK, and only then does it break the incoming loop and quit.
What do you mean by "the same scope as the coroutine..."? The outgoing messages come from the UI, so from "outside", and I'm not sure how that would work (hence me trying to "pipe in" the outgoing messages from outside, but maybe there's something more elegant that I don't see).
j
Ah I see, sorry. In that case maybe you could just cancel the `launch` after your for loop (assuming the server closing the connection means `webSocket.incoming` ends).
u
You mean to hold on to the job, as in `val outgoingJob = launch { outgoingMessages.collect { ... } }`?
j
Either this (and then `outgoingJob.cancel()`) or just `cancel()` the coroutineScope itself before the closing brace. Not sure which is cleaner tbh.
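A sketch of the job-based variant, assuming a plain `Channel<String>` as a stand-in for `webSocket.incoming` and a hypothetical `send` callback in place of `webSocket.send(...)`; `runSession` and `sessionDemo` are illustrative names, not part of any library.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// `incoming` stands in for webSocket.incoming, `send` for webSocket.send(...).
suspend fun runSession(
    incoming: Channel<String>,
    outgoing: MutableSharedFlow<String>,
    send: suspend (String) -> Unit,
): List<String> {
    val received = mutableListOf<String>()
    coroutineScope {
        val outgoingJob = launch {
            outgoing.collect { send(it) } // never completes on its own
        }
        for (frame in incoming) { // ends normally once `incoming` is closed
            received += frame
        }
        outgoingJob.cancel() // otherwise coroutineScope would keep waiting for the child
    }
    return received
}

fun sessionDemo(): List<String> = runBlocking {
    val incoming = Channel<String>(Channel.UNLIMITED)
    incoming.trySend("hello")
    incoming.trySend("bye")
    incoming.close() // simulates the server closing the connection
    runSession(incoming, MutableSharedFlow()) { /* would call webSocket.send */ }
}
```

Cancelling only the child keeps the rest of the scope untouched, so the block returns normally after the loop.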
u
Neat. Now that I think about it, I'm not sure why it doesn't cancel it automatically? Does the channel iterator in the for loop somehow eat the cancellation exception and then just return?
j
Which cancellation exception? I don't believe `webSocket.incoming` throws a cancellation exception if the channel ends.
u
The for loop doesn't, but if I do `while (true) { incoming.receive() ... }` manually, then it throws.
j
Yes, but that should be an exception about the channel being closed, not a cancellation exception. Nevertheless, it would indeed end your `coroutineScope` block, but it would also fail, which you probably don't want.
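To make the distinction concrete, here is a small sketch using a plain kotlinx.coroutines `Channel` closed without a cause (an assumption; a real websocket's `incoming` may be closed with a cause, in which case that cause is thrown instead). `closedChannelDemo` is a hypothetical name.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking

fun closedChannelDemo(): String = runBlocking {
    val incoming = Channel<String>()
    incoming.close() // closed without a cause

    var frames = 0
    for (frame in incoming) frames++ // iteration simply ends on a closed channel

    try {
        incoming.receive() // a direct receive() on a closed channel throws
        "received"
    } catch (e: CancellationException) {
        "cancellation" // not reached in this demo
    } catch (e: ClosedReceiveChannelException) {
        "closed after $frames frames"
    }
}
```

The for loop finishes quietly, while `receive()` fails with a channel-closed exception rather than a cancellation.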
u
Yeah, makes sense to return gracefully.
Btw, about the sending, do you see something cleaner than this? (typing)
class Chat {
    private val scope = CoroutineScope(SupervisorJob())
    private val outgoingMessages = MutableSharedFlow<OutgoingMessage>()

    init {
        scope.launch {
            coroutineScope {
                launch {
                    outgoingMessages
                        .collect {
                            webSocket.send(...)
                        }
                }

                for (frame in webSocket.incoming) {
                    ...
                }
            }
        }    
    }

    fun sendMessage(message: OutgoingMessage) {
        scope.launch {
            outgoingMessages.emit(message)
        }
    }
}
simplified but the gist is there
j
Personally, for many writers to one consumer, I would use a channel rather than a shared flow
u
Well, technically I want the outgoing messages queued, so maybe it's 1-1?
j
Some other things:
1. You're using a flow without any buffer, but you're buffering implicitly by launching an unbounded number of coroutines. Maybe you should instead make `sendMessage` suspend and directly call `channel.send` or `flow.emit`?
2. `scope.launch` already provides a scope, so there's no need for a nested `coroutineScope`.
u
I'm aware, thanks; that's just a quick copy-paste job to give you a sample.
So you'd rather hold a buffered channel than a `MutableSharedFlow`?
j
Yes, especially if it's an encapsulated internal structure.
Note that with `sendMessage` launching coroutines, you also lose the order.
Even if you don't want it to suspend, you should use things like `tryEmit`/`trySendBlocking` instead of launching coroutines like this, and rely on the channel/flow's buffer to relieve the backpressure.
u
class Chat {
    private val scope = CoroutineScope(SupervisorJob())
    private val outgoingMessages = Channel<OutgoingMessage>(capacity = Channel.UNLIMITED)

    init {
        scope.launch {
            coroutineScope {
                launch {
                    for (outgoingMessage in outgoingMessages) {
                        webSocket.send(...)
                    }
                }

                for (frame in webSocket.incoming) {
                    ...
                }
            }
        }    
    }

    fun sendMessage(message: OutgoingMessage) {
        scope.launch {
            outgoingMessages.send(message)
        }
    }
}
will this lose the order?
j
Yes. Calling `sendMessage(a)` then `sendMessage(b)` will launch 2 coroutines in the `Default` dispatcher. No guarantees as to their ordering. You could be sending `b` before `a`.
Instead, use:
fun sendMessage(message: OutgoingMessage) {
    outgoingMessages.trySendBlocking(message) // never blocks because the channel has unlimited capacity
}
Or ideally
suspend fun sendMessage(message: OutgoingMessage) {
    outgoingMessages.send(message)
}
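A self-contained sketch of the non-suspending variant, to show why it keeps the order. It uses `trySend` rather than `trySendBlocking` (the thread notes they behave the same on an unlimited channel), `String` instead of `OutgoingMessage`, and the hypothetical names `OrderedOutbox`, `drainTo`, and `orderDemo`.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

class OrderedOutbox {
    private val outgoingMessages = Channel<String>(Channel.UNLIMITED)

    // Non-suspending: the element is enqueued before this call returns,
    // so successive calls from one thread keep their order.
    fun sendMessage(message: String): Boolean =
        outgoingMessages.trySend(message).isSuccess

    fun close() {
        outgoingMessages.close()
    }

    // Stand-in for the sender loop that would call webSocket.send(...).
    suspend fun drainTo(sink: MutableList<String>) {
        for (message in outgoingMessages) sink += message
    }
}

fun orderDemo(): List<String> = runBlocking {
    val outbox = OrderedOutbox()
    listOf("a", "b", "c").forEach { outbox.sendMessage(it) }
    outbox.close() // buffered elements are still delivered after close
    val sent = mutableListOf<String>()
    outbox.drainTo(sent)
    sent
}
```

No coroutine is launched per message here, so there is nothing for a dispatcher to reorder.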
u
What's the difference between `trySendBlocking` and `trySend`? Both seem to be non-suspending.
j
No difference if the capacity is unlimited. The difference is how they handle a full buffer. `trySendBlocking` would block in that case, while `trySend` would just return a result that represents the failure due to the full buffer.
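A short sketch of the `trySend` side of that difference, assuming a channel with capacity 1 and no receiver; `fullBufferDemo` is a hypothetical name. `trySendBlocking` is left out because it would block the calling thread at the second send.

```kotlin
import kotlinx.coroutines.channels.Channel

fun fullBufferDemo(): List<Boolean> {
    val channel = Channel<Int>(capacity = 1)
    val first = channel.trySend(1)  // fits in the buffer
    val second = channel.trySend(2) // buffer is full and nobody is receiving
    return listOf(
        first.isSuccess,
        second.isSuccess,
        second.isFailure, // failed...
        second.isClosed,  // ...but not because the channel was closed
    )
}
```

The returned `ChannelResult` lets the caller decide what to do with a rejected element instead of waiting.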
u
One thing I'm noticing now that I added the buffer: it's obviously unlimited, and when the connection closes, that `coroutineScope` returns, and I restart it again (I omitted this part for simplicity), the queue is obviously still there and replays everything pushed to it while the websocket wasn't there.
j
Is that a good thing or a bad thing? If you don't want this, you can also clear the queue before cancelling.
u
I probably want both at some point, but I don't want to weigh you down with it. I want the queueing when the app goes to the background (websocket closed), but I don't want it when the session ends.
j
"after I restart it back again": that changes some things regarding cancellation, so it's hard to reason about it without this part.
u
class Chat {
    private val scope = CoroutineScope(SupervisorJob())
    private val outgoingMessages = Channel<OutgoingMessage>(capacity = Channel.UNLIMITED)

    init {
        scope.launch {
            appIsInForeground
                .collectLatest { inForeground ->
                    if (inForeground) {
                        connect()
                    }
                }
        }    
    }

    private suspend fun connect() {
        coroutineScope {
            launch {
                for (outgoingMessage in outgoingMessages) {
                    webSocket.send(...)
                }
            }

            for (frame in webSocket.incoming) {
                ...
            }
        }
    }

    fun sendMessage(message: OutgoingMessage) {
        scope.launch {
            outgoingMessages.send(message)
        }
    }
}
So yeah, technically it's like this: when `appIsInForeground` emits false, it cancels the `connect`. So the question maybe is, how would I scope the queue to... well... something 😄 It would need to be a local variable inside the `coroutineScope`, right?
j
So here you have a connection that can stop and resume, and concurrently you have clients that call `sendMessage`. It really depends on what you actually want to do with the messages. How do you want to deal with messages when the connection stops and resumes, especially messages that are sent during the pause? You could keep a single channel, clear it when you disconnect (maybe with a `try-finally` around `connect()`), but also prevent `sendMessage` from enqueueing anything while the connection is down (checking `appIsInForeground` there too).
u
how do I clear the channel? do you mean to create a new instance?
j
Ah, it's been some time; I thought there was a readily available function for that. Well, one way is to make it a `var` and replace it with a new instance; another way is just to consume everything from it without sending (something like `while (channel.tryReceive().getOrNull() != null) {}`; I haven't really thought about it).
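The "consume everything" idea could look roughly like this. `discardBuffered` and `drainDemo` are hypothetical names, and the loop checks `isSuccess` instead of `getOrNull() != null` so that it would not stop early on a channel with nullable elements. It is only a sketch: elements enqueued concurrently while the loop runs may or may not be dropped.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Removes whatever is currently buffered, without suspending.
// Returns the number of discarded elements.
fun <T> ReceiveChannel<T>.discardBuffered(): Int {
    var dropped = 0
    while (tryReceive().isSuccess) dropped++
    return dropped
}

fun drainDemo(): List<Int?> {
    val queue = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { queue.trySend(it) } // messages queued while disconnected

    val dropped = queue.discardBuffered()

    queue.trySend(42) // the same channel instance stays usable afterwards
    return listOf(dropped, queue.tryReceive().getOrNull())
}
```

Because the channel is never closed or replaced, existing references to it (for example inside `sendMessage`) remain valid after the drain.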
u
Is it safe to just null out the channel variable? Or do I need to close it first?