# coroutines
p
Hi Everyone! Just looking into using a `SharedFlow` in our backend application. We're receiving updates and are publishing these on the `SharedFlow`. This works great because it is a very easy-to-understand workflow. However, we would like to implement metrics around this. Specifically, the SharedFlow has a predefined buffer, and we'd like to be aware of when it is filling up. We can detect when the buffer is full, because `tryEmit` returns `false`. Is there any way for us to get an early warning here, in case of a slow consumer? It looks like we can check the number of consumers on the flow, but there is no way to check the buffer status. Has anyone got any ideas how to monitor this?
r
Looking at it, I guess you'd need to use reflection to get access to SharedFlowImpl, and then again for its private variables, which is prone to breaking of course
h
Those are private variables in the impl; the only thing exposed is the replayCache, and I'm not sure that'll be of any help. Maybe file a feature request? I'm curious about this as well
p
I'd rather avoid reflection and accessing private variables.. perhaps a feature request would be in order 🙂
z
You could maybe do this with a downstream operator that detects backpressure. I’m not sure you could accurately calculate the exact buffer use this way, but you could at least get a signal about slow consumers.
p
I've got a timer on the downstream flows, so I can detect slow ones
but it's not quite the same as detecting when the consumers are causing a problem upstream
z
I’m not sure how a timer could detect backpressure, or what “causing a problem” means in your particular case.
r
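import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration

// Calls block() whenever a single downstream emit takes longer than duration.
fun <T> Flow<T>.onEmitTimeout(duration: Duration, block: () -> Unit): Flow<T> = flow {
    collect { value ->
        coroutineScope {
            // Watchdog: fires block() if emit has not returned within duration.
            val watchdog = launch {
                delay(duration)
                block()
            }
            // emit stays in the collecting coroutine; calling it from inside
            // launch { } would violate the flow invariant and throw at runtime.
            emit(value)
            watchdog.cancel()
        }
    }
}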
this timer could I think
well, not backpressure per-se, since it knows nothing about the upstream buffer
I don't think you'd be able to write a downstream operator to detect backpressure when the buffer is upstream
z
Yea you’re right, I think you’d just end up reimplementing the buffering
r
you might be able to wrap your MutableSharedFlow (and/or write a custom shareIn) so that you can detect when you are unable to emit into it immediately, but similarly, you don't know which downstream is the culprit
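One way to read the wrapper suggestion above, as a minimal sketch rather than a recommended implementation: try `tryEmit` first, and only when it fails record the event and fall back to the suspending `emit`. The names `MonitoredSharedFlow`, `onBufferFull`, `suspendedEmits` and `runDemo` are hypothetical and not part of kotlinx.coroutines. The sketch assumes the default `BufferOverflow.SUSPEND` policy, and it only signals at the moment the buffer is already full, so it is not an early warning and cannot say which consumer is slow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicLong

// Hypothetical wrapper: counts how often the producer could not emit immediately.
class MonitoredSharedFlow<T>(
    private val delegate: MutableSharedFlow<T>,
    private val onBufferFull: () -> Unit = {},
) {
    val emitted = AtomicLong()
    val suspendedEmits = AtomicLong()

    // Read-only view handed to consumers.
    val flow: SharedFlow<T> = delegate.asSharedFlow()

    suspend fun emit(value: T) {
        // tryEmit returns false when the value cannot be buffered without suspending.
        if (!delegate.tryEmit(value)) {
            suspendedEmits.incrementAndGet()
            onBufferFull()
            // Fall back to the suspending emit so the value is not lost.
            delegate.emit(value)
        }
        emitted.incrementAndGet()
    }
}

// Hypothetical demo: one slow consumer, a buffer of one element, three emissions.
// Returns how many emissions had to suspend.
fun runDemo(): Long = runBlocking {
    val monitored = MonitoredSharedFlow(MutableSharedFlow<Int>(extraBufferCapacity = 1))
    // UNDISPATCHED so the consumer is subscribed before the first emission.
    val consumer = launch(start = CoroutineStart.UNDISPATCHED) {
        monitored.flow.take(3).collect { delay(50) } // simulate slow processing
    }
    repeat(3) { monitored.emit(it) }
    consumer.join()
    monitored.suspendedEmits.get()
}
```

Feeding `suspendedEmits` into an existing metrics counter would give a rate of "producer had to wait" events, which is coarser than buffer occupancy but needs no reflection.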
p
I meant, we are using metrics to time our consumers
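flow {
    val outerFlow = this
    innerFlow
        .collect { recordUpdate ->
            // Start timing just before handing the element downstream.
            val sample = Timer.start()
            counter.increment()
            // emit returns only once the downstream collector has accepted the element.
            outerFlow.emit(recordUpdate)
            // Record how long the downstream took for this element.
            sample.stop(timer)
        }
}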
we are also able to detect when the buffer is full since, as you say, tryEmit returns false at that point
r
I agree a view into the sharedflow stats would be nice, I don't know if the team will want to expose the internals that way. Worth asking though.
p
👍
w
Sorry to be late to the party, but I had a thought reading this. You mention you have metrics, so if you have (or add) metrics to the producer and to each consumer counting the number of elements produced/consumed, your current buffer usage is the difference between the producer count and the minimum of the consumer counts. Similarly, assuming you have some consumer which is very fast, your current buffer usage is the difference between the maximum consumer count and minimum consumer count.
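The counting idea above can be sketched in plain Kotlin, under a few assumptions. `BufferUsageEstimator` and all of its members are hypothetical names, the counters would be wired to whatever metrics library is already in use, and the flow is assumed to have `replay = 0`, so a newly registered consumer starts from the producer's current count. Because the counters are updated outside the flow's own synchronization, the result is an estimate, not an exact buffer size.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper: estimates buffer usage as produced minus the slowest consumer's count.
class BufferUsageEstimator {
    private val produced = AtomicLong()
    private val consumed = ConcurrentHashMap<String, AtomicLong>()

    // Assumption: replay = 0, so a new consumer has nothing to catch up on
    // and starts at the current producer count.
    fun registerConsumer(name: String) {
        consumed.putIfAbsent(name, AtomicLong(produced.get()))
    }

    // Call once per element emitted by the producer.
    fun onProduced() {
        produced.incrementAndGet()
    }

    // Call once per element a consumer has finished processing.
    fun onConsumed(name: String) {
        consumed.getValue(name).incrementAndGet()
    }

    // Producer count minus the minimum consumer count, never negative.
    fun estimatedUsage(): Long {
        val slowest = consumed.values.minOfOrNull { it.get() } ?: return 0
        return (produced.get() - slowest).coerceAtLeast(0)
    }

    // Name of the consumer that is furthest behind, if any are registered.
    fun slowestConsumer(): String? = consumed.minByOrNull { it.value.get() }?.key
}
```

Comparing `estimatedUsage()` against a fraction of the configured buffer capacity would give the early warning asked for at the top of the thread, and `slowestConsumer()` points at the likely culprit.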