https://kotlinlang.org logo
Title
d

dave08

09/19/2019, 1:44 PM
Wondering, if I have a Flow and I need that if there's an exception that emission should be skipped, and the next upstream emission should be processed, how do I do that?
d

Dominaezzz

09/19/2019, 2:01 PM
I've been think a lot about this lately and I came to the conclusion that
Flow
simply cannot do this. How does the caller of
emit
know to skip an element. The exception will have to be caught and skipped by the same emitter that threw it. It can't be done upstream or downstream.
v

Vsevolod Tolstopyatov [JB]

09/19/2019, 2:23 PM
Are you looking for something like this?
val flow = flow {
    emit(1)
    emit(2)
}.map { value ->
    if (value == 1) throw IllegalStateException() 
    else value
}.ignoreIllegalStateExceptionOperator()
.collect { value ->
    println(value)
} // prints "2"
s

streetsofboston

09/19/2019, 2:28 PM
If you create your own Flow, you can try-catch an exception and continue emitting values. But if your code just uses a Flow that may throw an exception and you want to have it continue, I don’t think that is possible. It is possible to restart the Flow from scratch, though…. The same would be true for Rx as well. A Flowable that emits an onError terminates. Downstream observers can mitigate the onError, but the source-Flowable cannot continue.
d

dave08

09/19/2019, 3:17 PM
Yes @Vsevolod Tolstopyatov [JB], is that possible?
Otherwise, I already have enough nesting to add another try/catch... and I wonder if that's not what we're avoiding by having the catch operator...
v

Vsevolod Tolstopyatov [JB]

09/22/2019, 12:02 PM
It’s possible (in a slightly different way tho), could you please create an issue for your use-case? If your
ignoreIllegalStateExceptionOperator
is the first operator in your chain, then you can just catch exceptions you don’t want to propagate to the upsteam.
But it will break an exception transparency guarantee (and potentially other invariants), that’s why it’s worth to discuss such operator in details
d

dave08

09/24/2019, 11:01 AM
If it needs to be the first operator, it might be less useful in our use-case, since we had a UI button click flow, and the operation being run can cause a
SecurityException
if the user is not registered (there's no other way to check in this api... 😞 ), So a registration function must be called and the flow should keep on receiving click events (or be able to retry...) after that... Not sure if that's feasable... should I still open the issue @Vsevolod Tolstopyatov [JB]?
v

Vsevolod Tolstopyatov [JB]

09/24/2019, 2:34 PM
Yes, please, at least to see a demand on that. Probably you have to use
retry
operator after an operation (
map { transfromThatMayThrow() }.retry { /* your logic here */ }.collect()
), but it’s hard to tell without a detailed understanding of your use case
d

dave08

09/25/2019, 1:53 PM
It seems like someone already proposed this: https://github.com/Kotlin/kotlinx.coroutines/issues/1339. Roman seemed to think its not possible over there...
s

streetsofboston

09/25/2019, 1:59 PM
I agree with Roman’s suggestion. When an upstream Flow has produced an exception, it simply has finished. You can write your own Flow that continues on an exception (
val data = flow<Int> {   try { .... } catch (e: Throwable) { ... } }
), but for an existing one, I see no good way of forcing that Flow to continue, much like Rx-Observables or Flowables. You could restart a Flow, though, by re-subscribing/re-collecting it again on an exception.
d

dave08

09/25/2019, 3:11 PM
But that's not so nice when implementing ui flows like a flow of clicks, you don't want the flow to detach... and try catch is a bit messy there...
s

streetsofboston

09/25/2019, 3:12 PM
How can a Flow that represents user-clicks throw an exception?
d

Dominaezzz

09/25/2019, 3:12 PM
Flow<Result<T>>
might be an option.
d

dave08

09/25/2019, 3:12 PM
Map that to an operation that can throw
s

streetsofboston

09/25/2019, 3:13 PM
Then you’d need to make that flat-mapped operation restartable. The upstream click-flow will remain active
d

dave08

09/25/2019, 3:13 PM
Thats more nesting since result cant be returned from a function...
Who restarts it?
I connect the flow in the init block of a viewmodel
s

streetsofboston

09/25/2019, 3:14 PM
Have to look if there is a retry or some other operator on a Flow that does that for you.
d

Dominaezzz

09/25/2019, 3:15 PM
There's a
retry
operator.
d

dave08

09/25/2019, 3:17 PM
But it doesnt catch an exception...
Actually does
s

streetsofboston

09/25/2019, 3:18 PM
Something like this:
val clicks : Flow<Click> = ...

    val operation: Flow<OperationResult> = ...

    val clickAndThenOperation = clicks
        .flatMapMerge { 
            operation.retry { throwable ->
                // handle throwable
                true 
            } 
        }
    
    clickAndThenOperation.collect {
        ...
    }
d

dave08

09/25/2019, 3:20 PM
Isnt that a bit much for something that might be a bit of a common pattern for ui reactive design... but that might do it.
s

streetsofboston

09/25/2019, 3:21 PM
You can create your own extension function out of this pattern and re-use it.
d

dave08

09/25/2019, 3:23 PM
I'd have to look it through with my use case... but the operation is currently a suspend fun = Single in Rx, not a flow... so for one value we would need to make everything into flows...
And only care for the first value
s

streetsofboston

09/25/2019, 3:25 PM
If
suspend fun operation(): SomeResult
, you can do
::operation.asFlow()
d

dave08

09/25/2019, 3:26 PM
Nice, but not when you need parameters for the operation
s

streetsofboston

09/25/2019, 3:27 PM
Easy to create your own:
val operationAsFlow = flow<SomeResult> { emit(operation(param1, param2)) }
d

dave08

09/25/2019, 3:29 PM
True, but for each one of those operatations.... and more nesting... I think an operator would just be so much cleaner... and maybe more efficient.
d

Dominaezzz

09/25/2019, 3:29 PM
Yeah, might as well make one.