azabost
09/25/2023, 9:11 PMSharedFlow
where I emit all the incoming WebSocket-like messages where
• some incoming messages are "commands" I'm supposed to execute
• some incoming messages are "responses" to the requests my app makes in order to get more details about said "commands"
• (unfortunately, the details can't be transmitted along with the "commands" and must be explicitly requested by the app)
A single collector of said SharedFlow
determines what action should be taken once a "command" arrives.
Processing a "command" may involve sending a request (for the details) and awaiting another answer using another ad-hoc spawned collector from the same SharedFlow
.
In short, it looks more or less like this (pseudo-code)
class SomeClassThatHasALifecycleBoundScope {
fun onStarted() {
lifecycleScope.launch {
commandExecutor.collectCommands()
}
}
}
interface WebSocketClient {
// replay = 0, buffer = 0, onBufferOverflow = SUSPEND, commands emitted using emit()
val commands: SharedFlow<Command>
suspend fun sendRequestForCommandDetails(commandId: String, clientToken: String)
}
class CommandExecutor {
suspend fun collectCommands() = withContext(observingDispatcher) {
webSocketClient.commands.collect {
processCommand(it)
}
}
suspend fun processCommand(command: Command) = coroutineScope {
val clientToken = UUID.randomUUID().toString()
val detailsJob = async {
webSocketClient.commands
.timeout(10.seconds)
.first { it.clientToken == clientToken }
}
webSocketClient.sendRequestForCommandDetails(command.id, clientToken)
val commandDetails = detailsJob.await()
doSomethingWithCommandDetails(commandDetails)
}
}
Obviously, the problem above is that detailsJob
can't receive an answer from webSocketClient.commands
until the collector from collectCommands()
unblocks the SharedFlow
(emit()
is suspended)
So I'm thinking about processing the commands in dedicated coroutines to unblock the SharedFlow
.
The question is: is it OK from the structured concurrency point of view to launch
in collect
e.g. like this?
suspend fun collectCommands() = withContext(observingDispatcher) {
webSocketClient.commands.collect {
launch {
try {
processCommand(it)
catch (e: Exception) { ... }
}
}
}
Is the new coroutine a child (or "grandchild") of SomeClassThatHasALifecycleBoundScope.lifecycleScope
? Will it be cancelled properly when lifecycleScope
gets cancelled?Sam
09/26/2023, 8:41 AMwithContext(observingDispatcher)
creates a new coroutine scope (just like coroutineScope
would). That's why you are able to call launch
inside that function. The calls to launch
inside the collect
are actually resolved as extensions on that withContext
scope.Sam
09/26/2023, 8:42 AMwithContext
is a child of the job in which collectCommands
was called, so yes, it (and all its children) will be cancelled if the lifecycleScope
is cancelled.azabost
09/26/2023, 9:15 AM