Are flows cancellable? Can we make them adhere to ...
# coroutines
a
Are flows cancellable? Can we make them adhere to a lifecycle, or would that be done with channels then converted to flows?
o
Flows aren't directly cancellable, but if you cancel the coroutine running it and intercept that in
collect
, it will stop processing
basically, the "running" part of a flow doesn't start until you call a terminal operation like
collect
, so you can't cancel it because that's equivalent to just not calling the terminal operation at all -- nothing is running yet
a
so as an example, I can make a new job, add it to the context, and then cancel the job to cancel the flow? Perhaps in that case using a channel may be better?
o
you can't just cancel the job, you would also need to check for cancellation in the flow (I think) with
isActive
, or
yield()
l
Copy code
val myFlowCollectingCoroutine = launch {
    someFlow.collect { // Or any terminal operator
        // some cancellable code here
    }
}
delay(someDurationOrSomeStuff)
myFlowCollectingCoroutine.cancel()
g
You should think about flow cancellation as about cancellation of suspend function (which is true for most of Flow terminal operators). To cancel suspend function you have to cancel coroutine where you run this function
👍 1
there is
Flow.launchIn(scope)
operator which starts flow collection on new coroutine and returns Job that can be cancelled, but it’s essentially the same what Louis showed above
a
My use case is to listen to network changes on Android. My old approach would have been to make a channel, and then to unregister and cancel it onDestroy or something. I was hoping I could just wrap it in a flow and add an onComplete function, and have it all cancel once the activity (and the main job) is destroyed
g
and have it all cancel once the activity (and the main job) is destroyed
You can do this, just use lifecycleScope to consume flow
Copy code
someFlow.onEach { 
  doSomethingWith(it) 
}.launchIn(activity.lifecycleScope)
no need to cancel manually
and yes, in implementation of this someFlow use onComplete to clean resources
o
I'm curious as to how that actually cancels the flow, does
Flow
listen for cancellation somewhere? Otherwise, if
doSomethingWith(it)
doesn't listen for cancellation, the flow would continue to execute, right?
l
Flow is all cancellable suspending functions and
collect
and
emit
are one of them
(I might be wrong on that one)
o
That doesn't appear to be the case: https://gist.github.com/kenzierocks/7e045f1a643d4c32234fb90b9cda7121 I would expect maybe only one of those to print if emit/collect was cancelling, but all 4 do
g
and what should this sample demonstrate?
It’s just too fast to see cancelation effect. Bu the time when cancel is propagated this flow already consumed
o
No, it's not -- I see it if I add
isActive
check + return from
collect
g
okay, then you just cancel it too early, launch is not even started
o
this exhibits the same behaviour
g
Worrks as expected for me https://pl.kotl.in/66g3VyJ1t
o
That's different --
delay
is the cancelling part here
That's why I explicitly used non-cancelling
Thread.sleep
g
Thread.sleep is non-cancellable for sure
o
my point is to demonstrate that
emit
&
collect
do not handle cancellation -- but the flow does exit properly if something else does
g
they do
o
How do you explain https://pl.kotl.in/FayorKjlh then?
g
just cancellation is not propagated because you block the thread where try to cancel it
o
but it clearly is propagated? how would
isActive
print
false
if it's not cancelled?
Another variant, checking before emit is called to see if job has been cancelled. also prints
false
, indicating cancellation, but emit proceeds regardless: https://pl.kotl.in/w3xW51_Vb
l
Now, I'm no longer sure
emit
is cancellable 🤔
Wondering for
collect
too
g
emit is cancellable
o
The problem with claiming
collect
is cancellable here, or
emit
in the other snippet, is that they don't behave like other cancellable functions. If `collect`/`emit` was cancellable, it shouldn't print that message. Here's what happens with delay: https://pl.kotl.in/UA0yi3erM
g
I see, it’s interesting, but I’m still worry about using of blocking functions in this example but don’t see any particular problem with it
o
I made a simpler to understand example, realizing I could cancel inside `coroutineScope`: https://pl.kotl.in/5Ih-_vg9_
p
🤔 interesting. I would expect emit/collect to be cancellable. Whats the final conclusion a bug or expected by design?
l
I think it's by design.
emit
by itself is non cancellable.
collect
by itself it not cancellable either, but some operators built on top of it like
collectLatest
might be. Interestingly, the terminal
toList()
operator doesn't seem cancellable as the following snippet executed in a coroutine scope that has just been cancelled prints `a, b`:
println(flow { emit("a"); emit("b") }.toList().joinToString())
This could leak to never cancelling coroutines if the flow is infite and no cancellable suspending functions are called, which would then leak resources. Is this by design @elizarov?