Stanislav Kral
01/18/2025, 2:47 PM
When using receiveAsFlow to process Channel items, is it guaranteed that if I receive an item from the Channel, my callback to process the item is always called?
Example:
@Composable
fun MyComposable(channel: Channel<MyItem>) {
    val flow = channel.receiveAsFlow()
    LaunchedEffect(Unit) {
        flow.collect { item ->
            // process the item
        }
    }
}
I want to avoid a situation where the item from the Channel is removed and collection is started, but collect is cancelled and the callback to process the item is never invoked, effectively losing the item forever.
The example above uses the @Composable annotation, but my question is general.
kevin.cianfarini
01/18/2025, 2:50 PM
@Composable
fun MyComposable(channel: Channel<MyItem>) {
    val flow = channel.receiveAsFlow()
    LaunchedEffect(Unit) {
        flow.collect { item ->
            withContext(NonCancellable) {
                // process the item
            }
        }
    }
}
Zach Klippenstein (he/him) [MOD]
01/18/2025, 4:48 PM
“is it possible that there’s a race where the receiveAsFlow impl will successfully receive from the channel but then get cancelled before delivering the item to the collector?” The impl uses operator fusion, so it’s not obvious to me. A naive impl would not need to make any calls between receive and emit that would check for cancellation, but I don’t know if receiveAsFlow makes that guarantee.
Zach Klippenstein (he/him) [MOD]
01/18/2025, 4:49 PM
forEach on the channel directly.
Zach Klippenstein (he/him) [MOD]
01/18/2025, 4:50 PM
Stanislav Kral
01/18/2025, 4:50 PM
“is it possible that there’s a race where the receiveAsFlow impl will successfully receive from the channel but then get cancelled before delivering the item to the collector?”
This was my question exactly 🙏
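For context on the race being asked about: the kotlinx.coroutines documentation describes `receive` as having a prompt cancellation guarantee, which means a receive can retrieve an element and still fail with a CancellationException. The documented hook for such cases is the `onUndeliveredElement` parameter of the `Channel` factory. The sketch below is only a plain-coroutines illustration of that hook, triggered deterministically by cancelling a channel that still holds a buffered element. It does not show whether the fused receiveAsFlow implementation can hit the race, and `collectUndelivered` is a hypothetical name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the elements that were sent
// but never handed to a receiver.
fun collectUndelivered(): List<String> = runBlocking {
    val undelivered = mutableListOf<String>()
    val channel = Channel<String>(
        capacity = Channel.UNLIMITED,
        // Called for elements that were sent but could not be delivered.
        onUndeliveredElement = { item -> undelivered += item }
    )

    channel.send("a")
    channel.send("b")

    // "a" is delivered normally.
    check(channel.receive() == "a")

    // Cancelling the channel hands the still-buffered "b" to the handler.
    channel.cancel()
    undelivered
}

fun main() {
    println(collectUndelivered())
}
```

A handler like this could log or re-queue the item instead of dropping it silently; whether that is needed for a given collector is an assumption to verify against the library documentation.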
Zach Klippenstein (he/him) [MOD]
01/18/2025, 4:51 PM
Stanislav Kral
01/18/2025, 4:59 PM
NavActionProcessing(
    destinationActions = remember { router.destinationActions },
    navigationActionHandler = { action ->
        navController.handleNavigationAction(action)
    }
)
...
@Composable
private fun NavActionProcessing(
    destinationActions: ReceiveChannel<Router.DestinationAction>,
    navigationActionHandler: (Router.DestinationAction) -> Unit
) {
    LaunchedEffect(Unit) {
        try {
            for (action in destinationActions) {
                logRouterDestinationAction(action)
                navigationActionHandler.invoke(action)
            }
        } catch (e: CancellationException) {
            Timber.tag("trace").d("NavActionProcessing cancelled")
            throw e
        }
    }
}
Stanislav Kral
01/18/2025, 5:01 PM
// observe router's actions
val action by router.destinationActions
    .receiveAsFlow()
    .asLifecycleAwareState(
        lifecycleOwner = LocalLifecycleOwner.current,
        initialState = null
    )
LaunchedEffect(action) {
    logRouterDestinationAction(action)
    // and forward them to a navController
    navController.handleNavigationAction(action)
}
where asLifecycleAwareState was basically collectAsStateWithLifecycle, and I saw cases where recompositions caused channel items to be received but never processed.
After replacing asLifecycleAwareState with collectAsStateWithLifecycle I was not able to reproduce the issue, but I think it's safer to collect items outside of a state.
Zach Klippenstein (he/him) [MOD]
01/18/2025, 5:02 PM
forEach with repeatOnLifecycle to get the same lifecycle behavior
Zach Klippenstein (he/him) [MOD]
01/18/2025, 5:02 PM
LaunchedEffect
Stanislav Kral
01/18/2025, 5:15 PM
Dmitry Khalanskiy [JB]
01/20/2025, 11:53 AM
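For reference, the two suggestions made earlier in the thread (iterating the channel directly and wrapping the processing in NonCancellable) can be combined in a plain-coroutines sketch without Compose. It assumes a single consumer and uses `delay` as a stand-in for real processing. `processedDespiteCancel` is a hypothetical name. The sketch only shows that an item which has already reached the loop body is processed to completion even if the consumer is cancelled meanwhile. It says nothing about cancellation that happens during the receive itself.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: returns the items whose processing finished.
fun processedDespiteCancel(): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val processed = mutableListOf<Int>()
    val processingStarted = CompletableDeferred<Unit>()

    val consumer = launch {
        // Iterate the channel directly instead of going through a Flow.
        for (item in channel) {
            // Once an item is in hand, finish processing it
            // even if the consumer is cancelled in the meantime.
            withContext(NonCancellable) {
                processingStarted.complete(Unit)
                delay(50) // stand-in for real work
                processed += item
            }
        }
    }

    channel.send(1)
    processingStarted.await()  // the consumer is now mid-processing
    consumer.cancelAndJoin()   // cancel while item 1 is being processed
    processed
}

fun main() {
    println(processedDespiteCancel())
}
```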