simon.vergauwen
04/19/2022, 10:26 AMFlow#flatMapMerge
or any of the other concurrent Flow
operator swallows CancellationException
.
Two examples in the thread.simon.vergauwen
04/19/2022, 10:27 AM(0..10)
.asFlow()
.flatMapMerge { i ->
flow<Int> {
if(i == 9) throw CancellationException("")
else awaitCancellation()
}
}.collect { println(it) }
This hangs forever sine the CancellationException
doesn't cancel the parallel awaitCancellation
.
And thus also no values ever reach collect
.simon.vergauwen
04/19/2022, 10:28 AM(0..10)
.asFlow()
.flatMapMerge { i ->
flow<Int> {
if(i == 9) throw CancellationException("")
else emit(it)
}
}.collect { println(it) }
This doesn't hang but skips 99
since instead of emitting it throws CancellationException
but that exception seems to be swallowed. It's never logged or rethrown, we just never see 99
appear in collect
.Joffrey
04/19/2022, 12:09 PMCancellationException
like you did with #99, I would expect that this would only cancel the nested flow
, but not the overall merged flow. CancellationException
is a mechanism for cancellation in coroutines, so it is expected to be used by coroutines machinery at different levels, and here my expectation is that only the coroutine collecting the nested flow is cancelled.
What exactly are you trying to do here anyway?simon.vergauwen
04/19/2022, 1:01 PMI would expect that this would only cancel the nestedMeaning you expect #99 to get cancelled, but the resulting flow would still be, but not the overall merged flow.flow
(0..100)
excluding 99 ?
My use-case is a bit complex, and not 1-on-1 related to this snippet.
I'm streaming messages from Azure ServiceBus through Project Reactor, and I need to cancel the streaming on certain triggers. In this case, it's also happening inside a flatMapMerge
.simon.vergauwen
04/19/2022, 1:05 PMFirst thing to note is that the default concurrency limit is 16, so only the first 16 coroutines will run to start with, and all of them hang, so the code doesn't go any further.You're correct. I expected it to limit to actually concurrency, not coroutines. So I expected when suspension happens on
awaitCancellation
the next one would schedule itself.simon.vergauwen
04/19/2022, 1:07 PM(0..10)
.asFlow()
.flatMapMerge { i ->
flow<Int> {
if(i == 9) throw CancellationException("")
else awaitCancellation()
}
}.collect { println(it) }
Joffrey
04/19/2022, 1:08 PMI expected it to limit to actually concurrency, not coroutinesConcurrency is the number of coroutines that run concurrently (whether suspended or not). One of the 16 first coroutines has to complete before another can start to run concurrently with the others. It's different from parallelism, which would limit how many are actually executing (not suspended) at the same time. That said, probably the concurrency here is limited by spawning only 16 coroutines (for the whole thing, so not one for each flow), and they collect the flows one by one.
simon.vergauwen
04/19/2022, 1:09 PMDEFAULT_CONCURRENCY
.Joffrey
04/19/2022, 1:11 PMMeaning you expect #99 to get cancelled, but the resulting flow would still beYep, actually cancellation affects coroutines, not flows per se. So it would have been more correct for me to say that the coroutine collecting that flow is cancelled, and that the parent coroutine that collects the merged flow is not. I'm assumingexcluding 99 ?(0..100)
flatMapMerge
spawns DEFAULT_CONCURRENCY
coroutines to collect all the nested flows (hence why only 16 are collected at a time, and no more flows are collected if they all suspend), and it might be using a supervisor scope for this, so the cancellation of one child doesn't cancel all of them (but that's just a guess)simon.vergauwen
04/19/2022, 1:23 PMflatMapMerge
uses a low-level implementation but semantically that is indeed what it does.
Thank you for helping me clear up my doubts @Joffrey 🙌Joffrey
04/19/2022, 1:24 PM