Stanislav Kral
01/18/2025, 2:47 PM
When using receiveAsFlow to process Channel items, is it guaranteed that if I receive an item from the Channel, my callback to process the item is always called?
Example:
@Composable
fun MyComposable(channel: Channel<MyItem>) {
    val flow = channel.receiveAsFlow()
    LaunchedEffect(Unit) {
        flow.collect { item ->
            // process the item
        }
    }
}
I want to avoid a situation where the item is removed from the Channel and collection is started, but collect is cancelled and the callback to process the item is never invoked, effectively losing the item forever.
The example above uses the @Composable annotation, but my question is general.
kevin.cianfarini
01/18/2025, 2:50 PM
@Composable
fun MyComposable(channel: Channel<MyItem>) {
    val flow = channel.receiveAsFlow()
    LaunchedEffect(Unit) {
        flow.collect { item ->
            withContext(NonCancellable) {
                // process the item
            }
        }
    }
}
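A minimal sketch of what the NonCancellable wrapper does, in plain coroutines without Compose. The function name processWithNonCancellable, the Int items, and the delay standing in for real work are assumptions for illustration only. It shows that once the collect body has started, cancelling the collector does not interrupt it; it says nothing about whether an item can be lost before the body starts.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow

// Hypothetical demo: returns the items whose processing ran to completion.
suspend fun processWithNonCancellable(): List<Int> = coroutineScope {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val processed = mutableListOf<Int>()
    val started = CompletableDeferred<Unit>()

    val collector = launch {
        channel.receiveAsFlow().collect { item ->
            withContext(NonCancellable) {
                started.complete(Unit) // signal that processing has begun
                delay(50)              // simulated suspending work
                processed += item
            }
        }
    }

    channel.send(1)
    started.await()           // the body for item 1 is now running
    collector.cancelAndJoin() // cancel mid-processing, then wait for completion
    processed
}

fun main() = runBlocking {
    println(processWithNonCancellable()) // prints [1]
}
```

Without the withContext(NonCancellable) wrapper, the delay call would throw CancellationException when the collector is cancelled, and the item would not be added to the list.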
Zach Klippenstein (he/him) [MOD]
01/18/2025, 4:48 PM
“is it possible that there’s a race where the receiveAsFlow impl will successfully receive from the channel but then get cancelled before delivering the item to the collector?” The impl uses operator fusion so it’s not obvious to me. A naive impl would not need to make any calls between receive and emit that would check for cancellation, but I don’t know if receiveAsFlow makes that guarantee.
Zach Klippenstein (he/him) [MOD]
01/18/2025, 4:49 PM
forEach on the channel directly.
Zach Klippenstein (he/him) [MOD]
01/18/2025, 4:50 PM
Stanislav Kral
01/18/2025, 4:50 PM
“is it possible that there’s a race where the receiveAsFlow impl will successfully receive from the channel but then get cancelled before delivering the item to the collector?”
This was my question exactly 🙏
Zach Klippenstein (he/him) [MOD]
01/18/2025, 4:51 PM
Stanislav Kral
01/18/2025, 4:59 PM
NavActionProcessing(
    destinationActions = remember { router.destinationActions },
    navigationActionHandler = { action ->
        navController.handleNavigationAction(action)
    }
)
...
@Composable
private fun NavActionProcessing(
    destinationActions: ReceiveChannel<Router.DestinationAction>,
    navigationActionHandler: (Router.DestinationAction) -> Unit
) {
    LaunchedEffect(Unit) {
        try {
            // iterate the channel directly instead of going through a Flow
            for (action in destinationActions) {
                logRouterDestinationAction(action)
                navigationActionHandler.invoke(action)
            }
        } catch (e: CancellationException) {
            Timber.tag("trace").d("NavActionProcessing cancelled")
            throw e // rethrow so cancellation propagates normally
        }
    }
}
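The same direct-iteration pattern can be sketched in plain coroutines, without Compose. The function name handOverBetweenConsumers and the Int actions are hypothetical. The sketch only illustrates that elements not yet received stay in the channel when one consumer is cancelled, so a later consumer can pick them up; it does not exercise a cancellation racing with a receive in progress.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: two consumers take turns draining the same channel.
suspend fun handOverBetweenConsumers(): List<Int> = coroutineScope {
    val actions = Channel<Int>(Channel.UNLIMITED)
    val handled = mutableListOf<Int>()
    val firstHandled = CompletableDeferred<Unit>()

    // First consumer: iterate the channel directly with a for loop.
    val first = launch {
        for (action in actions) {
            handled += action
            firstHandled.complete(Unit)
        }
    }

    actions.send(1)
    firstHandled.await()
    first.cancelAndJoin() // stands in for the effect leaving composition

    // Sent while nobody is receiving: these wait in the channel's buffer.
    actions.send(2)
    actions.send(3)
    actions.close()

    // Second consumer picks up the buffered elements.
    launch { for (action in actions) handled += action }.join()
    handled
}

fun main() = runBlocking {
    println(handOverBetweenConsumers()) // prints [1, 2, 3]
}
```

The sketch uses a plain for loop on purpose: consumeEach cancels the channel when its consumer is cancelled, which would discard the remaining elements instead of leaving them for the next consumer.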
Stanislav Kral
01/18/2025, 5:01 PM
// observe router's actions
val action by router.destinationActions
    .receiveAsFlow()
    .asLifecycleAwareState(
        lifecycleOwner = LocalLifecycleOwner.current,
        initialState = null
    )
LaunchedEffect(action) {
    logRouterDestinationAction(action)
    // and forward them to a navController
    navController.handleNavigationAction(action)
}
where asLifecycleAwareState was basically collectAsStateWithLifecycle, and I experienced some cases where recompositions could cause channel items to be received but never processed.
After removing asLifecycleAwareState and using collectAsStateWithLifecycle I was not able to reproduce the issue, but I think it's safer to collect items outside a state.
Zach Klippenstein (he/him) [MOD]
01/18/2025, 5:02 PM
forEach with repeatOnLifecycle to get the same lifecycle behavior
Zach Klippenstein (he/him) [MOD]
01/18/2025, 5:02 PM
LaunchedEffect
Stanislav Kral
01/18/2025, 5:15 PM
Dmitry Khalanskiy [JB]
01/20/2025, 11:53 AM