Hello Team, I am new to Arrow library. Have a ques...
# arrow
s
Hello Team, I am new to Arrow library. Have a question on 
arrow.fx.coroutines.Schedule
  . Is there a way to get the number of attempt in the Either.catch
Copy code
Either.catch{
	retry(Schedule.exponential(250.milliseconds)) {
		evalOn(IOPool) {
			//do something
	  		incrementMetricSuccess(attempt)   -------> want to increment metric with the attempt it got successful
		}
	}
}.mapLeft {
	incrementMetricSuccess(attempt)    -------> want to increment metric with the attempt to fail
}
s
Schedule.exponential
returns the time passed.... so it's output isn't very useful for you.... you can combine it with using
zipRight
with
Schedule.forever
(which repeats forever and returns number of repetitions).... now,
zipRight
means we only get the output of the second schedule (meaning the number of repetitions).....
zipRight
combines the schedules using
and
meaning you get the behaviour of
Schedule.exponential
☝️ 1
s
Could you add some more types and signatures in your example @seetha? I'm wondering where
attempt
is coming from? If you need it available inside
retry
then you need to keep a
Atomic
counter or something. If you only need to access the number of retries outside of
retry
then you can compose
Schedule
like @stojan preposes.
s
@simon.vergauwen Thanks for the response. Here is my suspend function
Copy code
suspend fun publishMessage(
    message: Message, envelope: Envelope
): Either<PubError, PubResult> =
    Either.catch {
        retry(Schedule.exponential(Duration(1, TimeUnit.SECONDS), 2.0)) {
            evalOn(IOPool) {
                val host = router.routeClient(message.id)
                val publishResult = publish(message, host, attempt)
                when (publishResult) {
                    is PubResult.Error -> metrics.recordPublishError(publishResult)
                    is PubResult.Failure -> metrics.recordPublishFailure(publishResult)
                    else -> metrics.recordPublishSuccess(publishResult)
                }
                publishResult
            }
        }
    }.mapLeft {
        PubError(envelope, PubResult.Error(it, message, attempt, host))
    }
I am actually refactoring the code where previously this was using
Flowable
Copy code
fun retryWithBackoff(scheduler: Scheduler, vararg backoffSeconds: Int): Flowable<Int> = Flowable.fromArray(backoffSeconds)
            .flatMap { it.toFlowable() }
            .zipWith(generateSequence(1) { it + 1 }.asIterable()) { delay: Int, attempt: Int -> delay to attempt }
            .flatMap(maxConcurrency = 1, bufferSize = 1) { (delay, attempt) ->
                Flowable.timer(delay.toLong(), SECONDS, scheduler)
                    .map { attempt }
            }
^^ above
retryWithBackOff
is a custom function where it has the access to
attempt
! I was wondering whether
Schedule
itself have any parameters exposed where I can have access to the
times
its being tried on As you see in
publishMessage
function, I am trying to get hold of
attempt
or
times
of retries both inside
retry
and in
mapLeft
of
Either.catch
block
I think thinking more about it I can refactor in a way that having access to attempt just inside
retry
is good enough and I will go with keeping an
Atomic
counter options. Thanks for all the suggestions!
s
Hey @seetha, Using an
Atomic
counter is the most straightforward way to safely track it. Given that no parallelism is involved a simple
var x: Int
would also work.
s
Awesome! Thank you