# coroutines
v
Hello. We have experienced unexpected behavior with Flow cancellation when we launch an internal non-blocking coroutine in the middle of a flow and then cancel it. When we cancel the internal coroutine, the upstream flow is also cancelled, but only if the closest upstream flow is produced by `combine`. If it is produced by anything else, the upstream cancellation does not happen. We did not expect the behavior to differ, and I can't find a clue anywhere in the `combine` docs (or deeper) that I should expect this. I am really curious why/what exactly `combine` does internally for this to happen, and whether it can be considered a bug. The playground:
    enum class FlowOption {
        COMBINE,
        FLATMAP
    }

    private var flowOption = FlowOption.COMBINE
    private val numbersFlow = MutableStateFlow(1L)
    private val stringsFlow = MutableStateFlow("A")
    private val internalScope = CoroutineScope(Dispatchers.IO)

    private fun testFlows() {

        // View Model scope
        scope.launch {

            val combineFlow: Flow<Long> = combine(
                numbersFlow,
                stringsFlow
            ) { numbers, strings ->
                numbers
            }

            val flatMapFlow: Flow<Long> = stringsFlow
                .flatMapLatest {
                    numbersFlow
                }

            val dataFlow: Flow<Long> = when (flowOption) {
                FlowOption.COMBINE -> combineFlow
                FlowOption.FLATMAP -> flatMapFlow
            }

            dataFlow
                // If we add this - the upstream cancelling is not happening anymore.
//                .flatMapLatest { numbers ->
//                    flowOf(numbers)
//                }
                .onEach { numbers ->

                    // On every emission we want to launch a non-blocking(!) coroutine to do stuff
                    // related to the data in the flow. Is there a better way to do that?
                    internalScope.launch(
                        // Passing the context like this is probably wrong: internalScope's dispatcher
                        // will not be used, and parent cancellation will not cancel this. Don't repeat in prod.
                        context = currentCoroutineContext()
                        // This would be the proper context to preserve internalScope's dispatcher and
                        // support parent cancellation, but with it the unexpected behavior does not happen:
                        //context = SupervisorJob(parent = currentCoroutineContext().job)
                    ) {
                        // This will cancel the upstream data flow ONLY if the closest upstream is combine.
                        throw CancellationException("Throwing cancel manually")
                    }
                }
                .collect()
        }
    }
s
Let's focus on two lines which I think reveal some misconceptions.
// This will cancel upstream data flow […]
throw CancellationException("Throwing cancel manually")
There are two things wrong with that idea.
• First, a `Flow` can't be cancelled. It's not a coroutine, and doesn't have a `Job`.
• Second, throwing a `CancellationException` does not really cancel a coroutine. Its effect is unpredictable, and may or may not cause a task to silently terminate.
v
`internalJob = internalScope.launch()` produces a job, and we cancel it with `internalJob?.cancel()`. The effect is the same; I just removed the additional code to keep the playground as thin as possible.
s
Why not just do this?
dataFlow.collect { numbers ->
  withContext(Dispatchers.IO) {
    launch { doStuffWith(numbers) }
  }
}
I think that'll make your coroutines easier to reason about, since you won't be mixing scopes.
I too am mystified why `launch { cancel() }` (or anything similar/equivalent) would have any effect on parent tasks, though 🤷
Generally it's a bad idea to try to cancel the current coroutine, since you don't usually know exactly which coroutine invoked your code. For instance, `combine` will launch new coroutines in which to produce each of its upstream flows, which might account for the change in behavior you're seeing.
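(Editor's note: a small probe supporting that point; the exact result depends on kotlinx.coroutines internals, so treat it as exploratory rather than guaranteed. Inside the `onEach` block under `combine`, the current job need not be the collecting coroutine's job, which is why a coroutine launched with `currentCoroutineContext()` there attaches to combine's internal machinery.)

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbers = MutableStateFlow(1L)
    val strings = MutableStateFlow("A")
    val collectorJob = coroutineContext.job

    combine(numbers, strings) { n, _ -> n }
        .onEach {
            // combine opens an internal scope for its upstream producers;
            // this block runs inside that scope, not directly in the
            // collecting coroutine, so the jobs may differ here.
            println("same job as collector? ${currentCoroutineContext().job === collectorJob}")
        }
        .first() // take one emission and stop
}
```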
v
I remember I added the internal scope to make sure that `doStuffWith` doesn't block the flow emissions, because it is a `while (true)` loop under the hood. Not sure if that scheme will do the trick.
s
Sorry, I made a mistake in the code I shared. Thanks for spotting it! I should have written this:
coroutineScope {
  dataFlow.collect { numbers ->
    launch(Dispatchers.IO) { doStuffWith(numbers) }
  }
}
v
That will suspend `dataFlow` emissions downstream until `doStuffWith(numbers)` completes.
And I need `doStuffWith` to run asynchronously, in parallel.
s
🤔 so there are more steps downstream of this one? That makes me think maybe you need a shared flow.
v
Yes. Downstream I `combine(dataFlow, otherMutableStateFlowInClass)`, and the parallel "do stuff" fills `otherMutableStateFlowInClass`. In the output of the downstream I emit `Pair<Data, AdditionalStuff>`.
In the real world this is implemented for a "users online state" feature. Basically, I can have a data flow of any type anywhere which can potentially contain users. I can connect it to an online-state component which "does the stuff" smartly. But it must not delay the initial data emission; the actual online state will be delivered in parallel.
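(Editor's note: one shape that architecture could take with a shared flow, as a sketch; `wireOnlineState` and the types are made up for illustration, and `doStuffWith` stands in for the real slow work. The data flow is shared once, the slow work runs as an independent collector, and `combine` never waits for it.)

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical wiring: `dataFlow` carries the data, `onlineState` plays
// the otherMutableStateFlowInClass role and is filled in parallel.
fun CoroutineScope.wireOnlineState(dataFlow: Flow<Long>): Flow<Pair<Long, String?>> {
    val shared = dataFlow.shareIn(this, SharingStarted.Eagerly, replay = 1)
    val onlineState = MutableStateFlow<String?>(null)

    // Independent collector: the slow per-item work runs here, in
    // parallel, and never suspends the main pipeline below.
    launch(Dispatchers.IO) {
        shared.collect { n ->
            // doStuffWith(n) would go here (hypothetical slow work)
            onlineState.value = "online state for $n"
        }
    }

    // Main pipeline: emits immediately with whatever state is available.
    return combine(shared, onlineState) { data, state -> data to state }
}
```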
As I mention in the comments, if I set up the coroutine context properly(?) via `context = SupervisorJob(parent = currentCoroutineContext().job)`, everything works as I expect.
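(Editor's note: the working variant reduced to a standalone sketch. A `SupervisorJob` parented to the current job ties the side coroutine's lifetime to the parent while keeping its cancellation or failure isolated; note the supervisor must be completed explicitly, or a waiting parent like `runBlocking` would never return.)

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    // The SupervisorJob becomes the side coroutine's parent; cancelling
    // or failing the side coroutine does not reach runBlocking's job.
    val supervisor = SupervisorJob(parent = coroutineContext.job)
    val side = launch(Dispatchers.IO + supervisor) {
        throw CancellationException("cancelled manually")
    }
    side.join()
    // The surrounding coroutine is unaffected by the cancellation:
    println("runBlocking still active: ${coroutineContext.job.isActive}")
    // A SupervisorJob never completes on its own; finish it so
    // runBlocking (which waits for its children) can return.
    supervisor.complete()
}
```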
> I too am mystified why `launch { cancel() }` (or anything similar/equivalent) would have any effect on parent tasks, though
This thing just bothers me