martmists
12/10/2023, 12:13 AMclass TFunction(override val value: TFunctionType) : TValue<TFunctionType>() {
override val type = ValueType.FUNCTION
context(FlowCollector<LuaStatus>)
suspend fun invoke(args: List<TValue<*>>) {
try {
flow {
value(args)
}.collect {
// There's no way to loop and collect a single item at a time, so collect instead
// Forward the status to the parent flow
emit(it)
// If the function returns, cancel the remainder of the flow
if (it !is LuaStatus.Yield) {
throw CancellationException()
}
}
} catch (e: CancellationException) {
// Prevent the exception from being propagated
return
}
// The function doesn't return, so return an empty list
emit(LuaStatus.Return(emptyList()))
}
}
However, this causes the following:
Exception in thread "main" java.lang.IllegalStateException: Flow exception transparency is violated:
Previous 'emit' call has thrown exception java.util.concurrent.CancellationException, but then emission attempt of value 'Return(values=[])' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
What should I do? I tried working with .catch but it didn't allow me to do a return@invoke since it's not being called in-placekevin.cianfarini
12/10/2023, 12:46 AMflow {
coroutineScope {
cancel(...)
}
}
Daniel Pitts
12/10/2023, 1:33 AMDaniel Pitts
12/10/2023, 1:35 AMDaniel Pitts
12/10/2023, 1:37 AMSam
12/10/2023, 8:02 AMJob
for you to cancel. If you want to terminate the flow early, you can use an operator like takeWhile
or transformWhile
.Sam
12/10/2023, 8:04 AMflow
immediately followed by collect
.
flow { a() }.collect { doSomethingWith(it) }
Is exactly equivalent to
doSomethingWith(a())
martmists
12/10/2023, 11:24 PMDaniel Pitts
12/10/2023, 11:25 PMkevin.cianfarini
12/10/2023, 11:53 PMsingle()
or first()
, but takeWhile produces no elements. In this case
flow {
coroutineScope {
cancel()
}
}
Should help you.Daniel Pitts
12/11/2023, 1:47 AMkevin.cianfarini
12/11/2023, 2:12 AMNoSuchElementException
.Daniel Pitts
12/11/2023, 2:14 AMkevin.cianfarini
12/11/2023, 2:16 AMkevin.cianfarini
12/11/2023, 2:17 AMDaniel Pitts
12/11/2023, 2:19 AMkevin.cianfarini
12/11/2023, 2:23 AMkevin.cianfarini
12/11/2023, 2:24 AMDaniel Pitts
12/11/2023, 2:28 AMkevin.cianfarini
12/11/2023, 2:30 AMJob
from the scope provided by the channelFlow
builder. So it's cancelling the coroutine scrope job of the channel flow.Daniel Pitts
12/11/2023, 2:44 AMchannelFlow
starts a coroutine, and thats the thing being cancelled here.kevin.cianfarini
12/11/2023, 2:48 AMDaniel Pitts
12/11/2023, 3:05 AMSam
12/11/2023, 8:45 AMCancellationException
. The behaviour is equivalent to just writing throw CancellationException()
. It does not mark the job as cancelled, but like any exception it will eventually cancel the job if it goes uncaught.
Here are some examples of what I mean by awaiting a cancelled coroutine. In each case there is an inner coroutine that has been cancelled, and an outer coroutine that attempts to wait for a result from it.
• coroutineScope { cancel() }
• async { cancel() }.await()
• channelFlow { cancel() }.collect()
We can actually prove the difference:
try {
cancel()
awaitCancellation()
} finally {
println(isActive) // prints "false"
}
Whereas:
try {
channelFlow { cancel() }.collect()
awaitCancellation()
} finally {
println(isActive) // prints "true"
}
I'll repeat what I said before: you can't cancel a flow. You can throw a cancellation exception from the flow—the tricks with channel flows or coroutine scopes are just convoluted ways to do that—and it will bubble up into the collector coroutine, but that's not job cancellation, it's just exception propagation.