https://kotlinlang.org logo
#coroutines
Title
# coroutines
m

martmists

12/10/2023, 12:13 AM
How do I cancel flows? Here's my attempt:
Copy code
class TFunction(override val value: TFunctionType) : TValue<TFunctionType>() {
    override val type = ValueType.FUNCTION

    context(FlowCollector<LuaStatus>)
    suspend fun invoke(args: List<TValue<*>>) {
        try {
            flow {
                value(args)
            }.collect {
                // There's no way to loop and collect a single item at a time, so collect instead
                
                // Forward the status to the parent flow
                emit(it)
                
                // If the function returns, cancel the remainder of the flow
                if (it !is LuaStatus.Yield) {
                    throw CancellationException()
                }
            }
        } catch (e: CancellationException) {
            // Prevent the exception from being propagated
            return
        }
        // The function doesn't return, so return an empty list
        emit(LuaStatus.Return(emptyList()))
    }
}
However, this causes the following:
Copy code
Exception in thread "main" java.lang.IllegalStateException: Flow exception transparency is violated:
    Previous 'emit' call has thrown exception java.util.concurrent.CancellationException, but then emission attempt of value 'Return(values=[])' has been detected.
    Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
    For a more detailed explanation, please refer to Flow documentation.
What should I do? I tried working with .catch but it didn't allow me to do a return@invoke since it's not being called in-place
k

kevin.cianfarini

12/10/2023, 12:46 AM
Maybe something like
Copy code
flow {
  coroutineScope {
    cancel(...)
  }
}
d

Daniel Pitts

12/10/2023, 1:33 AM
Why not just return from the lambda?
Never mind, I see that won't work here...
This looks a little more complex than it might need to be. What is it exactly you're trying to do? Have a flow that includes a result from a call? Is it always a single result?
s

Sam

12/10/2023, 8:02 AM
Flows don't have a concept of cancellation. A flow isn't a coroutine and doesn't have its own
Job
for you to cancel. If you want to terminate the flow early, you can use an operator like
takeWhile
or
transformWhile
.
Do you really need the inner flow here? You're calling
flow
immediately followed by
collect
.
Copy code
flow { a() }.collect { doSomethingWith(it) }
Is exactly equivalent to
Copy code
doSomethingWith(a())
m

martmists

12/10/2023, 11:24 PM
Unfortunately the block can emit several values any number of times, and I need to make sure it doesn't emit/process anymore after it emits certain values.
d

Daniel Pitts

12/10/2023, 11:25 PM
takeWhile should handle that for you.
k

kevin.cianfarini

12/10/2023, 11:53 PM
There's a valid use case for terminating flows unsuccessfully, like when the consumer of a flow calls something like
single()
or
first()
, but takeWhile produces no elements. In this case
Copy code
flow {
  coroutineScope {
    cancel()
  }
}
Should help you.
d

Daniel Pitts

12/11/2023, 1:47 AM
takeWhile().first() would fail in that car. Not sure why you’re trying to break flows.
k

kevin.cianfarini

12/11/2023, 2:12 AM
Cancellation is different than failing with
NoSuchElementException
.
d

Daniel Pitts

12/11/2023, 2:14 AM
Flow doesn't know anything about the consumer, so I'm not sure how it would know to cancel. NoSuchElementException is appropriate for that case.
k

kevin.cianfarini

12/11/2023, 2:16 AM
It would cancel because it throws CancellationException and bubbles it up to the consumer?
Also, there are use cases where this is appropriate. For example, during a retry for which refreshing an authentication token fails.
d

Daniel Pitts

12/11/2023, 2:19 AM
In that case, why not throw an AuthenticationException?
k

kevin.cianfarini

12/11/2023, 2:23 AM
Because in the case we use it, authentication is handled by an interceptor. If we throw something like AuthenticationException is means that either each call site of a network request needs to handle AuthenticationException, or we need to have a top level exception handler that catches AuthenticationException and then triggers a side effect to log a customer out. Rather than doing that we decide to just cancel the in flight request and have the side effect triggered from elsewhere. FWIW, Jetbrains does this at least once too in IntelliJ.
Point being that cancelling a flow from within is a rare use case that is indeed valid.
d

Daniel Pitts

12/11/2023, 2:28 AM
It looks like it's canceling the collector, not in the flow.
k

kevin.cianfarini

12/11/2023, 2:30 AM
The current coroutine context in that scenario would be a context with a
Job
from the scope provided by the
channelFlow
builder. So it's cancelling the coroutine scrope job of the channel flow.
d

Daniel Pitts

12/11/2023, 2:44 AM
Cancelling a channel is different than cancelling a flow.
channelFlow
starts a coroutine, and thats the thing being cancelled here.
k

kevin.cianfarini

12/11/2023, 2:48 AM
Yes, and when the coroutine here gets cancelled, that gets bubbled up to parents. Therefore cancelling from within.
d

Daniel Pitts

12/11/2023, 3:05 AM
Ah, that makes sense now.
👍 1
s

Sam

12/11/2023, 8:45 AM
Cancellation does not propagate from a child job to a parent job. In the examples here, the apparent cancellation is being caused by attempting to await on a cancelled coroutine. Joining a cancelled coroutine, for better or worse, throws a
CancellationException
. The behaviour is equivalent to just writing
throw CancellationException()
. It does not mark the job as cancelled, but like any exception it will eventually cancel the job if it goes uncaught. Here are some examples of what I mean by awaiting a cancelled coroutine. In each case there is an inner coroutine that has been cancelled, and an outer coroutine that attempts to wait for a result from it. •
coroutineScope { cancel() }
async { cancel() }.await()
channelFlow { cancel() }.collect()
We can actually prove the difference:
Copy code
try {
  cancel()
  awaitCancellation()
} finally {
  println(isActive) // prints "false"
}
Whereas:
Copy code
try {
  channelFlow { cancel() }.collect()
  awaitCancellation()
} finally {
  println(isActive) // prints "true"
}
I'll repeat what I said before: you can't cancel a flow. You can throw a cancellation exception from the flow—the tricks with channel flows or coroutine scopes are just convoluted ways to do that—and it will bubble up into the collector coroutine, but that's not job cancellation, it's just exception propagation.
👍 2