Hello guys, when using `receiveAsFlow` to process...
# coroutines
s
Hello guys, when using
receiveAsFlow
to process
Channel
items, is it guaranteed that if I receive an item from the
Channel
my callback to process the item is always called? Example:
Copy code
@Composable
fun MyComposable(channel: Channel<MyItem>) {
    val flow = channel.receiveAsFlow()

    LaunchedEffect(Unit) {
        flow.collect { item ->
            // process the item
        }
    }
}
I want to avoid a situation where the item from the
Channel
is removed and collection is started, but
collect
is cancelled and the callback to process the item is never invoked, effectively losing the item forever. The example above is using
@Composable
annotation, but my question is general.
k
Copy code
@Composable
fun MyComposable(channel: Channel<MyItem>) {
    val flow = channel.receiveAsFlow()

    LaunchedEffect(Unit) {
        flow.collect { item ->
            withContext(NonCancellable) {
              // process the item
            }
        }
    }
}
z
I read the question as “is it possible that there’s a race where the
receiveAsFlow
impl will successfully receive from the channel but then get cancelled before delivering the item to the collector?” The impl uses operator fusion so it’s not obvious to me. A naive impl would not need to make any calls between
receive
and
emit
that would check for cancellation, but I don’t know if
receiveAsFlow
makes that guarantee.
👍 1
But if you’re not doing anything else with the flow I would simply avoid using flow at all and just use
forEach
on the channel directly.
☝️ 1
And if you are doing something else with the flow, then you’d need to check each of those operators to see if they could be cancelled as well.
s
“is it possible that there’s a race where the
receiveAsFlow
impl will successfully receive from the channel but then get cancelled before delivering the item to the collector?”
This was my question exactly 🙏
z
Few issues with the code snippet but I assume that’s just a simplified version of your real code
s
Full code looks like this after removing unnecessary `receiveAsFlow`:
Copy code
NavActionProcessing(
        destinationActions = remember { router.destinationActions },
        navigationActionHandler = { action ->
            navController.handleNavigationAction(action)
        }
    )

...
@Composable
private fun NavActionProcessing(
    destinationActions: ReceiveChannel<Router.DestinationAction>,
    navigationActionHandler: (Router.DestinationAction) -> Unit
) {
    LaunchedEffect(Unit) {
        try {
            for (action in destinationActions) {
                logRouterDestinationAction(action)
                navigationActionHandler.invoke(action)
            }
        } catch (e: CancellationException) {
            Timber.tag("trace").d("NavActionProcessing cancelled")
            throw e
        }
    }
}
I've had an issue previously where I used this
Copy code
// observe router's actions
    val action by router.destinationActions
        .receiveAsFlow()
        .asLifecycleAwareState(
            lifecycleOwner = LocalLifecycleOwner.current,
            initialState = null
        )

    LaunchedEffect(action) {
        logRouterDestinationAction(action)
        // and forward them to a navController
        navController.handleNavigationAction(action)
    }
where
asLifecycleAwareState
was basically
collectAsStateWithLifecycle
and I experienced some cases where some recompositions could cause that channel items were received but never processed. After removing
asLifecycleAwareState
and using
collectAsStateWithLifecycle
I was not able to reproduce the issue, but I think it's safer to collect items outside a state.
z
You can wrap your
forEach
with
repeatOnLifecycle
to get the same lifecycle behavior
And you should be passing the channel as a key to the
LaunchedEffect
s
Thanks a lot 🙏 Seems to work reliably now.
d
To answer the original question: no, there is no guarantee that elements won't be lost, and we should document this. Filed an issue: https://github.com/Kotlin/kotlinx.coroutines/issues/4337