# arrow
f
Hi, I'm using Arrow for typed error handling in this use case:
1. I have parallel tasks
2. I don't want to interrupt any task if one sibling has failed
3. I want to accumulate errors on parallel tasks
4. while being able to monitor the progress of completion/failure (whatever the state)
Here is some top-level code to give a generic skeleton:
import ...

sealed interface DomainError {
    override fun toString(): String
}
...
class CopiesFailures(val failures: List<DomainError>): DomainError {
    override fun toString(): String = "Failed to copy ${failures.size} files into ..."
}


data class DummyObjectToCopy

suspend fun execute(dryRun: Boolean, command: suspend () -> Either<DomainError, Unit>): Either<DomainError, Unit> {
    return if (!dryRun)
        command()
    else {
        // Dry run: simulate the work instead of running the command
        delay(100).right()
    }
}

fun copyCommand(x: DummyObjectToCopy): suspend () -> Either<DomainError, Unit> {...}

suspend fun copyWithArrowConcurrency(
    list: List<DummyObjectToCopy>,
    dispatcher: CoroutineDispatcher,
): Either<DomainError, List<Unit>> =
    list
        .map { x -> copyCommand(x) }
        .parMapOrAccumulate(context = dispatcher, concurrency = 1000) {
            execute(false, it).bind()
        }.mapLeft {
            // Either<NonEmptyList<DomainError>, List<Unit>> -> Either<DomainError, List<Unit>>:
            // the caller should still handle a single DomainError rather than a list, for consistency
            CopiesFailures(it.toList())
        }
Technical context: I'm implementing a copy utility (using S3 underneath, but that's not important) with arrow-core:1.2.0-RC and arrow-fx-coroutines:1.2.0-RC. Each copy command might take some time and might fail for some reason. I copy a bunch of "files". I want to do it in parallel, without interrupting the siblings if any of them fails. At the same time, I want to eventually display the ones that didn't work, and to be able to display the progress (on the console for now).
Personal context: I only started playing with Arrow typed errors at the beginning of last week, and with Kotlin at the start of the year (I was previously a Java dev). So I may not have taken the best Arrow and Kotlin approach to this subject ;).
A. Point 4 is the one I'm not able to handle (function copyWithArrowConcurrency). Mixing coroutines, typed error handling, and accumulation of errors with progress tracking is a bit complex. I don't care about differentiating successes and errors in the progress count for now; sending a dummy message/integer to another consumer could do the trick. Even if there is a failure, I want it to be taken into account.
B. Would it be possible to do this without arrow-fx-coroutines, if I only want to use Arrow's typed error handling but not the additional coroutines lib? ;)
If you have any insights, I'll be grateful. No urgency on my side, I'm just learning through a concrete case I previously had. Thank you for any advice 😉
Here is something pretty naive I did that has several limitations:
* I don't return the errors to the caller, I just display them, which is not very clever because I lose them afterwards...
* I have mixed the display of the progress and the errors on the console
* I launch as many jobs as there are elements to copy. I may have millions of them...
* no back pressure
private suspend fun copyNaive(
    list: List<DummyObjectToCopy>,
    dispatcher: CoroutineDispatcher,
) =
    coroutineScope {
        list.map { x -> copyCommand(x) }.map {
            this@coroutineScope.launch(dispatcher) {
                either { execute(false, it).bind() }
                    // Naive way to send the error to another consumer (console) without splitting this flow...
                    .mapLeft { println(it) }
            }
        }.forEachIndexed { index, it ->
            // whatever the success or failure we can still monitor progress
            it.join()
            if (index % 100 == 0)
                println("${percentageRateWith2digits(index, list.size)}%")
        }
    }
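Regarding the limitations above and question B, here is a minimal sketch that assumes only arrow-core and kotlinx.coroutines (no arrow-fx-coroutines). The name `boundedMapAccumulating` and its `onEach` progress hook are hypothetical, made up for illustration. It caps the number of tasks running at once with a `Semaphore`, lets every task finish whatever its siblings do, and accumulates the errors afterwards with arrow-core's `mapOrAccumulate`. Note that it still creates one (cheap) coroutine per element; only the number running concurrently is bounded.

```kotlin
import arrow.core.Either
import arrow.core.NonEmptyList
import arrow.core.mapOrAccumulate
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper (not part of Arrow): run `task` for every element with at
// most `maxConcurrency` tasks in flight, never cancelling siblings on failure.
suspend fun <A, E, B> List<A>.boundedMapAccumulating(
    maxConcurrency: Int,
    onEach: (Either<E, B>) -> Unit = {}, // called once per finished task, success or failure
    task: suspend (A) -> Either<E, B>,
): Either<NonEmptyList<E>, List<B>> = coroutineScope {
    val permits = Semaphore(maxConcurrency)
    // Failures are plain Either values here, so a failed task does not cancel the others.
    val results: List<Either<E, B>> = map { item ->
        async { permits.withPermit { task(item).also(onEach) } }
    }.awaitAll()
    // Accumulate all Left values into a NonEmptyList, or return all Right values in order.
    results.mapOrAccumulate { it.bind() }
}
```

With the skeleton above, this could be called as `list.boundedMapAccumulating(1000, onEach = { /* bump a counter */ }) { execute(false, copyCommand(it)) }`, followed by the same `mapLeft { CopiesFailures(it.toList()) }`.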
s
Hey Fabien,
1. I have parallel tasks
2. I don't want to interrupt any task if one sibling has failed
3. I want to accumulate errors on parallel tasks
This can easily be done using `parMapOrAccumulate` and `Either` using `1.2.0-RC`.
val x: Either<NonEmptyList<Error>, List<Success>> =
  list.parMapOrAccumulate(ctx, maxConcurrency) {
      either.bind() // `either` stands for the Either produced by each task
  }
For the progress you might want to pile on something else.
Oh, sorry. You're already using this 😅 Sorry, was a bit too fast I guess.
So just for the progress, what kind of API do you want? You could do something like:
val progress: MutableStateFlow<Int> = MutableStateFlow(0)

val x: Either<NonEmptyList<Error>, List<Success>> =
  list.parMapOrAccumulate(ctx, maxConcurrency) {
      either
        .onRight { progress.update(Int::inc) }
        .onLeft { progress.update(Int::inc) }
        .bind()
  }
Or you could easily track successes and failures separately by doing something like:
val list: List<..> = ...

data class Progress(
  val failed: Int,
  val success: Int,
  val total: Int
)

val progress: MutableStateFlow<Progress> = MutableStateFlow(Progress(0, 0, list.size))

val x: Either<NonEmptyList<Error>, List<Success>> =
  list.parMapOrAccumulate(ctx, maxConcurrency) {
      either
        .onRight { progress.update { it.copy(success = it.success + 1) } }
        .onLeft { progress.update { it.copy(failed = it.failed + 1) } }
        .bind()
  }
And then you can either `collect` the `progress` or just check its `progress.value`.
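Collecting the progress could look roughly like the sketch below. It assumes the `Progress` shape from the snippet above; the `done` property, `watch`, and `demo` are hypothetical names added for illustration. Because a `StateFlow` is conflated, the collector may skip intermediate values, but it always ends up seeing the latest one, so waiting for `done >= total` terminates once every task has reported.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch

data class Progress(val failed: Int, val success: Int, val total: Int) {
    // Hypothetical convenience property: tasks finished so far, whatever the outcome
    val done: Int get() = failed + success
}

// Calls onUpdate for each progress value the collector observes and returns
// once all tasks are accounted for. Intermediate values may be skipped (conflation).
suspend fun watch(progress: StateFlow<Progress>, onUpdate: (Progress) -> Unit) {
    progress
        .onEach { onUpdate(it) }
        .first { it.done >= it.total }
}

// Hypothetical demo: three simulated successes, watched from a sibling coroutine.
suspend fun demo(): List<Progress> = coroutineScope {
    val progress = MutableStateFlow(Progress(failed = 0, success = 0, total = 3))
    val seen = mutableListOf<Progress>()
    val watcher = launch { watch(progress) { seen += it } }
    repeat(3) {
        delay(10) // stands in for one copy task
        progress.update { p -> p.copy(success = p.success + 1) }
    }
    watcher.join()
    seen
}
```

In real code the `repeat` loop would be the `parMapOrAccumulate` call, and `onUpdate` would print a percentage.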
If you don't need to `collect`, you can also replace `MutableStateFlow` with an `AtomicReference`, but I assumed that you want to have the progress in some kind of streaming/callback way.
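A minimal sketch of the `AtomicReference` variant, assuming arrow-fx-coroutines 1.2.x. `copyOne` is a hypothetical stand-in for a real copy task (it fails for multiples of 3 so that both branches are exercised), and `copyAll` is likewise only an illustrative name.

```kotlin
import arrow.core.Either
import arrow.core.NonEmptyList
import arrow.core.left
import arrow.core.right
import arrow.fx.coroutines.parMapOrAccumulate
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.Dispatchers

data class Progress(val failed: Int, val success: Int, val total: Int)

// Hypothetical task: pretends that copying every third item fails.
suspend fun copyOne(i: Int): Either<String, Unit> =
    if (i % 3 == 0) "copy $i failed".left() else Unit.right()

suspend fun copyAll(
    items: List<Int>,
    progress: AtomicReference<Progress>,
): Either<NonEmptyList<String>, List<Unit>> =
    items.parMapOrAccumulate(Dispatchers.Default, concurrency = 4) { item ->
        copyOne(item)
            // updateAndGet retries atomically, so concurrent tasks do not lose increments
            .onRight { progress.updateAndGet { p -> p.copy(success = p.success + 1) } }
            .onLeft { progress.updateAndGet { p -> p.copy(failed = p.failed + 1) } }
            .bind()
    }
```

The caller can read `progress.get()` at any time, for example from a separate job that prints it periodically.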
f
Thank you @simon.vergauwen. Thanks for the onRight/onLeft disentanglement. Since the `MutableStateFlow` is conflated (I cannot rely on seeing every increment), I ended up launching another job that queries the value after a delay, like an observable. So indeed an AtomicReference could do the job easily if I keep only a counter as "currentProgress". By the way, I tried to hide some Arrow calls and logic by using context receivers with Raise. I hit a wall:
Caused by: java.lang.IllegalArgumentException: No argument for parameter VALUE_PARAMETER CONTINUATION_CLASS
As you already mentioned here, context receivers are still unstable. Can't wait to have all the bricks to simplify the overall code. Thanks, have a good day!
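The polling approach described above (a separate job that reads the counter after a delay) might look roughly like this sketch, which only assumes kotlinx.coroutines. `withProgressReporter` and its parameters are hypothetical names, not an existing API. The tasks increment a shared `AtomicInteger`, and a sibling coroutine reports it periodically until the work is done.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

// Hypothetical helper: runs `work` while a sibling coroutine polls `done`
// every `periodMillis` and passes (done, total) to `report`.
suspend fun <T> withProgressReporter(
    total: Int,
    done: AtomicInteger,
    periodMillis: Long = 500,
    report: (Int, Int) -> Unit = { d, t -> println("$d / $t") },
    work: suspend () -> T,
): T = coroutineScope {
    val reporter = launch {
        while (isActive) {
            delay(periodMillis) // cancellable, so the loop ends when the job is cancelled
            report(done.get(), total)
        }
    }
    try {
        work()
    } finally {
        reporter.cancel()
        report(done.get(), total) // final snapshot, even if `work` failed
    }
}
```

Inside `work`, each task would call `done.incrementAndGet()` in both its `onRight` and `onLeft` handlers, so the count advances whatever the outcome.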