Is there a way to terminate a flow from within an ...
# coroutines
j
Is there a way to terminate a flow from within an intermediate operator lambda? For example, a flow emits 5 random numbers between 1 and 10. Inside `map`, if the number is over 5, you terminate the flow.
o
`cancel()`?
i.e. just cancel whatever job is in the coroutine context
j
I tried cancel, but it looks like that operator isn't available on the receiver within an intermediate flow operator.
Would there be a way to cancel the flow without canceling the context's job?
o
they're the same thing afaik
you can't do `coroutineContext.cancel()`?
a
Or just `throw CancellationException()`
j
Yup, that worked. Thanks @octylFractal. However, something a bit weird happens. For example:
```kotlin
val pf = flowOf(1,2,3).map { if (it == 3) coroutineContext.cancel() else it }
runBlocking {
    val l = pf.toList()
    println(l)
}
```
prints:
```
[1, 2, kotlin.Unit]
```
It's that last item, `kotlin.Unit`, that's surprising.
o
Nothing returns `void` in Kotlin; a function without a meaningful result returns `Unit`, so you're returning `Unit` from `cancel()` there.
In this case yes, perhaps throwing the `CancellationException` would be better.
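To illustrate the `Unit` point without any coroutines involved, here is a minimal sketch using a plain list. `noResult` and `mapWithUnitBranch` are hypothetical names made up for this example; `noResult` simply stands in for any call like `cancel()` that returns nothing meaningful.

```kotlin
// Hypothetical stand-in for any function with no meaningful result, such as cancel().
fun noResult() {}

// The `if` expression has one branch of type Unit and one of type Int,
// so the lambda's result type widens to Any and the Unit value is kept.
fun mapWithUnitBranch(): List<Any> =
    listOf(1, 2, 3).map { if (it == 3) noResult() else it }

fun main() {
    println(mapWithUnitBranch()) // prints [1, 2, kotlin.Unit]
}
```

The same thing happens in the flow version above: `map` has to emit something for every input, so the `Unit` from the cancelling branch becomes the third element.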
j
K. I'll try that. Thanks!
So throwing `CancellationException` works, but `catch` must also be used to get the items emitted before the exception is thrown. Otherwise partial collection doesn't occur.
Thanks @octylFractal @Adam Powell!
z
There's actually a PR open to introduce an operator that is for exactly this use case I think: https://github.com/Kotlin/kotlinx.coroutines/pull/2066
j
Nice! Thanks @Zach Klippenstein (he/him) [MOD]! Maybe it'll be available soon.
l
Throwing a `CancellationException` is not the right way, as it's ambiguous with other cancellation causes. You can take a look at how the `take` intermediate operator is implemented, and that can help you do what you want. Or you can copy `transformWhile` from the PR that Zach linked, or wait for it to be merged (which might take a while).
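For reference, a minimal sketch of how the original question might look with `transformWhile`, assuming a kotlinx.coroutines version that ships the operator. `mapUntilOver` is a hypothetical helper name chosen for this example, not something from the library.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Hypothetical helper: passes values through until one exceeds `limit`,
// then completes the flow normally without cancelling the caller's job.
fun Flow<Int>.mapUntilOver(limit: Int): Flow<Int> = transformWhile { value ->
    if (value > limit) {
        false // returning false stops collecting the upstream flow
    } else {
        emit(value) // any mapping of the value could happen here
        true // returning true keeps the flow going
    }
}

fun main() = runBlocking {
    val result = flowOf(1, 2, 7, 3).mapUntilOver(5).toList()
    println(result) // prints [1, 2]
}
```

Because the flow completes normally, the items emitted before the stop condition are collected without needing `catch`.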
t
How about `takeWhile`?
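A minimal sketch of the `takeWhile` suggestion, assuming the stop condition can be checked on the value before it is mapped. `doubledUntilOver5` is a hypothetical name, and the doubling is just a placeholder for whatever the `map` step does.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the flow ends at the first value over 5,
// and only the values seen before that point are mapped.
fun doubledUntilOver5(source: Flow<Int>): Flow<Int> =
    source
        .takeWhile { it <= 5 } // completes the flow once the predicate fails
        .map { it * 2 }        // placeholder transformation

fun main() = runBlocking {
    val result = doubledUntilOver5(flowOf(3, 1, 8, 2)).toList()
    println(result) // prints [6, 2]
}
```

If the decision depends on the mapped value instead, `takeWhile` can be placed after `map`.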