```fun main() { var no = 0 runBlocking { ...
# coroutines
u
Copy code
fun main() {
    var no = 0
    runBlocking {
        // producer
        val someEventBus: Flow<Int> = flow {
            while (true) {
                emit(no++)
                delay(1.seconds)
            }
        }

        // consumer
        someEventBus
            .onEach {
                // error situation on consuming events
                if (it == 4) error("error")
            }
            .catch { println("error = $it") }
            .launchIn(this)
    }
}
When consuming flow like upper code snippets, when exception thrown on onEach or collect function block that called by consumer, the consuming is canceled. Bun what i want to do is consume this flow infinitely on total application lifecycle (until application is closed. I’m developing event worker application). The simplest way is use try catch on onEach or collect function but i think it is not seem’s graceful. What is the alternative way to ignore exception and still consuming flow?
d
From what I can tell, there really isn't a way to do it. In other frameworks, its the concept of "catch and continue" or "catch and resume next", but flow doesn't have it. You might be able to create your own extension function that does it, but nothing exists out of the box.
Copy code
fun main() {
    runBlocking {
        // producer
        val someEventBus = flow {
            var no = 0
            while (true) {
                emit(no++)
                delay(1.seconds)
            }
        }

        // consumer
        someEventBus
            .mapCatching {
                // error situation on consuming events
                if (it == 4) error("error")
                it
            }
            .onlySuccess()
            .onEach {
                println("Received $it")
            }
            .launchIn(this)
    }
}

sealed interface Result<R> {
    data class Success<R>(val value: R) : Result<R>
    data class Failure<R>(val cause: Throwable) : Result<R>
}

fun <T, R> Flow<T>.mapCatching(action: suspend (T) -> R) =
    map {
        try {
            Result.Success(action(it))
        } catch (ex: Throwable) {
            Result.Failure(ex)
        }
    }

fun <T> Flow<Result<T>>.onlySuccess() =
    filterIsInstance<Result.Success<T>>()
        .map { it.value }
That said, its possible that Flow is the wrong abstraction for what you might be trying to do. `Channel`s might be a better approach.
u
Great example of using Result on Flow. Thank you for your opinion, But there is something that should consider more. I named the property of flow builders return value “`some*EventBus*`”. That because my application is event worker program that EventBus should broadcast same event to all consumers. So i named that property someEventBus and real production code, i considered of using SharedFlow. Im sorry about the code using just cold flow. That’s cuz write the code snippet more simply. Channel is hot flow like SharedFlow, but it is not broadcast event. Then using mapCaching custom transformer functions should be good choice i think.
d
I think you’re going to struggle with a never ending SharedFlow, since it keeps a list of all emitted values. SharedFlow is for when you want all collectors to receive all events, past and present. If you have event listeners that may be added or removed over time that only care about new events, or if all your event listeners are registered at once, you probably want a different broadcast mechanism.
r
Note the most important point if you want to swallow Exceptions is you must not do this for CancellationException because this will break everything else in your flow