Vlad
01/10/2025, 2:09 PMcombine
. If it is produced by anything else the upstream cancellation is not happening.
Different behavior was not expected and I can't find a clue anywhere in combine docs or deeper that I should expect that to happen
And I am really curious why/what exactly combine does internally for that to happen and if it can be considered as a bug.
the playground:
enum class FlowOption {
COMBINE,
FLATMAP
}
private var flowOption = FlowOption.COMBINE
private val numbersFlow = MutableStateFlow(1L)
private val stringsFlow = MutableStateFlow("A")
private val internalScope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>)
private fun testFlows() {
// View Model scope
scope.launch {
val combineFlow: Flow<Long> = combine(
numbersFlow,
stringsFlow
) { numbers, strings ->
numbers
}
val flatMapFlow: Flow<Long> = stringsFlow
.flatMapLatest {
numbersFlow
}
val dataFlow: Flow<Long> = when (flowOption) {
FlowOption.COMBINE -> combineFlow
FlowOption.FLATMAP -> flatMapFlow
}
dataFlow
// If we add this - the upstream cancelling is not happening anymore.
// .flatMapLatest { numbers ->
// flowOf(numbers)
// }
.onEach { numbers ->
// On every data emit we want to want a non-blocking(!) the flow coroutine to do stuff
// related to the data in the flow. Is there are better way to do that?
internalScope.launch(
// Passing this like that might be wrong for you, internalScope's dispatcher will not be used,
// and parent cancellation will not cancel this. Don't repeat in prod
context = currentCoroutineContext()
// this would be proper context to preserve internalScope' dispatcher and support parent cancellation
// but with it the unexpected behavior is not happening
//context = SupervisorJob(parent = currentCoroutineContext().job)
) {
// This will cancel upstream data flow ONLY if closest upstream is combine.
throw CancellationException("Throwing cancel manually")
}
}
.collect()
}
}
Sam
01/10/2025, 2:30 PM// This will cancel upstream data flow […]
throw CancellationException("Throwing cancel manually")
There are two things wrong with that idea.
• First, a Flow
can't be cancelled. It's not a coroutine, and doesn't have a Job
.
• Second, throwing a CancellationException
does not really cancel a coroutine. Its effect is unpredictable, and may or may not cause a task to silently terminate.Vlad
01/10/2025, 2:32 PMinternalJob = internalScope.launch()
produces job.
we cancel the internalJob?.cancel()
.
Effect the same, I just removed the additional code to make the playground as thin as possibleSam
01/10/2025, 2:45 PMdataFlow.collect { numbers ->
withContext(Dispatchers.IO) {
launch { doStuffWith(numbers) }
}
}
Sam
01/10/2025, 2:45 PMlaunch { cancel() }
(or anything similar/equivalent) would have any effect on parent tasks, though 🤷Sam
01/10/2025, 2:48 PMcombine
will launch new coroutines in which to produce each of its upstream flows, which might account for why it's changing the behavior you're seeing.Vlad
01/10/2025, 2:50 PMdoStuffWith
is not blocking the flow emits because it is while(true) under the hood.
Not sure if that schema will do the trickSam
01/10/2025, 2:52 PMcoroutineScope {
dataFlow.collect { numbers ->
launch(<http://Dispatchers.IO|Dispatchers.IO>) { doStuffWith(numbers) }
}
}
Vlad
01/10/2025, 2:54 PMdoStuffWith(numbers)
completes.Vlad
01/10/2025, 2:54 PMSam
01/10/2025, 2:56 PMVlad
01/10/2025, 2:59 PMcombine(dataFlow, otherMutableStateFlowInClass)
And the do stuff in parallel fills the otherMutableStateFlowInClass
. And in the output of the downstream I emit Pair<Data, AdditionalStuff>
Vlad
01/10/2025, 3:03 PMVlad
01/10/2025, 3:06 PMcontext = SupervisorJob(parent = currentCoroutineContext().job)
everything works as I expect.
I too am mystified why launch { cancel() } (or anything similar/equivalent) would have any effect on parent tasks, though
This thing just bothers me