Fabien Rouge
05/08/2023, 5:36 PMimport ...
sealed interface DomainError {
override fun toString(): String
}
...
class CopiesFailures(val failures: List<DomainError>): DomainError {
override fun toString(): String = "Failed to copy ${failures.size} files into ..."
}
data class DummyObjectToCopy
suspend fun execute(dryRyn: Boolean, command: suspend () -> Either<DomainError, Unit>): Either<DomainError, Unit> {
return if (!dryRyn)
command()
else {
delay(100).right()
}
}
fun copyCommand(x: DummyObjectToCopy): suspend () -> Either<DomainError, Unit> {...}
suspend fun copyWithArrowConcurrency(
list: List<DummyObjectToCopy>,
dispatcher: CoroutineDispatcher,
): Either<DomainError, List<Unit>> =
list
.map { x -> copyCommand(x) }
.parMapOrAccumulate(context = dispatcher, concurrency = 1000) {
execute(false, it).bind()
}.mapLeft {
// Either<List<DomainError>, List<Unit>> -> Either<DomainError, List<Unit>> because I still want to handle a Domain Error and not a list for consistency in the caller
CopiesFailures(it.toList())
}
Technical context : I'm implementing a copy utilities (underlying using S3 but not important) using arrow-core:1.2.0-RC & arrow-fx-coroutines:1.2.0-RC. So each of the copy command might take some times and might fails for some reason. I copy a bunch of "files". I want to do it in parallel, without interrupting the siblings if any fails. But at the same time I want to eventually display the ones that didn't work. And be able to display the progress (on the console for now).
Personal context : I have just started playing with arrow typed error since the beginning of last week. And with kotlin since the start of the year (previously java Dev).
So I may have not taken the better Arrow and Kotlin way for tackling this subject ;).
A. The point 4 is the one I'm not able to handle (function copyWithArrowConcurrency).
Mixing coroutines + typed error handling and accumulation of errors vs following the progress is a bit complex.
I don't care of differentiating the success and errors in the progress count for now, sending to another consumer a dummy message/integer could do the trick. Even if there is failure I want to send/take it into account.
B. Would it be possible to do this without using arrow-fx-coroutines if I only want to use the typed errors handling of arrow but not the additional coroutines lib ;)
So if you have insights, ll be greatfull --> No emergency on my side, I'm just learning through a concrete case I previously had.
Thank you for any advices 😉Fabien Rouge
05/08/2023, 5:37 PMprivate suspend fun copyNaive(
list: List<DummyObjectToCopy>,
dispatcher: CoroutineDispatcher,
) =
coroutineScope {
list.map { x -> copyCommand(x) }.map {
this@coroutineScope.launch(dispatcher) {
either { execute(false, it).bind() }
// Naive way to send the error another consumer (console) without splitting this flow...
.mapLeft { println(it) }
}
}.forEachIndexed { index, it ->
// whatever the success or failure we can still monitor progress
it.join()
if (index % 100 == 0)
println("${percentageRateWith2digits(index, list.size)}%")
}
}
simon.vergauwen
05/08/2023, 5:43 PM1. I have parallel tasks
2. I don't want to interrupt any task if one sibling has failed
3. I want to accumulate errors on parallel tasksThis can easily be done using
parMapOrAccumulate
and Either
using 1.2.0-RC
.
val x: Either<NonEmptyList<Error>, List<Success> =
list.parMapOrAccumulate(ctx, maxConcurrency) {
either.bind()
}
For the progress you might want to pile on something else.simon.vergauwen
05/08/2023, 5:44 PMsimon.vergauwen
05/08/2023, 5:49 PMval progress: MutableStateFlow<Int> = MutableStateFlow(0)
val x: Either<NonEmptyList<Error>, List<Success> =
list.parMapOrAccumulate(ctx, maxConcurrency) {
either
.onRight { progress.update(Int::inc) }
.onLeft { progress.update(Int::inc) }
.bind()
}
Or you could easily track errors and failures by doing something like:
val list: List<..> = ...
data class Progress(
val failed: Int,
val success: Int,
val total: Int
)
val progress: MutableStateFlow<Progress> = MutableStateFlow(0, 0, list.size)
val x: Either<NonEmptyList<Error>, List<Success> =
list.parMapOrAccumulate(ctx, maxConcurrency) {
either
.onRight { progress.update { it.copy(success = it.success + 1) } }
.onRight { progress.update { it.copy(failed = it.failed + 1) } }
.bind()
}
And then you can either collect
the progress
or just check it's progress.value
.simon.vergauwen
05/08/2023, 5:50 PMcollect
you can also replace MutableStateFlow
with an AtomicReference
but I assumed that want to have the progress in some kind-of streaming/callback way.Fabien Rouge
05/11/2023, 10:12 AMMutableStateFlow
is conflated (I can not rely on having all increments), I finally launch another job querying the value after a delay like an observable. So indeed an AtomicReference could do the job easily if I keep only a counter as "currentProgress".
By the way I tried to hide some Arrow call and logic by using contextReceiver with Raise. I hit a wall
Caused by: java.lang.IllegalArgumentException: No argument for parameter VALUE_PARAMETER CONTINUATION_CLASS
As you already mentionned here context Receiver still unstable. Can't wait to have all the bricks to simplify the global code.
Thanks, Have a good day!