Peter Kievits
07/24/2024, 8:05 AM
SharedFlow in our backend application. We're receiving updates and are publishing these on the SharedFlow. This works great because it is a very easy workflow to understand. However, we would like to implement metrics around this. Specifically, the SharedFlow has a predefined buffer, and we'd like to be aware of when it is filling up. We can detect when the buffer is full, because tryEmit returns false. Is there any way for us to get an early warning here, in case of a slow consumer? It looks like we can check the number of consumers on the flow, but there is no way to check the buffer status. Has anyone got any ideas how to monitor this?
ross_a
07/24/2024, 8:44 AM
Hristijan
07/24/2024, 12:27 PM
Peter Kievits
07/24/2024, 12:38 PM
Zach Klippenstein (he/him) [MOD]
07/24/2024, 2:14 PM
Peter Kievits
07/24/2024, 2:22 PM
Peter Kievits
07/24/2024, 2:22 PM
Zach Klippenstein (he/him) [MOD]
07/24/2024, 2:31 PM
ross_a
07/24/2024, 2:32 PM
fun <T> Flow<T>.onEmitTimeout(duration: Duration, block: () -> Unit): Flow<T> = flow {
    collect {
        coroutineScope {
            select {
                launch { emit(it) }.onJoin {}
                onTimeout(duration) { block() }
            }
        }
    }
}
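One caveat about the snippet above: as far as I know, the flow { } builder checks that emit is called from the collecting coroutine, so emitting from inside launch is expected to fail at runtime with a "Flow invariant is violated" IllegalStateException. A minimal sketch of one way around that, assuming the goal is only to get a callback when a single emit stays suspended too long: keep emit in the collecting coroutine and put the timer in a separate watchdog coroutine. The names onEmitTimeoutWatchdog and onSlowEmit are hypothetical and not from this thread.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf

// Hypothetical variant of onEmitTimeout: invokes onSlowEmit when one
// downstream emit() stays suspended for longer than `duration`.
fun <T> Flow<T>.onEmitTimeoutWatchdog(duration: Duration, onSlowEmit: () -> Unit): Flow<T> = flow {
    coroutineScope {
        collect { value ->
            // The watchdog only waits and reports; it never calls emit().
            val watchdog = launch {
                delay(duration)
                onSlowEmit()
            }
            try {
                emit(value) // still called from the collecting coroutine
            } finally {
                watchdog.cancel() // emit finished (or failed), so stop the timer
            }
        }
    }
}

fun main() = runBlocking {
    flowOf(1, 2, 3)
        .onEmitTimeoutWatchdog(50.milliseconds) { println("slow consumer detected") }
        .collect { value ->
            if (value == 2) delay(200) // simulate a consumer that stalls on one element
        }
}
```

Unlike a select-based timeout, this does not interrupt the emit; it only reports that the downstream side is taking longer than expected, which is what a metric or warning log needs.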
this timer could I think
ross_a
07/24/2024, 2:32 PM
ross_a
07/24/2024, 2:32 PM
Zach Klippenstein (he/him) [MOD]
07/24/2024, 2:34 PM
ross_a
07/24/2024, 2:35 PM
Peter Kievits
07/24/2024, 2:36 PM
flow {
    val outerFlow = this
    innerFlow
        .collect { recordUpdate ->
            val sample = Timer.start()
            counter.increment()
            outerFlow.emit(recordUpdate)
            sample.stop(timer)
        }
}
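The snippet above leaves innerFlow, counter and timer undeclared. A self-contained sketch of the same idea might look like the following, assuming Micrometer is the metrics library (the Timer.start() / sample.stop(timer) calls suggest so). The extension name recordEmitTime, the meter names, and the SimpleMeterRegistry wiring are assumptions for illustration, not part of the original message.

```kotlin
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf

// Hypothetical helper: counts every element and records how long each
// downstream emit() takes, including any time spent suspended.
fun <T> Flow<T>.recordEmitTime(counter: Counter, timer: Timer): Flow<T> = flow {
    collect { value ->
        val sample = Timer.start()
        counter.increment()
        try {
            emit(value)
        } finally {
            sample.stop(timer) // record the duration even if emit throws
        }
    }
}

fun main() = runBlocking {
    // Assumed wiring: an in-memory registry with illustrative meter names.
    val registry = SimpleMeterRegistry()
    val counter = Counter.builder("updates.emitted").register(registry)
    val timer = Timer.builder("updates.emit.time").register(registry)

    flowOf("a", "b", "c")
        .recordEmitTime(counter, timer)
        .collect { delay(20) } // deliberately slow consumer

    println("emitted=${counter.count()} totalMs=${timer.totalTime(TimeUnit.MILLISECONDS)}")
}
```

The recorded time covers whatever sits downstream of the operator, so a rising emit duration is only an indirect signal that a consumer or buffer is falling behind, not a direct reading of buffer occupancy.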
Peter Kievits
07/24/2024, 2:36 PM
ross_a
07/24/2024, 2:37 PM
Peter Kievits
07/24/2024, 2:37 PM
Dmitry Khalanskiy [JB]
07/24/2024, 3:00 PM
Wesley Hartford
08/01/2024, 4:11 PM