#coroutines

nulldev

04/12/2019, 9:32 AM
So I've been playing with the new Flow stuff and from what I've seen, when I skimmed through the code, none of the built in functions seem to do any cancellation checks? Example:
val job = launch(Dispatchers.Default) {
    val result = myGiantList.asFlow().filter {
        cpuBoundFilter(it)
    }.toList()
}
job.cancel()
If I don't do any cancellation checks in cpuBoundFilter, the flow will continue to filter through everything even though it has been cancelled.

Vsevolod Tolstopyatov [JB]

04/12/2019, 9:36 AM
Yes, Flow is no different from the rest of the API here: https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/cancellation-and-timeouts.md#cancellation-is-cooperative Cancellation is checked automatically on every suspension; in CPU-intensive code it has to be checked manually.
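To make the "checked manually" part concrete, here is a minimal sketch of a CPU-bound loop that cooperates with cancellation. The names `isPrime` and `countPrimes` are hypothetical stand-ins for whatever CPU-heavy work is being done; the only library call is `ensureActive()` on the current coroutine context.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.coroutineContext

// Hypothetical CPU-bound check; it never suspends, so it never notices cancellation by itself.
fun isPrime(n: Int): Boolean {
    if (n < 2) return false
    var d = 2
    while (d * d <= n) {
        if (n % d == 0) return false
        d++
    }
    return true
}

// Hypothetical helper: counts primes, checking for cancellation once per item.
suspend fun countPrimes(numbers: List<Int>): Int {
    var count = 0
    for (n in numbers) {
        // Throws CancellationException if the calling coroutine has been cancelled.
        coroutineContext.ensureActive()
        if (isPrime(n)) count++
    }
    return count
}

fun main() = runBlocking {
    println(countPrimes((1..10).toList()))
}
```

Without the `ensureActive()` line the loop would run to completion after `cancel()`, because nothing in it ever suspends.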

nulldev

04/12/2019, 9:37 AM
Oh, I forgot that cancellation was checked on every suspension, oops. Yeah, then my message is irrelevant, thanks!
So it won't actually filter through all the items, right? Instead it will just wait until the latest call to cpuBoundFilter completes and then stop filtering, right?

Vsevolod Tolstopyatov [JB]

04/12/2019, 9:40 AM
It will filter through all the items because list.asFlow() does not suspend anywhere

nulldev

04/12/2019, 9:48 AM
I feel like that kinda goes against expected behaviour. Specifically, the page you linked me says "All the suspending functions in kotlinx.coroutines are cancellable." I would normally expect the flow to check for cancellation on every iteration of filter.

elizarov

04/12/2019, 10:21 AM
That could be done, but that would add overhead into every operator that most operations will never need, since they do some async operation anyway.
Flow does not violate the general rule that if you do something CPU-consuming then you have to take care of cancellation yourself.

nulldev

04/12/2019, 12:34 PM
Fair enough. I think what I'll do is just add an extension method that looks something like this:
fun <T> Flow<T>.asCancellable() = onEach { if (!coroutineContext.isActive) throw CancellationException() }

Vsevolod Tolstopyatov [JB]

04/12/2019, 12:40 PM
By the way, you can use coroutineContext.ensureActive() 🙂
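Putting the suggestion together, the helper from the thread might look like the sketch below. `asCancellable` is the poster's hypothetical extension name, and `collectUntilCancelled` is an illustrative demo function rather than anything from the library. The demo cancels the collecting coroutine from inside `collect` so that the point at which the flow stops is deterministic.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.coroutineContext

// Hypothetical extension from the thread: check for cancellation before each element is passed on.
fun <T> Flow<T>.asCancellable(): Flow<T> = onEach { coroutineContext.ensureActive() }

// Illustrative demo: collects a range-backed flow and cancels itself after the third element.
fun collectUntilCancelled(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch(Dispatchers.Default) {
        (1..1_000).asFlow()
            .asCancellable()
            .collect { value ->
                seen += value
                if (value == 3) cancel() // cancels the enclosing launch coroutine
            }
    }
    job.join()
    seen
}

fun main() {
    println(collectUntilCancelled()) // [1, 2, 3]
}
```

Dropping `.asCancellable()` from the chain lets the collector see all 1,000 values, since `asFlow()` on a range never suspends. Later kotlinx.coroutines releases ship a built-in `cancellable()` operator that serves the same purpose, so a hand-rolled helper like this is mainly useful on older versions.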