Wondering, if I have a Flow and I need that if the...
# coroutines
d
Wondering, if I have a Flow and I need that if there's an exception that emission should be skipped, and the next upstream emission should be processed, how do I do that?
d
I've been think a lot about this lately and I came to the conclusion that
Flow
simply cannot do this. How does the caller of
emit
know to skip an element. The exception will have to be caught and skipped by the same emitter that threw it. It can't be done upstream or downstream.
v
Are you looking for something like this?
Copy code
val flow = flow {
    emit(1)
    emit(2)
}.map { value ->
    if (value == 1) throw IllegalStateException() 
    else value
}.ignoreIllegalStateExceptionOperator()
.collect { value ->
    println(value)
} // prints "2"
s
If you create your own Flow, you can try-catch an exception and continue emitting values. But if your code just uses a Flow that may throw an exception and you want to have it continue, I don’t think that is possible. It is possible to restart the Flow from scratch, though…. The same would be true for Rx as well. A Flowable that emits an onError terminates. Downstream observers can mitigate the onError, but the source-Flowable cannot continue.
d
Yes @Vsevolod Tolstopyatov [JB], is that possible?
Otherwise, I already have enough nesting to add another try/catch... and I wonder if that's not what we're avoiding by having the catch operator...
v
It’s possible (in a slightly different way tho), could you please create an issue for your use-case? If your
ignoreIllegalStateExceptionOperator
is the first operator in your chain, then you can just catch exceptions you don’t want to propagate to the upsteam.
But it will break an exception transparency guarantee (and potentially other invariants), that’s why it’s worth to discuss such operator in details
d
If it needs to be the first operator, it might be less useful in our use-case, since we had a UI button click flow, and the operation being run can cause a
SecurityException
if the user is not registered (there's no other way to check in this api... 😞 ), So a registration function must be called and the flow should keep on receiving click events (or be able to retry...) after that... Not sure if that's feasable... should I still open the issue @Vsevolod Tolstopyatov [JB]?
v
Yes, please, at least to see a demand on that. Probably you have to use
retry
operator after an operation (
map { transfromThatMayThrow() }.retry { /* your logic here */ }.collect()
), but it’s hard to tell without a detailed understanding of your use case
d
It seems like someone already proposed this: https://github.com/Kotlin/kotlinx.coroutines/issues/1339. Roman seemed to think its not possible over there...
s
I agree with Roman’s suggestion. When an upstream Flow has produced an exception, it simply has finished. You can write your own Flow that continues on an exception (
val data = flow<Int> {   try { .... } catch (e: Throwable) { ... } }
), but for an existing one, I see no good way of forcing that Flow to continue, much like Rx-Observables or Flowables. You could restart a Flow, though, by re-subscribing/re-collecting it again on an exception.
d
But that's not so nice when implementing ui flows like a flow of clicks, you don't want the flow to detach... and try catch is a bit messy there...
s
How can a Flow that represents user-clicks throw an exception?
d
Flow<Result<T>>
might be an option.
d
Map that to an operation that can throw
s
Then you’d need to make that flat-mapped operation restartable. The upstream click-flow will remain active
d
Thats more nesting since result cant be returned from a function...
Who restarts it?
I connect the flow in the init block of a viewmodel
s
Have to look if there is a retry or some other operator on a Flow that does that for you.
d
There's a
retry
operator.
d
But it doesnt catch an exception...
Actually does
s
Something like this:
Copy code
val clicks : Flow<Click> = ...

    val operation: Flow<OperationResult> = ...

    val clickAndThenOperation = clicks
        .flatMapMerge { 
            operation.retry { throwable ->
                // handle throwable
                true 
            } 
        }
    
    clickAndThenOperation.collect {
        ...
    }
d
Isnt that a bit much for something that might be a bit of a common pattern for ui reactive design... but that might do it.
s
You can create your own extension function out of this pattern and re-use it.
d
I'd have to look it through with my use case... but the operation is currently a suspend fun = Single in Rx, not a flow... so for one value we would need to make everything into flows...
And only care for the first value
s
If
suspend fun operation(): SomeResult
, you can do
::operation.asFlow()
d
Nice, but not when you need parameters for the operation
s
Easy to create your own:
val operationAsFlow = flow<SomeResult> { emit(operation(param1, param2)) }
d
True, but for each one of those operatations.... and more nesting... I think an operator would just be so much cleaner... and maybe more efficient.
d
Yeah, might as well make one.