Udith
08/16/2024, 8:09 AM
transform and do the offset commit after the emit, the commit will not be reachable on errors, right?
.transform {
    emit(it)
    it.receiverOffset().acknowledge()
}
.runOnDownstreamException { item, cause ->
    item.receiverOffset().commit().awaitSingleOrNull()
}
And
public fun <T> Flow<T>.runOnDownstreamException(
    action: suspend FlowCollector<T>.(item: T, cause: Throwable) -> Unit
): Flow<T> = flow {
    collect {
        try {
            emit(it)
        } catch (e: Exception) {
            withContext(NonCancellable) { action(it, e) }
            throw e
        }
    }
}
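To make the question concrete, here is a minimal sketch of the same pipeline with the Kafka parts stubbed out. FakeRecord and runDemo are hypothetical names used only for illustration, and the log entries stand in for acknowledge() and commit(). It assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Same helper as in the message, repeated so the sketch is self-contained.
fun <T> Flow<T>.runOnDownstreamException(
    action: suspend FlowCollector<T>.(item: T, cause: Throwable) -> Unit
): Flow<T> = flow {
    collect {
        try {
            emit(it)
        } catch (e: Exception) {
            withContext(NonCancellable) { action(it, e) }
            throw e
        }
    }
}

// Hypothetical stand-in for a Kafka record; not a reactor-kafka type.
data class FakeRecord(val offset: Int)

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    try {
        runBlocking {
            flowOf(FakeRecord(1), FakeRecord(2))
                .transform {
                    emit(it)
                    // Only reached when everything downstream of emit succeeded.
                    log += "ack ${it.offset}"
                }
                .runOnDownstreamException { item, _ ->
                    // Stand-in for the commit in the error path.
                    log += "recover ${item.offset}"
                }
                .collect { record ->
                    // Simulated downstream failure on the second record.
                    if (record.offset == 2) error("downstream failed")
                    log += "processed ${record.offset}"
                }
        }
    } catch (e: IllegalStateException) {
        // The helper rethrows, so the failure still cancels the flow.
        log += "rethrown"
    }
    return log
}

fun main() {
    println(runDemo())
}
```

In this sketch the output is [processed 1, ack 1, recover 2, rethrown]: the line after emit is skipped for the failing record, and only the runOnDownstreamException action runs for it.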
Any ideas?
Udith
08/16/2024, 8:39 AM