윤동환
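11/21/2023, 12:45 AM
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.seconds

fun main() {
    var no = 0
    runBlocking {
        // producer
        val someEventBus: Flow<Int> = flow {
            while (true) {
                emit(no++)
                delay(1.seconds)
            }
        }
        // consumer
        someEventBus
            .onEach {
                // error situation on consuming events
                if (it == 4) error("error")
            }
            // catch handles the upstream exception, but the flow then completes
            .catch { println("error = $it") }
            .launchIn(this)
    }
}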
When consuming a flow as in the snippet above, if an exception is thrown in the onEach or collect block called by the consumer, the collection is cancelled. But what I want is to keep consuming this flow for the whole application lifecycle (until the application is closed; I'm developing an event worker application).
The simplest way is to use try/catch inside the onEach or collect block, but I don't think that is graceful. Is there an alternative way to ignore the exception and keep consuming the flow?
Daniel Pitts
11/21/2023, 3:19 AM
Daniel Pitts
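11/21/2023, 3:34 AM
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.seconds

fun main() {
    runBlocking {
        // producer
        val someEventBus = flow {
            var no = 0
            while (true) {
                emit(no++)
                delay(1.seconds)
            }
        }
        // consumer
        someEventBus
            .mapCatching {
                // error situation on consuming events
                if (it == 4) error("error")
                it
            }
            .onlySuccess()
            .onEach {
                println("Received $it")
            }
            .launchIn(this)
    }
}

// A small local Result type (shadows kotlin.Result inside this file)
sealed interface Result<R> {
    data class Success<R>(val value: R) : Result<R>
    data class Failure<R>(val cause: Throwable) : Result<R>
}

// Wraps the outcome of action for each element, so an exception becomes a value
// instead of cancelling the flow
fun <T, R> Flow<T>.mapCatching(action: suspend (T) -> R): Flow<Result<R>> =
    map {
        try {
            Result.Success(action(it))
        } catch (ex: CancellationException) {
            throw ex // keep coroutine cancellation working
        } catch (ex: Throwable) {
            Result.Failure<R>(ex)
        }
    }

// Drops failures and unwraps the successful values
fun <T> Flow<Result<T>>.onlySuccess(): Flow<T> =
    filterIsInstance<Result.Success<T>>()
        .map { it.value }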
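For comparison, the try/catch mentioned in the question can be hidden inside a small reusable operator, so the call site stays clean. This is only a minimal sketch: `onEachCatching` and its `onError` parameter are hypothetical names for illustration, not part of kotlinx.coroutines. The sketch assumes failures should be reported and then skipped, and it rethrows `CancellationException` so that cancelling the collecting coroutine still works.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not a kotlinx.coroutines API): runs `action` for each element
// and passes any failure to `onError` instead of letting it cancel the flow.
fun <T> Flow<T>.onEachCatching(
    onError: suspend (Throwable) -> Unit = {},
    action: suspend (T) -> Unit,
): Flow<T> = onEach { value ->
    try {
        action(value)
    } catch (e: CancellationException) {
        throw e // never swallow cancellation
    } catch (e: Exception) {
        onError(e)
    }
}

fun main() = runBlocking {
    // A finite flow stands in for the infinite event bus so the example terminates
    (1..5).asFlow()
        .onEachCatching(onError = { println("error = $it") }) {
            if (it == 4) error("error")
            println("Received $it")
        }
        .collect()
}
```

With this sketch, elements 1, 2, 3 and 5 are processed and the failure on 4 is only reported. Unlike the Result-based version, the failed element itself still flows downstream, which may or may not be what you want.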
Daniel Pitts
11/21/2023, 3:42 AM
윤동환
11/21/2023, 5:00 AM
Daniel Pitts
11/21/2023, 5:03 AM
Robert Williams
11/21/2023, 10:34 AM
Robert Williams
11/21/2023, 10:37 AM