# coroutines
I have a Flux (receiving data from Reactor Kafka) that is converted to a Flow, and then I do some operations on it. Then I convert the Flow back to a Flux and publish to another topic using Reactor Kafka. I don't want the receiver/upstream to acknowledge/commit the offset if downstream throws an error. I know that downstream errors are propagated up. So if we add a `transform` and do the offset commit after the `emit`, the commit will not be reachable on errors, right?
```kotlin
.transform {
    emit(it)
    // Only reached if emit() returned without throwing
    it.receiverOffset().acknowledge()
}
.runOnDownstreamException { item, cause ->
    item.receiverOffset().commit().awaitSingleOrNull()
}
```
And
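```kotlin
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext

public fun <T> Flow<T>.runOnDownstreamException(
    action: suspend FlowCollector<T>.(item: T, cause: Throwable) -> Unit
): Flow<T> = flow {
    collect {
        try {
            emit(it)
        } catch (e: Exception) {
            // Run the action even if the collector was cancelled, then rethrow
            withContext(NonCancellable) { action(it, e) }
            throw e
        }
    }
}
```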
Any ideas?
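A minimal sketch of the behaviour being asked about, assuming a plain sequential Flow with no buffering between `transform` and the failing step. `FakeRecord` and `runPipeline` are hypothetical names standing in for the Kafka record and the real pipeline; nothing here touches Reactor Kafka itself.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a Kafka record; it only remembers whether it was acknowledged.
class FakeRecord(val value: Int) {
    var acknowledged = false
    fun acknowledge() { acknowledged = true }
}

// Collects the records and returns the downstream failure, or null if there was none.
fun runPipeline(records: List<FakeRecord>): Throwable? = runBlocking {
    try {
        records.asFlow()
            .transform { record ->
                emit(record)          // returns only after downstream handled the record
                record.acknowledge()  // skipped when emit() throws
            }
            .onEach { record ->
                // Simulated downstream failure on the second record
                if (record.value == 2) {
                    throw IllegalStateException("downstream failed on ${record.value}")
                }
            }
            .collect()
        null
    } catch (e: IllegalStateException) {
        e
    }
}

fun main() {
    val records = listOf(FakeRecord(1), FakeRecord(2), FakeRecord(3))
    val failure = runPipeline(records)
    println(failure?.message)                  // downstream failed on 2
    println(records.map { it.acknowledged })   // [true, false, false]
}
```

This only holds while `emit` really does wait for the downstream step. Operators such as `buffer()` or `flowOn()` put a channel in between, so `emit` can return before the downstream code has run, and the same may apply once the Flow is converted back to a Flux and handed to an asynchronous sender.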