Hey everyone, Is it expected that `Flow#flatMapMer...
# coroutines
s
Hey everyone, Is it expected that
Flow#flatMapMerge
or any of the other concurrent
Flow
operator swallows
CancellationException
. Two examples in the thread.
1
Copy code
(0..10)
    .asFlow()
    .flatMapMerge { i ->
        flow<Int> {
            if(i == 9) throw CancellationException("")
            else awaitCancellation()
        }
    }.collect { println(it) }
This hangs forever sine the
CancellationException
doesn't cancel the parallel
awaitCancellation
. And thus also no values ever reach
collect
.
Copy code
(0..10)
    .asFlow()
    .flatMapMerge { i ->
        flow<Int> {
            if(i == 9) throw CancellationException("")
            else emit(it)
        }
    }.collect { println(it) }
This doesn't hang but skips
99
since instead of emitting it throws
CancellationException
but that exception seems to be swallowed. It's never logged or rethrown, we just never see
99
appear in
collect
.
j
First thing to note is that the default concurrency limit is 16, so only the first 16 coroutines will run to start with, and all of them hang, so the code doesn't go any further. Now, even if one of the first 16 coroutines threw
CancellationException
like you did with #99, I would expect that this would only cancel the nested
flow
, but not the overall merged flow.
CancellationException
is a mechanism for cancellation in coroutines, so it is expected to be used by coroutines machinery at different levels, and here my expectation is that only the coroutine collecting the nested flow is cancelled. What exactly are you trying to do here anyway?
s
I would expect that this would only cancel the nested
flow
, but not the overall merged flow.
Meaning you expect #99 to get cancelled, but the resulting flow would still be
(0..100)
excluding 99 ? My use-case is a bit complex, and not 1-on-1 related to this snippet. I'm streaming messages from Azure ServiceBus through Project Reactor, and I need to cancel the streaming on certain triggers. In this case, it's also happening inside a
flatMapMerge
.
First thing to note is that the default concurrency limit is 16, so only the first 16 coroutines will run to start with, and all of them hang, so the code doesn't go any further.
You're correct. I expected it to limit to actually concurrency, not coroutines. So I expected when suspension happens on
awaitCancellation
the next one would schedule itself.
The same happens for the following snippet, the behavior is unchanged. Also updated the snippets above.
Copy code
(0..10)
  .asFlow()
  .flatMapMerge { i ->
    flow<Int> {
      if(i == 9) throw CancellationException("")
      else awaitCancellation()
    }
  }.collect { println(it) }
j
I expected it to limit to actually concurrency, not coroutines
Concurrency is the number of coroutines that run concurrently (whether suspended or not). One of the 16 first coroutines has to complete before another can start to run concurrently with the others. It's different from parallelism, which would limit how many are actually executing (not suspended) at the same time. That said, probably the concurrency here is limited by spawning only 16 coroutines (for the whole thing, so not one for each flow), and they collect the flows one by one.
s
Right exactly, I had an incorrect understanding of
DEFAULT_CONCURRENCY
.
j
Meaning you expect #99 to get cancelled, but the resulting flow would still be
(0..100)
excluding 99 ?
Yep, actually cancellation affects coroutines, not flows per se. So it would have been more correct for me to say that the coroutine collecting that flow is cancelled, and that the parent coroutine that collects the merged flow is not. I'm assuming
flatMapMerge
spawns
DEFAULT_CONCURRENCY
coroutines to collect all the nested flows (hence why only 16 are collected at a time, and no more flows are collected if they all suspend), and it might be using a supervisor scope for this, so the cancellation of one child doesn't cancel all of them (but that's just a guess)
s
Yes, that makes perfect sense.
flatMapMerge
uses a low-level implementation but semantically that is indeed what it does. Thank you for helping me clear up my doubts @Joffrey 🙌
👍 1
j
My pleasure, I'm learning too by looking into this kind of stuff 🙂