# arrow
r
Hi, I have a complex Schedule. It works fine as described, but if my suspend effect fails continuously until it reaches the last "spaced" state and then finally succeeds, the Schedule stays in that state forever. Any subsequent run of failures is just spaced, but I'd like to restart the whole policy once the effect succeeds. The problem is that my suspend effect either fails or suspends indefinitely (inside is an infinite loop that emits events as a flow), so it never returns a success response. I use this to reconnect my connections on error, and I'd like to reconnect with exponential backoff. Maybe I've set this up wrongly and Schedule is not the right tool for this.
/**
 * If effect failed, then try fast [fastTryTimes] times again,
 * then try Exponential backoff with initial [exponentialInitialDuration] for [exponentialMaxDuration]
 * then try spaced with [exponentialMaxDuration] jittered until success
 */
@ExperimentalTime
fun <A> complexPolicy(): Schedule<A, List<A>> =
    Schedule.recurs<A>(fastTryTimes)
        .andThen(
            Schedule.exponential<A>(exponentialInitialDuration)
                .doWhile { _, out -> out < exponentialMaxDuration }
                .andThen(
                    Schedule.spaced<A>(exponentialMaxDuration).jittered()))
        .zipRight(Schedule.identity<A>().collect())
s
Hey @Ruanfi, this actually looks correct according to the KDoc attached to the function. I'm not sure I entirely understand your use case though 🤔 How are you using this? I assume with `retry` around the operation that is emitting? I think the detail is in how you call this.
while(true) {
  complexPolicy<Throwable>().retry {
    // code that emits
  }
}
This would "restart" the policy on success (in between emits), but I am guessing that you want this around the creation of your connection? 🤔 So you currently might have something like this?
complexPolicy<Throwable>().retry {
  val conn = createConnection()
  while(true) {
    val res = conn.doSomething()
    emit(res)
  }
}
r
Hi @simon.vergauwen, thank you for your reply. Your last code snippet is how I'm using it. There are a bunch of Resources, so I'd like them to be closed correctly. Each time the inner infinite loop fails, it moves the outer Schedule policy one step forward, but I need it to go back to the start state once I have a successful connection and can emit values. In theory I can send something through the channel I opened, but how do I use such a signal inside the Schedule to reset its inner state? Or I could use a boolean that I set to false before resource creation and to true once everything has been acquired successfully, so the Schedule would have two inputs: the Exception and the Boolean (error after success, or error after error). Or am I overcomplicating this for lack of experience?
s
My pleasure. Could you share a more complete pseudo snippet of the usage site? It's not entirely clear to me what is happening, and what the fix might be. Resources should get closed correctly, regardless of Schedule or other code. Perhaps you'd want two separate schedules though: one for the resources, and one wrapping `emit`.
r
@simon.vergauwen, here is a more expanded snippet:
suspend fun <A> withExponentialBackOff(f: suspend () -> A): A = resilient(complexPolicy(), f)

suspend fun <A> resilient(schedule: Schedule<Throwable, *>, f: suspend () -> A): A = schedule
    .doWhile { input, _ -> input !is StopException }
    .retry(f)

fun <A> complexPolicy(): Schedule<A, List<A>> =
    Schedule.recurs<A>(fastTryTimes)
        .andThen(
            Schedule.exponential<A>(exponentialInitialDuration)
                .doWhile { _, out -> out < exponentialMaxDuration }
                .andThen(
                    Schedule.spaced<A>(exponentialMaxDuration).jittered()))
        .zipRight(Schedule.identity<A>().collect())

private fun tryConnect(): Resource<Socket> = resource {
    val socket  = tryCreateSocket().bind()
    tryConnect(socket).bind()
    socket
} releaseCase { socket, exitCase ->
    logger.info { "Closing the socket: $socket with exit: $exitCase" }
}
    
fun start(): Flow<ByteArray> = flow {
    withExponentialBackOff {
        tryConnect().use { connection ->
            emit(connection.receiveMessage())
        }
    }
}
So in case of failure, we release the resources and reconnect. All "withExponentialBackOff" does is move along the schedule policy on each failure. That is what I want while failures come one after another (e.g. the server is down for some period): fail N times in a row, then exponential backoff, then spaced backoff, so we reach the final step (spaced backoff) if the server is down for a long time. Now suppose the server comes back up. We connect successfully and do our job. Then one failure happens after that success; in this case "withExponentialBackOff" continues from the last step and gives us spaced backoff. What I need instead is a reset of the policy, so we start from the beginning. The idea is: if the behaviour is just flaky, we reconnect as fast as possible; if N failures happen in a row, we don't want to overwhelm the system, so we give it some backoff delay.
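To make the intended delays concrete, here is a minimal sketch in plain Kotlin (no Arrow) of the three stages as a function of the number of failures in a row. The constant values and the name `delayMillis` are assumptions made up for illustration; the real values of fastTryTimes, exponentialInitialDuration and exponentialMaxDuration are not given here, and jitter is left out.

```kotlin
// Hypothetical values, chosen only for illustration.
const val FAST_TRIES = 3        // stands in for fastTryTimes
const val INITIAL_MS = 100L     // stands in for exponentialInitialDuration
const val MAX_MS = 5_000L       // stands in for exponentialMaxDuration

/**
 * Delay in milliseconds before the retry that follows the
 * [consecutiveFailures]-th failure in a row (1-based).
 */
fun delayMillis(consecutiveFailures: Int): Long {
    require(consecutiveFailures >= 1) { "consecutiveFailures must be at least 1" }

    // Stage 1: the first FAST_TRIES failures are retried immediately.
    if (consecutiveFailures <= FAST_TRIES) return 0L

    // Stage 2: double the delay for every further failure...
    var delay = INITIAL_MS
    var doublings = consecutiveFailures - FAST_TRIES - 1
    while (doublings > 0 && delay < MAX_MS) {
        delay *= 2
        doublings--
    }

    // Stage 3: ...until it is capped at the spaced delay.
    return minOf(delay, MAX_MS)
}
```

A reset after a success then simply means counting failures from 1 again, so the next failure is retried with no delay.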
Hi @simon.vergauwen, did my later description make things any clearer? Maybe you know an easy way to solve this? Thank you.
s
Hey @Ruanfi, sorry for the late reply. I think we'll have to create some kind of recursive `Schedule` in order to do this. Is there any way to distinguish between the two scenarios? You can use `doWhile { throwable, output -> .. }` to "finish" the schedule, and then compose it with itself using `andThen(complexSchedule())`, which basically means "run until this condition, and then run the schedule again". I'm not sure how `tryConnect` works, but there are no finalizers or close methods being invoked. 🤔
r
This is what I've come up with so far:
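suspend fun <A> resilient(schedule: Schedule<Throwable, *>, f: suspend (() -> Unit) -> A): A {
    // Set to true by the effect once it has connected successfully.
    var succeededBefore = false

    // Outer schedule: restart the inner policy from its first step, forever.
    return Schedule.forever<Throwable>()
        .retry {
            succeededBefore = false
            schedule
                // Stop the inner policy on a failure that follows a success, so the outer retry restarts it.
                .doWhile { _, _ -> !succeededBefore }
                .retry { f { succeededBefore = true } }
        }
}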
So the effect I am retrying has to report back whether an attempt succeeded, simply by calling the provided function `{ succeededBefore = true }`. That way I apply the chain of exponential backoffs while there has been no successful invocation yet, and restart the policy when a failure comes after a successful invocation.
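The same reset-on-success idea, stripped of Arrow, can be sketched as a small counter. The class name `ResettableBackoff` and its methods are hypothetical names for illustration, not part of Arrow or of the code above; the delay function is passed in by the caller.

```kotlin
/**
 * Tracks failures in a row and resets when a success is reported.
 * [delayFor] maps the number of consecutive failures (1-based) to a delay in milliseconds.
 */
class ResettableBackoff(private val delayFor: (Int) -> Long) {
    private var consecutiveFailures = 0

    /** Plays the role of the `{ succeededBefore = true }` callback: start the policy over. */
    fun onSuccess() {
        consecutiveFailures = 0
    }

    /** Records one more failure and returns how long to wait before the next attempt. */
    fun onFailure(): Long {
        consecutiveFailures += 1
        return delayFor(consecutiveFailures)
    }
}
```

For example, with `ResettableBackoff { n -> n * 100L }`, two failures yield 100 and 200 ms; after `onSuccess()` the next failure yields 100 ms again.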