Hey :wave: I want your opinion on something I'm tr...
# coroutines
a
Hey 👋 I want your opinion on something I'm trying to do. There's a
SharedFlow
where I emit all the incoming WebSocket-like messages where • some incoming messages are "commands" I'm supposed to execute • some incoming messages are "responses" to the requests my app makes in order to get more details about said "commands" • (unfortunately, the details can't be transmitted along with the "commands" and must be explicitly requested by the app) A single collector of said
SharedFlow
determines what action should be taken once a "command" arrives. Processing a "command" may involve sending a request (for the details) and awaiting another answer using another ad-hoc spawned collector from the same
SharedFlow
. In short, it looks more or less like this (pseudo-code)
Copy code
class SomeClassThatHasALifecycleBoundScope {
    fun onStarted() {
        lifecycleScope.launch {
            commandExecutor.collectCommands()
        }
    }
}

interface WebSocketClient {
    // replay = 0, buffer = 0, onBufferOverflow = SUSPEND, commands emitted using emit()
    val commands: SharedFlow<Command>
    
    suspend fun sendRequestForCommandDetails(commandId: String, clientToken: String)
}

class CommandExecutor {
    suspend fun collectCommands() = withContext(observingDispatcher) {
        webSocketClient.commands.collect {
            processCommand(it)
        }
    }
    
    suspend fun processCommand(command: Command) = coroutineScope {
        val clientToken = UUID.randomUUID().toString()
        
        val detailsJob = async {
            webSocketClient.commands
                .timeout(10.seconds) 
                .first { it.clientToken == clientToken }
        }
        
        webSocketClient.sendRequestForCommandDetails(command.id, clientToken)
        
        val commandDetails = detailsJob.await()
        
        doSomethingWithCommandDetails(commandDetails)
    }
}
Obviously, the problem above is that
detailsJob
can't receive an answer from
webSocketClient.commands
until the collector from
collectCommands()
unblocks the
SharedFlow
(
emit()
is suspended) So I'm thinking about processing the commands in dedicated coroutines to unblock the
SharedFlow
. The question is: is it OK from the structured concurrency point of view to
launch
in
collect
e.g. like this?
Copy code
suspend fun collectCommands() = withContext(observingDispatcher) {
    webSocketClient.commands.collect {
        launch {
            try {
                processCommand(it)
            catch (e: Exception) { ... }
        }
    }
}
Is the new coroutine a child (or "grandchild") of
SomeClassThatHasALifecycleBoundScope.lifecycleScope
? Will it be cancelled properly when
lifecycleScope
gets cancelled?
s
Yes, it's okay. Your call to
withContext(observingDispatcher)
creates a new coroutine scope (just like
coroutineScope
would). That's why you are able to call
launch
inside that function. The calls to
launch
inside the
collect
are actually resolved as extensions on that
withContext
scope.
The job created by
withContext
is a child of the job in which
collectCommands
was called, so yes, it (and all its children) will be cancelled if the
lifecycleScope
is cancelled.
a
Thanks! 🙌
🍻 1