SecretX
03/23/2024, 1:17 PM
When using buffer(), how do I check and log whenever the producer and the consumer suspend themselves due to backpressure? I need this in order to fine-tune my buffer size and parallelism on the consumer side.
Zach Klippenstein (he/him) [MOD]
03/24/2024, 9:11 PM
emit suspends under backpressure, so something like this helper function should work: https://pl.kotl.in/cB3p6vKXi
Zach Klippenstein (he/him) [MOD]
03/24/2024, 9:13 PM
import kotlin.coroutines.intrinsics.*
import kotlinx.coroutines.*

fun main() {
    runBlocking {
        // This block never suspends, so the callback is not invoked.
        onSuspended({ println("1 suspended") }) {
            println("1")
        }
        // delay() suspends, so the callback fires once.
        onSuspended({ println("2 suspended") }) {
            println("2 before delay")
            delay(500)
            println("2 after delay")
        }
    }
}

// Runs block and invokes onSuspended if block actually suspends
// instead of returning a result immediately.
suspend inline fun <T> onSuspended(
    crossinline onSuspended: () -> Unit,
    noinline block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn { cont ->
    val result = block.startCoroutineUninterceptedOrReturn(cont)
    if (result == COROUTINE_SUSPENDED) {
        onSuspended()
    }
    result
}
Zach Klippenstein (he/him) [MOD]
03/24/2024, 9:14 PM
Make an operator that wraps emit in onSuspended and log or increment a counter or whatever if emit suspends. Put that operator downstream of your buffer so it sees the backpressure.
fun <T> Flow<T>.onBackpressure(
    onBackpressure: () -> Unit
): Flow<T> = flow {
    collect {
        onSuspended(onBackpressure) {
            emit(it)
        }
    }
}
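To make the operator above concrete, here is a minimal usage sketch that counts suspensions on both sides of a buffer. The names measureSuspensions, producerSuspensions and consumerSuspensions, the buffer size of 2, and the 20 ms delay are all assumptions made for illustration and do not come from the thread. One copy of the operator sits before buffer(), where its emit should suspend once the buffer is full, and one sits after buffer(), as the thread suggests, where its emit should suspend whenever the collector body suspends. The two helpers are repeated so the sketch compiles on its own.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.intrinsics.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Helper from the thread: runs block and calls onSuspended if it suspends.
suspend inline fun <T> onSuspended(
    crossinline onSuspended: () -> Unit,
    noinline block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn { cont ->
    val result = block.startCoroutineUninterceptedOrReturn(cont)
    if (result == COROUTINE_SUSPENDED) {
        onSuspended()
    }
    result
}

// Operator from the thread: reports every emit that suspends.
fun <T> Flow<T>.onBackpressure(
    onBackpressure: () -> Unit
): Flow<T> = flow {
    collect { value ->
        onSuspended(onBackpressure) {
            emit(value)
        }
    }
}

// Hypothetical measurement helper (not from the thread). Returns the number
// of suspended emits seen before and after the buffer, in that order.
suspend fun measureSuspensions(): Pair<Int, Int> {
    val producerSuspensions = AtomicInteger()
    val consumerSuspensions = AtomicInteger()

    (1..10).asFlow()
        // Before the buffer: emit suspends when the buffer has no room left.
        .onBackpressure { producerSuspensions.incrementAndGet() }
        .buffer(2) // assumed buffer size, the value being tuned
        // After the buffer: emit suspends when the collector body suspends.
        .onBackpressure { consumerSuspensions.incrementAndGet() }
        .collect { delay(20) } // assumed slow consumer

    return producerSuspensions.get() to consumerSuspensions.get()
}

fun main() = runBlocking {
    val (producer, consumer) = measureSuspensions()
    println("producer-side suspended emits: $producer")
    println("consumer-side suspended emits: $consumer")
}
```

Swapping the println calls for a logger or a metrics counter, and rerunning with different buffer sizes, is one way to compare configurations. Note that the downstream counter reacts to any suspension inside the collector, not only to a full buffer.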
Zach Klippenstein (he/him) [MOD]
03/24/2024, 9:15 PM
franztesca
03/25/2024, 11:46 AM
SecretX
03/27/2024, 12:34 PM