Ruanfi
04/14/2023, 11:13 AM
/**
 * If the effect fails, retry immediately up to [fastTryTimes] times,
 * then retry with exponential backoff starting at [exponentialInitialDuration] until the delay reaches [exponentialMaxDuration],
 * then retry at a jittered [exponentialMaxDuration] spacing until success.
 */
@ExperimentalTime
fun <A> complexPolicy(): Schedule<A, List<A>> =
    Schedule.recurs<A>(fastTryTimes)
        .andThen(
            Schedule.exponential<A>(exponentialInitialDuration)
                .doWhile { _, out -> out < exponentialMaxDuration }
                .andThen(
                    Schedule.spaced<A>(exponentialMaxDuration).jittered()))
        .zipRight(Schedule.identity<A>().collect())
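For anyone reading along, here is a rough sketch of the delay sequence that the KDoc for complexPolicy describes, written in plain Kotlin without Arrow. The three values are hypothetical (the thread never shows the real settings), jitter is left out, and the exact step at which Arrow switches phases may differ, so treat this as an approximation rather than what Schedule does internally.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

// Hypothetical settings, chosen only for illustration.
const val fastTryTimes = 3
val exponentialInitialDuration: Duration = 100.milliseconds
val exponentialMaxDuration: Duration = 1.seconds

// Approximate delays between attempts: fast retries, then doubling, then a fixed spacing.
fun delays(): Sequence<Duration> = sequence {
    // Phase 1: immediate retries.
    repeat(fastTryTimes) { yield(Duration.ZERO) }
    // Phase 2: exponential backoff while the delay stays below the maximum.
    var d = exponentialInitialDuration
    while (d < exponentialMaxDuration) {
        yield(d)
        d *= 2
    }
    // Phase 3: fixed spacing forever (the real policy also applies jitter here).
    while (true) yield(exponentialMaxDuration)
}

fun main() {
    // Print the first few delays to see where each phase starts.
    println(delays().take(9).toList())
}
```

With these assumed values, the first nine delays are three zero delays, then 100, 200, 400 and 800 milliseconds, then one second twice.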
simon.vergauwen
04/17/2023, 10:02 AM
retry around the operation that is emitting? I think the detail is in how you called this.
while (true) {
    complexPolicy<Throwable>().retry {
        // code that emits
    }
}
This would "restart" the policy on success (in between emits), but I am guessing that you want this around the creation of your connection? 🤔 So you currently might have something like this?
complexPolicy<Throwable>().retry {
    val conn = createConnection()
    while (true) {
        val res = conn.doSomething()
        emit(res)
    }
}
Ruanfi
04/17/2023, 12:12 PM
simon.vergauwen
04/17/2023, 12:17 PM
emit.
Ruanfi
04/17/2023, 1:45 PM
Ruanfi
04/17/2023, 1:45 PM
suspend fun <A> withExponentialBackOff(f: suspend () -> A): A = resilient(complexPolicy(), f)

suspend fun <A> resilient(schedule: Schedule<Throwable, *>, f: suspend () -> A): A = schedule
    .doWhile { input, _ -> input !is StopException }
    .retry(f)

fun <A> complexPolicy(): Schedule<A, List<A>> =
    Schedule.recurs<A>(fastTryTimes)
        .andThen(
            Schedule.exponential<A>(exponentialInitialDuration)
                .doWhile { _, out -> out < exponentialMaxDuration }
                .andThen(
                    Schedule.spaced<A>(exponentialMaxDuration).jittered()))
        .zipRight(Schedule.identity<A>().collect())

private fun tryConnect(): Resource<Socket> = resource {
    val socket = tryCreateSocket().bind()
    tryConnect(socket).bind()
    socket
} releaseCase { socket, exitCase ->
    logger.info { "Closing the socket: $socket with exit: $exitCase" }
}

fun start(): Flow<ByteArray> = flow {
    withExponentialBackOff {
        tryConnect().use { connection ->
            emit(connection.receiveMessage())
        }
    }
}
Ruanfi
04/18/2023, 4:48 AM
Ruanfi
04/26/2023, 10:25 AM
simon.vergauwen
04/29/2023, 12:03 PM
Schedule in order to do this. Is there any way to distinguish between the two scenarios?
You can use doWhile { throwable, output -> .. } to "finish" the schedule, and then compose it with itself using andThen(complexSchedule()), which basically means: "Run until this condition, and then run the schedule again".
I'm not sure how tryConnect works, but there are no finalizers or close methods being invoked. 🤔
Ruanfi
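05/01/2023, 1:09 PM
suspend fun <A> resilient(schedule: Schedule<Throwable, *>, f: suspend (() -> Unit) -> A): A {
    // Initialized up front: Kotlin does not allow a captured local to be first assigned inside a lambda.
    var succeededBefore = false
    return Schedule.forever<Throwable>()
        .retry {
            // Each outer retry restarts the policy with a cleared flag.
            succeededBefore = false
            schedule
                .doWhile { _, _ -> !succeededBefore }
                .retry { f { succeededBefore = true } }
        }
}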
So the effect that I am retrying has to report back whether it succeeded at some point, simply by calling the provided function { succeededBefore = true }.
That way I can apply the chain of exponential backoffs while there has not been any successful invocation yet, and restart the policy if there was a successful invocation and another problem occurs after that success.
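A minimal sketch of the same "restart the backoff after a success" idea in plain Kotlin, without Arrow or coroutines, in case it helps to see the control flow on its own. The name retryWithReset, its parameters, and the injected sleep function are all hypothetical, and it uses simple doubling instead of the full complexPolicy, so it is an illustration under those assumptions and not the code from this thread.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper: retries `action` until it returns. The backoff restarts
// from `initial` whenever the failed attempt had called markSuccess() first.
fun <A> retryWithReset(
    initial: Duration,
    max: Duration,
    sleep: (Duration) -> Unit,
    action: (markSuccess: () -> Unit) -> A,
): A {
    var delay = initial
    while (true) {
        var succeededBefore = false
        try {
            return action { succeededBefore = true }
        } catch (e: Exception) {
            // A failure after a success counts as a new problem: start over.
            if (succeededBefore) delay = initial
            sleep(delay)
            delay = minOf(delay * 2, max)
        }
    }
}

fun main() {
    val slept = mutableListOf<Duration>()
    var calls = 0
    val result = retryWithReset(100.milliseconds, 400.milliseconds, { slept += it }) { markSuccess ->
        calls++
        when (calls) {
            1, 2, 3 -> error("not connected")          // never succeeded: backoff grows
            4 -> { markSuccess(); error("dropped") }   // succeeded, then failed: backoff restarts
            5 -> error("not connected")
            else -> "done"
        }
    }
    println("$result after $calls calls, slept: $slept")
}
```

Injecting sleep keeps the sketch testable without real waiting. In the example run, the recorded delays grow to the cap over the first three failures and drop back to the initial value after the fourth call, which had reported a success before failing.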