Vladas
07/09/2025, 7:24 AMflow {
// Source: keeps producing random numbers in [1, 10) while the collecting coroutine is active.
while (currentCoroutineContext().isActive) {
val nr = Random.nextInt(1, 10)
println("generated $nr")
// Simulated source failure for values <= 2; thrown before the value is emitted.
if (nr <= 2) throw Exception("Less or equal than 2")
emit(nr)
}
}
// First retry: placed right after the source, so the source is its only upstream.
// The predicate logs, waits one second and returns true, i.e. it always asks for a retry.
.retry {
println("retry 1")
delay(1000)
true
}
.map {
println("got $it")
// Simulated processing failure for values >= 8.
if (it >= 8) throw Exception("More or equal than 8")
delay(1000)
it
}
// Second retry: its upstream is the whole chain above (source, first retry and map).
// NOTE(review): per the Flow.retry contract a retry re-collects its entire upstream, so a
// failure in map presumably restarts the source rather than re-running map for the same value — confirm.
.retry {
println("retry 2")
delay(1000)
true
}
// Starts collecting in the enclosing scope (`this` is expected to be a CoroutineScope).
.launchIn(this)
Vladas
07/09/2025, 7:42 AMflow {
// Source: keeps producing random numbers in [1, 10) while the collecting coroutine is active.
while (currentCoroutineContext().isActive) {
val nr = Random.nextInt(1, 10)
println("generated $nr")
// Simulated source failure for values <= 2; thrown before the value is emitted.
if (nr <= 2) throw Exception("Less or equal than 2")
emit(nr)
}
}
// Retry for source failures: logs, waits one second and always asks for a retry.
.retry {
println("retry 1")
delay(1000)
true
}.transform {
// Attempt to retry the downstream stages for the same value: emit it, and if the
// emit call throws, wait and emit the same value again until it succeeds.
// NOTE(review): a later message in this thread reports
// "IllegalStateException: Flow exception transparency is violated"; emitting again after a
// previous emit has thrown looks like the trigger — confirm against the Flow documentation.
while (currentCoroutineContext().isActive) {
try {
emit(it)
// Reached only when emit returned normally; stop retrying this value.
break
} catch (e: Exception) {
// Cancellation must propagate, otherwise the loop would swallow it.
if (e is CancellationException) throw e
println("retry 2")
delay(1000)
}
}
}
.map {
println("got $it")
// Simulated processing failure for values >= 8; this is downstream of the transform above,
// so it surfaces there as an exception thrown from emit(it).
if (it >= 8) throw Exception("More or equal than 8")
delay(1000)
it
}
// Starts collecting in the enclosing scope (`this` is expected to be a CoroutineScope).
.launchIn(this)
streetsofboston
07/09/2025, 11:22 AMEither<Error,Int>
instead of plain Ints, to model the behavior you want?
Vladas
07/09/2025, 12:01 PM.map
or etc to retry.
My current helper extension, fun <T> Flow<T>.checkpoint(
// Time to wait after a failed emit before emitting the same value again.
retryAfter: Duration = 10.seconds,
// Callback invoked with the caught exception before each wait; does nothing by default.
onRetry: (Exception) -> Unit = {},
): Flow<T> = transform {
// For every upstream value: emit it downstream and, if the emit call throws, report the
// exception, wait `retryAfter` and emit the same value again. Loops until an emit succeeds
// or the collecting coroutine is no longer active.
// NOTE(review): the next message in this thread reports
// "IllegalStateException: Flow exception transparency is violated"; re-emitting after a
// failed emit looks like the cause, so this helper probably cannot work as intended — confirm.
while (currentCoroutineContext().isActive) {
try {
emit(it)
// Reached only when emit returned normally; move on to the next upstream value.
break
} catch (e: Exception) {
// Cancellation must propagate, otherwise the loop would swallow it.
if (e is CancellationException) throw e
onRetry(e)
delay(retryAfter)
}
}
}
Vladas
07/09/2025, 12:54 PMIllegalStateException: Flow exception transparency is violated
Vladas
07/09/2025, 4:23 PMflow {
    // Source: keeps producing random numbers in [1, 10) while the collecting coroutine is active.
    while (currentCoroutineContext().isActive) {
        val nr = Random.nextInt(1, 10)
        println("generated $nr")
        // Simulated source failure for values <= 2; thrown before the value is emitted.
        if (nr <= 2) throw Exception("Less or equal than 2")
        emit(nr)
    }
}.retry {
    // Source-level retry: waits one second, logs the cause and always asks for a retry.
    delay(1000)
    println("retry 1 $it")
    true
}.flatMapConcat { nr ->
    // Each value is processed in its own one-element flow, so the retry attached to that
    // inner flow has only this processing step as its upstream and the source is not involved.
    flow {
        println("got $nr")
        // Simulated processing failure for values >= 8.
        // NOTE(review): nr is fixed for this inner flow and the retry below always returns
        // true, so a value >= 8 fails on every attempt and is retried indefinitely in this demo.
        if (nr >= 8) throw Exception("More or equal than 8")
        delay(1000)
        emit(nr)
    }.retry {
        delay(1000)
        // Labelled "retry 2" so the output distinguishes it from the source-level "retry 1".
        println("retry 2 $it")
        true
    }
}.launchIn(this) // Starts collecting in the enclosing scope (`this` is expected to be a CoroutineScope).
Now I'll just make some helper extensions to simplify it