So I've been playing with the new Flow stuff and f...
# coroutines
n
So I've been playing with the new Flow stuff and from what I've seen, when I skimmed through the code, none of the built in functions seem to do any cancellation checks? Example:
Copy code
val job = launch(Dispatchers.Default) {
    val result = myGiantList.asFlow().filter {
        cpuBoundFilter(it)
    }.toList()
}
job.cancel()
If I don't do any cancellation checks in
cpuBoundFilter
, the flow will continue to filter through everything even though it has been cancelled.
v
Yes, flow is not different from the rest of the API here: https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/cancellation-and-timeouts.md#cancellation-is-cooperative Cancellation is checked automatically on every suspension, it should be checked manually in a CPU-intensive code
n
Oh, I forgot that cancellation was checked on every suspension, oops. Yeah, then my message is irrelevant, thanks!
So it won't actually filter through all the items right? Instead it will just wait until the latest call to
cpuBoundFilter
completes and then stop filtering right?
v
It will filter through all the items because
list.asFlow
does not suspend anywhere
n
I feel like that's kinda goes against expected behaviour. Specifically, it says
All the suspending functions in kotlinx.coroutines are cancellable.
in the page you linked me. I would normally expect the flow to check for cancellation on every iteration of filter.
1
e
That could be done, but that would add overhead into every operator that most operations will never need, since they do some async operation anyway.
Flow does not violate a general rule that if you do something CPU-consuming the you have to take care of cancellation yourself.
n
Fair enough. I think what I'll do is just add an extension method that looks something like this:
Copy code
fun <T> Flow<T>.asCancellable() = onEach { if(coroutineContext.isActive) throw CancellationException() }
👍 1
v
by the way, you can use
coroutineContext.ensureActive
🙂
2
👍 4