seetha
03/29/2021, 9:33 PM
arrow.fx.coroutines.Schedule: is there a way to get the attempt number inside Either.catch?
Either.catch {
  retry(Schedule.exponential(250.milliseconds)) {
    evalOn(IOPool) {
      // do something
      incrementMetricSuccess(attempt) // want to increment the metric with the attempt on which it succeeded
    }
  }
}.mapLeft {
  incrementMetricSuccess(attempt) // want to increment the metric with the attempt on which it failed
}
stojan
03/30/2021, 9:17 AM
Schedule.exponential returns the time passed, so its output isn't very useful for you. You can combine it with Schedule.forever (which repeats forever and returns the number of repetitions) using zipRight.
zipRight means we only get the output of the second schedule (that is, the number of repetitions).
zipRight combines the schedules using and, meaning you still get the behaviour of Schedule.exponential.
simon.vergauwen
03/30/2021, 10:09 AM
Where is attempt coming from? If you need it available inside retry, then you need to keep an Atomic counter or something similar. If you only need the number of retries outside of retry, then you can compose Schedule as @stojan proposes.
seetha
03/30/2021, 1:42 PM
suspend fun publishMessage(
  message: Message, envelope: Envelope
): Either<PubError, PubResult> =
  Either.catch {
    retry(Schedule.exponential(Duration(1, TimeUnit.SECONDS), 2.0)) {
      evalOn(IOPool) {
        val host = router.routeClient(message.id)
        val publishResult = publish(message, host, attempt)
        when (publishResult) {
          is PubResult.Error -> metrics.recordPublishError(publishResult)
          is PubResult.Failure -> metrics.recordPublishFailure(publishResult)
          else -> metrics.recordPublishSuccess(publishResult)
        }
        publishResult
      }
    }
  }.mapLeft {
    PubError(envelope, PubResult.Error(it, message, attempt, host))
  }
I am actually refactoring code that previously used Flowable:
fun retryWithBackoff(scheduler: Scheduler, vararg backoffSeconds: Int): Flowable<Int> =
  Flowable.fromArray(backoffSeconds)
    .flatMap { it.toFlowable() }
    .zipWith(generateSequence(1) { it + 1 }.asIterable()) { delay: Int, attempt: Int -> delay to attempt }
    .flatMap(maxConcurrency = 1, bufferSize = 1) { (delay, attempt) ->
      Flowable.timer(delay.toLong(), SECONDS, scheduler)
        .map { attempt }
    }
^^ The retryWithBackoff above is a custom function that has access to attempt!
I was wondering whether Schedule itself exposes anything that gives me access to the number of times it has been tried.
As you can see in the publishMessage function, I am trying to get hold of attempt (the number of retries) both inside retry and in the mapLeft of the Either.catch block.
seetha
03/30/2021, 2:02 PM
retry is good enough, and I will go with the option of keeping an Atomic counter. Thanks for all the suggestions!
simon.vergauwen
03/30/2021, 2:12 PM
An Atomic counter is the most straightforward way to safely track it. Given that no parallelism is involved, a simple var x: Int would also work.
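For reference, the counter approach the thread settles on might look roughly like the sketch below. It is a stdlib-only illustration and does not use Arrow's API: retryCounting, maxAttempts, baseDelayMillis and the sleep parameter are hypothetical names, and a blocking Thread.sleep stands in for the suspending delay that a Schedule would provide. The point it shows is that a counter created outside the retry loop is readable both inside the retried block and in the failure handler (the mapLeft position in the code above).

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper, NOT part of Arrow: retries `block` with exponential
// backoff and hands the 1-based attempt number to it. The caller owns the
// counter, so it can still be read after the final failure.
fun <A> retryCounting(
    maxAttempts: Int,
    baseDelayMillis: Long,
    attempts: AtomicInteger,
    sleep: (Long) -> Unit = { Thread.sleep(it) }, // injectable so tests need not wait
    block: (attempt: Int) -> A
): A {
    var delay = baseDelayMillis
    while (true) {
        val attempt = attempts.incrementAndGet()
        try {
            return block(attempt)
        } catch (e: Exception) {
            if (attempt >= maxAttempts) throw e // give up, counter keeps the last attempt
            sleep(delay)
            delay *= 2 // exponential backoff: base, 2*base, 4*base, ...
        }
    }
}

fun main() {
    val attempts = AtomicInteger(0)
    val result = runCatching {
        retryCounting(maxAttempts = 5, baseDelayMillis = 10, attempts = attempts) { attempt ->
            // Simulated publish that fails on the first two attempts.
            if (attempt < 3) throw IllegalStateException("attempt $attempt failed")
            "published on attempt $attempt"
        }
    }
    println(result.getOrNull()) // prints "published on attempt 3"
    println(attempts.get())     // prints 3; the same value would be visible in a failure handler
}
```

Since nothing here runs in parallel, a plain var counter would behave the same; AtomicInteger only matters if the retried block could run concurrently.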