Any way to retain value of the flow before first r...
# coroutines
v
Is there any way to retain the value of the flow from before the first retry, and not restart the whole flow, if it throws inside map when the value is >= 8? I expect it to keep the last value (i.e. get stuck in map in this example). The second retry isn't suitable here, since it restarts the whole flow. Thank you
flow {
    while (currentCoroutineContext().isActive) {
        val nr = Random.nextInt(1, 10)
        println("generated $nr")
        if (nr <= 2) throw Exception("Less or equal than 2")
        emit(nr)
    }
}
    .retry {
        println("retry 1")
        delay(1000)
        true
    }
    .map {
        println("got $it")
        if (it >= 8) throw Exception("More or equal than 8")
        delay(1000)
        it
    }
    .retry {
        println("retry 2")
        delay(1000)
        true
    }
    .launchIn(this)
My current one, with transform protecting the downstream, doesn't work. It's also not pretty, and I'm scared of catching exceptions on emit
flow {
    while (currentCoroutineContext().isActive) {
        val nr = Random.nextInt(1, 10)
        println("generated $nr")
        if (nr <= 2) throw Exception("Less or equal than 2")
        emit(nr)
    }
}
    .retry {
        println("retry 1")
        delay(1000)
        true
    }.transform {
        while (currentCoroutineContext().isActive) {
            try {
                emit(it)
                break
            } catch (e: Exception) {
                if (e is CancellationException) throw e
                println("retry 2")
                delay(1000)
            }
        }
    }
    .map {
        println("got $it")
        if (it >= 8) throw Exception("More or equal than 8")
        delay(1000)
        it
    }
    .launchIn(this)
s
Could you use sum types like Either<Error, Int> instead of plain Ints, to model the behavior you want?
v
The Ints here are just an example. I'm doing computation / network stuff and don't want to lose progress in the middle; I just want some checkpoints. I don't want to write a bunch of while loops inside .map etc. to retry. My current helper extension only catches exceptions from downstream and restarts from there
fun <T> Flow<T>.checkpoint(
    retryAfter: Duration = 10.seconds,
    onRetry: (Exception) -> Unit = {},
): Flow<T> = transform {
    while (currentCoroutineContext().isActive) {
        try {
            emit(it)
            break
        } catch (e: Exception) {
            if (e is CancellationException) throw e
            onRetry(e)
            delay(retryAfter)
        }
    }
}
This extension is bad; it throws
IllegalStateException: Flow exception transparency is violated
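For context, here is a minimal sketch of what seems to trigger that error, assuming the standard `transform` from kotlinx.coroutines. `emit` rethrows whatever the downstream collector throws, so catching around `emit` and then emitting again is the pattern the flow's safety check rejects. The function name `reproduceViolation`, the single value, and the `boom` exception are made up for illustration.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns the exception that escapes the collection, or null if none does.
fun reproduceViolation(): Throwable? = runBlocking {
    try {
        flowOf(1)
            .transform { value ->
                try {
                    emit(value) // rethrows the exception thrown by the collector below
                } catch (e: Exception) {
                    if (e is CancellationException) throw e
                    emit(value) // emitting again after a downstream failure
                }
            }
            .collect { throw RuntimeException("boom") } // hypothetical downstream failure
        null
    } catch (e: Throwable) {
        e
    }
}

fun main() {
    // Expected to be an IllegalStateException rather than the original "boom".
    println(reproduceViolation())
}
```

The takeaway is that the retry has to wrap the work that can fail, not the `emit` call that hands the value to the downstream.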
Finally figured it out
flow {
    while (currentCoroutineContext().isActive) {
        val nr = Random.nextInt(1, 10)
        println("generated $nr")
        if (nr <= 2) throw Exception("Less or equal than 2")
        emit(nr)
    }
}.retry {
    delay(1000)
    println("retry 1 $it")
    true
}.flatMapConcat { nr ->
    flow {
        println("got $nr")
        if (nr >= 8) throw Exception("More or equal than 8")
        delay(1000)
        emit(nr)
    }.retry {
        delay(1000)
        println("retry 2 $it")
        true
    }
}.launchIn(this)
Now I'll just make some helper extensions to simplify it
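One possible shape for such a helper, as a sketch only: it wraps each value in its own single-element inner flow and retries just that inner flow, the same way the flatMapConcat version above does. The name `mapWithRetry` and its parameters are hypothetical, modelled on the earlier `checkpoint` signature. Both opt-in annotations are listed because the one required for `flatMapConcat` differs between kotlinx.coroutines versions.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

// Hypothetical helper: like map, but a failure in `block` retries only that
// element instead of restarting the upstream flow.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T, R> Flow<T>.mapWithRetry(
    retryAfter: Duration = 1.seconds,
    onRetry: (Throwable) -> Unit = {},
    block: suspend (T) -> R,
): Flow<R> = flatMapConcat { value ->
    // The inner flow holds the "checkpoint": `value` is captured here,
    // so every retry re-runs `block` with the same input.
    flow { emit(block(value)) }
        .retry { cause ->
            onRetry(cause)
            delay(retryAfter)
            true // keep retrying until `block` succeeds
        }
}

fun main() = runBlocking {
    var failuresLeft = 2 // simulate two transient failures for the value 2
    val results = flowOf(1, 2, 3)
        .mapWithRetry(
            retryAfter = 10.milliseconds,
            onRetry = { println("retry: ${it.message}") },
        ) { n ->
            if (n == 2 && failuresLeft-- > 0) throw IllegalStateException("transient failure")
            n * 10
        }
        .toList()
    println(results)
}
```

Because `retry` sits on the inner flow, it only sees exceptions thrown by `block`. Exceptions thrown further downstream pass through untouched, which should keep this clear of the exception transparency error.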