Hello. I have a problem understanding the combined...
# arrow
d
Hello. I have a problem understanding the combined use of Raise/Either with Exceptions. I am developing a REST interface where I output certain errors as DomainError, but also handle network errors as exceptions. Therefore, I have wrapped the code with Either.catch to catch the exceptions (the system should not halt when an exception occurs, but rather continue with the next data record). Inside this, I have directly opened an either block to work in the Raise<DomainError> context and call corresponding functions (context(Raise<DomainError> fun xyz() {}). Now the log is full of warning messages that the CancellationException is swallowed. How should I adjust my code to avoid such errors?
s
Hey Dirk, Can you share some code please? Hard to grasp properly what is going on. This section should give you some more insights, https://arrow-kt.io/learn/typed-errors/working-with-typed-errors/#from-exceptions
During

my talk this year at KotlinConf

, I also discuss wrapping database operations using typed errors, and using Schedule with exceptions to retry exceptional scenarios like network temporarily being down.
d
Hi Simon, when I first skimmed through the documentation I couldn't work out the solution. Unfortunately, I'm rushing from appointment to appointment today, so I'll read through it again tomorrow at a more leisurely pace. A somewhat "mutilated" code (names and designations shortened, as the code belongs to the customer) is as follows:
Copy code
fun tExportieren() =
    exportScope.launch {
        exportSharedFlow
            .onEach {
                measureTime {
                    log.info { "@@@@@@@ Starte Export ($it) @@@@@@@@@" }
                    verarbeiteExportMitEinzelexport(it)
                }
                    .also { zeit ->
                        log.info { "@@@@@@@ Beende Export! Dauer: $zeit ($it) @@@@@@@@@" }
                    }
            }
            .collect()
    }

suspend fun verarbeiteExportMitEinzelexport(
    anforderung: exportierenAnforderung,
) {
    val fehlerliste = Atomic<List<FehlerProtokol>>(emptyList())

    Either.catch {
        either {
            log.debug { "01:    Log in Bearbeitung setzen" }
            kService.aktualisiereLogStatus(anforderung.sstID, StateEnum.IN_BEARBEITUNG)

            log.debug { "02:    Tflow erstellen" }
            val tFlow = tLaden(anforderung)

            log.debug { "03:    Tflow verarbeiten" }
            tFlow
                .buffer(capacity = 2)
                .map { tListe ->
                    tListe
                        .map { it to v1Konvertieren(it) }
                        .parMap(concurrency = konfiguration.maxAnzahlParallelerAnfragenExportT) { (t, g) ->
                            log.debug { "04:    Übertrage T" }

                            // The Problem is in this function
                            tSenden(t, fehlerliste)
                                .onSome { g ->
                                    // ...
                                }
                                .onNone {
                                    // ...
                                }


                        }
                }
                .collectIndexed { index, value -> log.info { "06:  Chunk $index wurde vollständig verarbeitet" } }
        }
            .onLeft { de ->
                log.error("Der Export wurde nicht vollständig verarbeitet: $de")
                either {
                    // Log with Raise-Context
                }
            }
            .onRight {
                log.debug { "Der Export wurde vollständig verarbeitet: ${anforderung}" }
                either {
                    // Log with Raise-Context
                }
            }
    }
}


context(Raise<DomainError>)
@Suppress("LongParameterList", "FunctionNaming")
suspend fun tSenden(
    t: EPXML,
    fehlerliste: Atomic<List<FehlerProtokol>>
): Option<G> =
        when (t.extKey?.trim()) {
            null, "" -> service.legeNeuenTAn(g, fehlerliste)
            else -> service.aktualisiereT(g, fehlerliste)
        }

context(Raise<DomainError>)
override suspend fun legeNeuenTAn(t: T, fehlerliste: Atomic<List<FehlerProtokol>>): Option<G> {
    val response = client.post(G(), t)

    // Something is not working as i expected her
    if (response.status != HttpStatusCode.Created) {
        val error = response.body<Error>()
        fehlerliste.update { list ->
            list + FehlerProtokol("...", error.message)
        }

        if (error.message.startsWith("The property xxxx", true)) {
            log.error { "Ignore..." }
            return None
        } else {
            log.error { "Fehler: ${error.message}" }
            raise(DatensatzKonnteNichtAngelegtWerden(statusCode = response.status.value, body = error))
        }
    } else {
        return response
            .body<G>()
            .also { log.info { "T angelegt: $it" } }
            .toOption()
    }
}
d
Maybe I'm not understanding your problem, but wouldn't you want to allow your network errors to
fail fast
straight up to the top level handler? (See, amongst others, Scott Wlaschin, Against Railroad Programming when used thoughtlessly) e.g. leave the network error exceptions alone and catch them at the top level. Surely you'd only want to spend time on 'Controlled Propagation' if you want to feed these into Logical Failures (Business Failures), which, at first sight doesn't feel like the case here?