Lilly
04/26/2024, 4:33 PM
What would a retry function on Kotlin Flow look like that passes T instead of `cause`:
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (value: T) -> Boolean = { true }
): Flow<T>
James Yox
04/26/2024, 4:48 PM
I used transform to throw when the predicate was met, then used a retry after:
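object SpecialException : Throwable()

fun <T> Flow<T>.retry(
    retries: Long,
    predicate: suspend (T) -> Boolean
): Flow<T> {
    // The trailing call is meant to be the built-in Throwable-based retry. Import
    // kotlinx.coroutines.flow.retry explicitly (or rename this function), otherwise
    // the call may resolve back to this overload.
    return transform {
        emit(it) // Might want to move this below the throw depending on what you want
        if (predicate(it)) throw SpecialException
    }.retry(retries) { cause -> cause is SpecialException }
}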
I didn't actually use retry logic in mine so hopefully I got that right here.
Lilly
04/26/2024, 4:52 PM
James Yox
04/26/2024, 4:53 PM
uli
04/26/2024, 6:22 PM
Lilly
04/27/2024, 11:35 PM
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (value: T) -> Boolean = { true }
): Flow<T>
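For reference, the transform-and-throw idea from earlier in the thread can be exercised end to end roughly like this. It is only a sketch: `retryOnValue` and `RetrySignal` are hypothetical names (chosen so nothing clashes with the built-in `retry`), the value is checked before it is emitted, and `retryWhen` is used so the attempt count is explicit.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical marker exception, used only to signal "restart because of this value".
private object RetrySignal : Exception()

// Hypothetical operator name; restarts the upstream flow when predicate(value) is true.
fun <T> Flow<T>.retryOnValue(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (value: T) -> Boolean,
): Flow<T> = transform { value ->
    if (predicate(value)) throw RetrySignal // restart instead of emitting the value
    emit(value)
}.retryWhen { cause, attempt -> cause is RetrySignal && attempt < retries }

fun demoRetryOnValue(): List<Int> = runBlocking {
    var run = 0
    val source = flow {
        run++
        emit(1)
        emit(if (run < 3) -1 else 2) // the first two runs produce a "bad" value
    }
    // Values emitted before each restart are kept, so 1 shows up once per run.
    source.retryOnValue(retries = 5) { it < 0 }.toList()
}

fun main() {
    println(demoRetryOnValue()) // [1, 1, 1, 2]
}
```

Once the retries are used up, `RetrySignal` reaches the collector, so callers that want a quiet stop would need a `catch` after the operator.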
uli
04/28/2024, 12:14 PM
@OptIn(ExperimentalCoroutinesApi::class) // flatMapLatest is experimental
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (value: T) -> Boolean = { true },
): Flow<T> {
    // Guard the default: Long.MAX_VALUE + 1 would overflow to a negative count
    val remainingTries = MutableStateFlow(if (retries == Long.MAX_VALUE) retries else retries + 1)
    return remainingTries
        // Stop our flow if retries have been exhausted or explicitly set to 0
        .takeWhile { count ->
            count > 0
        }
        .flatMapLatest {
            this
                .onEach { value ->
                    if (predicate(value).not()) {
                        remainingTries.value--
                    }
                }
                .takeWhile(predicate)
                // Stops retrying if the flow completes successfully or exceptionally.
                // Not on Cancellation though, as it probably just means a retry has happened.
                .onCompletion { error ->
                    if (error !is CancellationException) {
                        remainingTries.value = 0
                    }
                }
        }
}
uli
04/28/2024, 12:52 PM
Lilly
04/29/2024, 10:05 PM
I wrote an operator, repeatOnTimeout, to address this issue effectively:
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.repeatOnTimeout(
    timeout: Duration,
    times: Long = 3L,
    backoffStrategy: Iterator<Long>,
    action: suspend () -> Unit,
): Flow<T> = flow {
    var attempt = 0L
    coroutineScope {
        val values = buffer(Channel.RENDEZVOUS).produceIn(this)
        whileSelect {
            values.onReceiveCatching { value ->
                value.onSuccess {
                    emit(it)
                }.onClosed {
                    return@onReceiveCatching false
                }
                return@onReceiveCatching true
            }
            onTimeout(timeout) {
                if (attempt < times) action() else throw Exception("Your message")
                attempt++
                delay(backoffStrategy.next())
                return@onTimeout true
            }
        }
    }
}
}
Disclaimer: code is inspired by/taken from flow operator timeout()
This works for me. For another use case, I simply throw a custom exception and handle it in the retryWhen
operator, where restarting the flow is acceptable.
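The second use case (throw a custom exception, handle it in retryWhen) could look roughly like the sketch below. `StaleDataException`, `restartOnStale`, and the linear backoff are assumptions for illustration; the thread does not show the real exception type or delay policy.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical exception type standing in for the custom exception mentioned above.
class StaleDataException(message: String) : Exception(message)

// Hypothetical helper: restarts the whole flow on StaleDataException, up to
// maxAttempts times, waiting a little longer before each restart.
fun <T> Flow<T>.restartOnStale(maxAttempts: Long = 3): Flow<T> =
    retryWhen { cause, attempt ->
        val shouldRetry = cause is StaleDataException && attempt < maxAttempts
        if (shouldRetry) delay(10L * (attempt + 1)) // simple linear backoff (assumed policy)
        shouldRetry
    }

fun demoRestartOnStale(): List<String> = runBlocking {
    var run = 0
    flow {
        run++
        // The first two runs fail; the third one succeeds.
        if (run < 3) throw StaleDataException("run $run was stale")
        emit("fresh data from run $run")
    }.restartOnStale().toList()
}

fun main() {
    println(demoRestartOnStale()) // [fresh data from run 3]
}
```

Any other exception type, or a StaleDataException after maxAttempts restarts, is passed through to the collector unchanged.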