How would an implementation of a `retry` function ...
# coroutines
l
How would an implementation of a `retry` function on Kotlin Flow look that passes `T` instead of `cause`:
```kotlin
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (value: T) -> Boolean = { true }
): Flow<T>
```
j
There might be a more elegant solution, but I literally just used `transform` to `throw` when the predicate was met, then used a `retry` after:
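```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.transform

// Marker exception used only to signal "restart the flow"
object SpecialException : Throwable()

fun <T> Flow<T>.retry(
    retries: Long,
    predicate: suspend (T) -> Boolean
): Flow<T> {
    // The explicit Throwable parameter type selects the built-in retry overload below
    return transform {
        emit(it) // Might want to move this below the throw depending on what you want
        if (predicate(it)) throw SpecialException
    }.retry(retries) { cause: Throwable -> cause is SpecialException }
}
```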
I didn't actually use retry logic in mine so hopefully I got that right here.
l
I had the same idea, but I'm avoiding throwing an exception since the operation is time-sensitive
j
Understandable, I don't like using exceptions as control structures. I just couldn't find a better way. Would love to know if there is one though.
u
@James Yox I agree. But @Lilly do you think an exception weighs in against a suspend function? And why would you want a flow in the first place? Wouldn’t a suspend fun do? Like this one by @elizarov himself?
l
I guess it doesn't weigh that much, but it expresses the wrong intention. What I'm actually looking for is a flow operator that performs some action (without restarting the flow) when a specific predicate is satisfied, and cancels the flow with a specific exception after x retries. Out of curiosity, an operator that restarts the flow on a specific predicate that is not limited to an exception would also be neat, like:
```kotlin
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (value: T) -> Boolean = { true }
): Flow<T>
```
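A restart driven by a value predicate rather than by a thrown exception could be sketched roughly as follows. This is only a minimal sketch under assumptions: the name `restartWhen` and the demo function are hypothetical, the matching value is dropped rather than emitted, and once the restarts are used up matching values are simply passed through. Note that `takeWhile` still stops upstream collection through the library's internal cancellation mechanism, so this avoids user-defined exceptions rather than exceptions altogether.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: re-collects the upstream flow whenever a value matches
// the predicate, at most `retries` times. The matching value itself is not emitted.
fun <T> Flow<T>.restartWhen(
    retries: Long,
    predicate: suspend (value: T) -> Boolean,
): Flow<T> = flow {
    var restartsLeft = retries
    do {
        var restart = false
        // takeWhile ends this collection round as soon as a restart is requested
        emitAll(this@restartWhen.takeWhile { value ->
            restart = restartsLeft > 0 && predicate(value)
            !restart
        })
        if (restart) restartsLeft--
    } while (restart)
}

// Demo: the source emits a "bad" value (-1) on its first two collections only.
fun restartWhenDemo(): List<Int> = runBlocking {
    var runs = 0
    val source = flow {
        runs++
        emit(1)
        emit(if (runs < 3) -1 else 2)
        emit(3)
    }
    source.restartWhen(retries = 5) { it < 0 }.toList() // [1, 1, 1, 2, 3]
}
```

Because the upstream is collected again from scratch, any `onStart` logic placed before this operator would run on every restart.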
u
Here is an ugly, stateful sketch for your first request. I have not tested it and it might not work, but it might help in thinking about the problem, mainly whether the statefulness and recursion of the problem can be solved differently than with a mutable state flow. (Updated)
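```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class) // flatMapLatest is experimental
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (value: T) -> Boolean = { true },
): Flow<T> {
    // Guard against overflow: Long.MAX_VALUE + 1 would wrap around to a negative count
    val remainingTries = MutableStateFlow(if (retries == Long.MAX_VALUE) retries else retries + 1)
    return remainingTries
        // Stop our flow if retries have been exhausted or explicitly set to 0
        .takeWhile { count ->
            count > 0
        }
        .flatMapLatest {
            this
                .onEach { value ->
                    if (predicate(value).not()) {
                        remainingTries.value--
                    }
                }
                .takeWhile(predicate)
                // Stops retrying if the flow completes successfully or exceptionally.
                // Not on cancellation though, as it probably just means a retry has happened.
                .onCompletion { error ->
                    if (error !is CancellationException) {
                        remainingTries.value = 0
                    }
                }
        }
}
```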
Btw, what would you like `retry()` to emit if all tries are exhausted? The last value with `predicate(value) == false`?
l
@uli Thank you for investigating this. In my scenario, it was crucial to repeat an action upon a timeout without restarting the entire flow. Restarting would trigger the `onStart` logic again, which could disrupt the operation. While introducing conditional `onStart` logic through additional state management is an alternative, it would increase verbosity and reduce readability. Therefore, I introduced the custom operator, `repeatOnTimeout`, to address this issue effectively:
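```kotlin
import kotlin.time.Duration
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.*

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.repeatOnTimeout(
    timeout: Duration,
    times: Long = 3L,
    backoffStrategy: Iterator<Long>, // delays in milliseconds, one per timeout
    action: suspend () -> Unit,
): Flow<T> = flow {
    var attempt = 0L
    coroutineScope {
        // Collect upstream in a separate coroutine so receiving can race against the timeout
        val values = buffer(Channel.RENDEZVOUS).produceIn(this)
        whileSelect {
            values.onReceiveCatching { value ->
                value.onSuccess {
                    emit(it)
                }.onClosed {
                    // Upstream finished: leave the select loop
                    return@onReceiveCatching false
                }
                return@onReceiveCatching true
            }
            onTimeout(timeout) {
                // No value arrived in time: repeat the action, or fail once attempts are used up
                if (attempt < times) action() else throw Exception("Your message")
                attempt++
                delay(backoffStrategy.next())
                return@onTimeout true
            }
        }
    }
}
```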
Disclaimer: code is inspired by/taken from the flow operator `timeout()`.
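Since `backoffStrategy` is an `Iterator<Long>` whose values are passed straight to `delay`, the delays are in milliseconds and the iterator should not run out before the attempts do. One way to supply it might be an endless exponential sequence with a cap. The helper name `exponentialBackoff` and its default values below are assumptions for illustration, not part of the original operator.

```kotlin
// Hypothetical helper: endless exponential backoff delays in milliseconds,
// starting at initialMillis, multiplied by factor each step, capped at maxMillis.
fun exponentialBackoff(
    initialMillis: Long = 100L,
    factor: Long = 2L,
    maxMillis: Long = 5_000L,
): Iterator<Long> =
    generateSequence(initialMillis.coerceAtMost(maxMillis)) { previous ->
        (previous * factor).coerceAtMost(maxMillis)
    }.iterator()

// Demo: the first eight delays with the default settings.
fun exponentialBackoffDemo(): List<Long> =
    exponentialBackoff().asSequence().take(8).toList()
    // [100, 200, 400, 800, 1600, 3200, 5000, 5000]
```

It could then be passed as `backoffStrategy = exponentialBackoff()`. Because the sequence never ends, `next()` cannot throw `NoSuchElementException` in the middle of a retry.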
This works for me. For another use case, I simply throw a custom exception and handle it in the `retryWhen` operator, where restarting the flow is acceptable.
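For the case where restarting is acceptable, the custom-exception approach with `retryWhen` might look roughly like the sketch below. The names `RestartRequested` and `restartByException` are hypothetical, and the choice to drop the matching value instead of emitting it is an assumption. Once the retries are exhausted, the exception propagates to the collector.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical marker exception that signals "restart the flow"
class RestartRequested : Exception("restart requested")

// Hypothetical operator: throws when a value matches the predicate and lets
// retryWhen restart the upstream, up to maxRetries times.
fun <T> Flow<T>.restartByException(
    maxRetries: Long,
    predicate: suspend (T) -> Boolean,
): Flow<T> =
    onEach { value -> if (predicate(value)) throw RestartRequested() }
        // attempt starts at 0 and counts the retries performed so far
        .retryWhen { cause, attempt -> cause is RestartRequested && attempt < maxRetries }

// Demo: the source emits a "bad" value (-1) on its first two collections only.
fun restartByExceptionDemo(): List<Int> = runBlocking {
    var runs = 0
    val source = flow {
        runs++
        emit(1)
        if (runs < 3) emit(-1)
        emit(2)
    }
    source.restartByException(maxRetries = 5) { it < 0 }.toList() // [1, 1, 1, 2]
}
```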