Using `Schedule` to repeat an operation. Is there ...
# arrow
j
Using
Schedule
to repeat an operation. Is there a way to get the delay after adding jitter to the schedule. Better yet, is there a way to get how much time has elapsed since the start of the schedule.
Copy code
Schedule.exponential<SomeResult>(someConfigDelay)
            .jittered()
            .whileOutput { it < someConfigMaxDelay }
            .logOutput { log.debug("Current delay: $it - ${it.inWholeNanoseconds} ns") }
s
Hey @Jonathan De Leon, Great question! Looking into this I think there is some room for improvement in the
Schedule
API! The reason this isn't behaving as you expected is that the
Duration
output of
Schedule.exponential
is not updated by
jittered
.
jittered
only updates the underlying delayTime, but not the
Output
of
Schedule.exponential
because it is not aware of what type of the previous output is. You can easily write a custom
Schedule
to achieve this though.
Copy code
@OptIn(ExperimentalTime::class)
fun <A> custom(
  base: Duration,
  factor: Double = 2.0,
  jitter: Random = Random.Default
): Schedule<A, Duration> =
  Schedule.delayed(Schedule.forever<A>()
    .map { n ->
      (base * factor.pow(n)) * jitter.nextDouble(0.0, 1.0)
    })
Here we have combined
Schedule.exponential
and
Schedule#jittered
into a single
Schedule
so the output becomes the jittered delay. I hope that helps 🙂
j
Thank you! That helps
@simon.vergauwen Is there a way with
Schedule
to create a policy that polls but with a timeout if no response. Example - Let's continuously repeat a service call, as long as the call returns data, let's continue. Otherwise, if there is no data (using some predicate) and some time has elasped, let's end the repeat. Thinking of using
Schedule
in some sort of webhook event streaming environment but we close the connection if after some time there has been no data.
s
Yes, that is definitely possible. You can just chain
whileInput
to validate the returned data with a custom predicate. It support suspension so you can even call other services or persistence to check if you need to continue, like health check endpoints or something. A
timeout
I would encode it inside
repeat
itself since it all composes nicely. Taking the custom Schedule from above.
Copy code
suspend fun networkCall(): Input {
  delay(100.milliseconds)
  return Input
}

object Input {
  suspend fun isValid(): Boolean = true
}

custom<Input?>(1.seconds)
  .whileInput { input: Input? - > input?.isValid() ?: false }
  .repeat {
    withTimeoutOrNull(500.milliseconds) {
       networkCall()
    }
  }
🙏 1