https://kotlinlang.org logo
#coroutines
Title
# coroutines
a

Allan Wang

08/21/2019, 8:01 AM
Are flows cancellable? Can we make them adhere to a lifecycle, or would that be done with channels then converted to flows?
o

octylFractal

08/21/2019, 8:02 AM
Flows aren't directly cancellable, but if you cancel the coroutine running it and intercept that in
collect
, it will stop processing
basically, the "running" part of a flow doesn't start until you call a terminal operation like
collect
, so you can't cancel it because that's equivalent to just not calling the terminal operation at all -- nothing is running yet
a

Allan Wang

08/21/2019, 8:06 AM
so as an example, I can make a new job, add it to the context, and then cancel the job to cancel the flow? Perhaps in that case using a channel may be better?
o

octylFractal

08/21/2019, 8:08 AM
you can't just cancel the job, you would also need to check for cancellation in the flow (I think) with
isActive
, or
yield()
l

louiscad

08/21/2019, 8:10 AM
Copy code
val myFlowCollectingCoroutine = launch {
    someFlow.collect { // Or any terminal operator
        // some cancellable code here
    }
}
delay(someDurationOrSomeStuff)
myFlowCollectingCoroutine.cancel()
g

gildor

08/21/2019, 8:10 AM
You should think about flow cancellation as about cancellation of suspend function (which is true for most of Flow terminal operators). To cancel suspend function you have to cancel coroutine where you run this function
👍 1
there is
Flow.launchIn(scope)
operator which starts flow collection on new coroutine and returns Job that can be cancelled, but it’s essentially the same what Louis showed above
a

Allan Wang

08/21/2019, 8:12 AM
My use case is to listen to network changes on Android. My old approach would have been to make a channel, and then to unregister and cancel it onDestroy or something. I was hoping I could just wrap it in a flow and add an onComplete function, and have it all cancel once the activity (and the main job) is destroyed
g

gildor

08/21/2019, 8:15 AM
and have it all cancel once the activity (and the main job) is destroyed
You can do this, just use lifecycleScope to consume flow
Copy code
someFlow.onEach { 
  doSomethingWith(it) 
}.launchIn(activity.lifecycleScope)
no need to cancel manually
and yes, in implementation of this someFlow use onComplete to clean resources
o

octylFractal

08/21/2019, 8:23 AM
I'm curious as to how that actually cancels the flow, does
Flow
listen for cancellation somewhere? Otherwise, if
doSomethingWith(it)
doesn't listen for cancellation, the flow would continue to execute, right?
l

louiscad

08/21/2019, 8:25 AM
Flow is all cancellable suspending functions and
collect
and
emit
are one of them
(I might be wrong on that one)
o

octylFractal

08/21/2019, 8:41 AM
That doesn't appear to be the case: https://gist.github.com/kenzierocks/7e045f1a643d4c32234fb90b9cda7121 I would expect maybe only one of those to print if emit/collect was cancelling, but all 4 do
g

gildor

08/21/2019, 8:43 AM
and what should this sample demonstrate?
It’s just too fast to see cancelation effect. Bu the time when cancel is propagated this flow already consumed
o

octylFractal

08/21/2019, 8:44 AM
No, it's not -- I see it if I add
isActive
check + return from
collect
g

gildor

08/21/2019, 8:46 AM
okay, then you just cancel it too early, launch is not even started
o

octylFractal

08/21/2019, 8:47 AM
this exhibits the same behaviour
g

gildor

08/21/2019, 8:49 AM
Worrks as expected for me https://pl.kotl.in/66g3VyJ1t
o

octylFractal

08/21/2019, 8:49 AM
That's different --
delay
is the cancelling part here
That's why I explicitly used non-cancelling
Thread.sleep
g

gildor

08/21/2019, 8:50 AM
Thread.sleep is non-cancellable for sure
o

octylFractal

08/21/2019, 8:51 AM
my point is to demonstrate that
emit
&
collect
do not handle cancellation -- but the flow does exit properly if something else does
g

gildor

08/21/2019, 8:51 AM
they do
o

octylFractal

08/21/2019, 8:53 AM
How do you explain https://pl.kotl.in/FayorKjlh then?
g

gildor

08/21/2019, 8:54 AM
just cancellation is not propagated because you block the thread where try to cancel it
o

octylFractal

08/21/2019, 8:55 AM
but it clearly is propagated? how would
isActive
print
false
if it's not cancelled?
Another variant, checking before emit is called to see if job has been cancelled. also prints
false
, indicating cancellation, but emit proceeds regardless: https://pl.kotl.in/w3xW51_Vb
l

louiscad

08/21/2019, 9:03 AM
Now, I'm no longer sure
emit
is cancellable 🤔
Wondering for
collect
too
g

gildor

08/21/2019, 9:04 AM
emit is cancellable
o

octylFractal

08/21/2019, 9:06 AM
The problem with claiming
collect
is cancellable here, or
emit
in the other snippet, is that they don't behave like other cancellable functions. If `collect`/`emit` was cancellable, it shouldn't print that message. Here's what happens with delay: https://pl.kotl.in/UA0yi3erM
g

gildor

08/21/2019, 9:15 AM
I see, it’s interesting, but I’m still worry about using of blocking functions in this example but don’t see any particular problem with it
o

octylFractal

08/21/2019, 9:29 AM
I made a simpler to understand example, realizing I could cancel inside `coroutineScope`: https://pl.kotl.in/5Ih-_vg9_
p

Pablichjenkov

08/21/2019, 3:17 PM
🤔 interesting. I would expect emit/collect to be cancellable. Whats the final conclusion a bug or expected by design?
l

louiscad

08/21/2019, 3:34 PM
I think it's by design.
emit
by itself is non cancellable.
collect
by itself it not cancellable either, but some operators built on top of it like
collectLatest
might be. Interestingly, the terminal
toList()
operator doesn't seem cancellable as the following snippet executed in a coroutine scope that has just been cancelled prints `a, b`:
println(flow { emit("a"); emit("b") }.toList().joinToString())
This could leak to never cancelling coroutines if the flow is infite and no cancellable suspending functions are called, which would then leak resources. Is this by design @elizarov?
3 Views