In Kotlin Coroutines, how do I verify flow backpre...
# coroutines
s
In Kotlin Coroutines, how do I verify flow backpressure? That is, if I have a flow that uses buffer(), how do I check and log whenever the producer and the consumer suspend themselves due to backpressure? I need this in order to fine-tune my buffer size and parallelism on the consumer side.
z
Backpressure happens when emit suspends, so something like this helper function should work: https://pl.kotl.in/cB3p6vKXi
👍 1
In case playground deletes that:
import kotlin.coroutines.intrinsics.*
import kotlinx.coroutines.*

fun main() {
  runBlocking {
    onSuspended({ println("1 suspended") }) {
      println("1")
    }
    onSuspended({ println("2 suspended") }) {
      println("2 before delay")
      delay(500)
      println("2 after delay")
    }
  }
}
suspend inline fun <T> onSuspended(
    crossinline onSuspended: () -> Unit,
    noinline block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn { cont ->
  val result = block.startCoroutineUninterceptedOrReturn(cont)
  if (result == COROUTINE_SUSPENDED) {
    onSuspended()
  }
  result
}
Then you can write a flow operator that wraps its call to emit with onSuspended and logs, increments a counter, or whatever if emit suspends. Put that operator downstream of your buffer so it sees the backpressure.
fun <T> Flow<T>.onBackpressure(
  onBackpressure: () -> Unit
): Flow<T> = flow {
  collect {
    onSuspended(onBackpressure) {
      emit(it)
    }
  }
}
Curious if @billjings has a better idea or knows why this won’t work (I haven’t tried it on real code)
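For anyone who wants to try this on a toy flow first, here is a minimal sketch that counts suspensions on both sides of a buffer. It repeats the helper and the operator from this thread (with the callback parameter renamed to onSuspend to avoid shadowing) so that it runs on its own. The function countSuspensions and the two counters are hypothetical names made up for this illustration. As far as I can tell, the operator placed upstream of buffer() sees emit suspend when the buffer is full (producer side), while the one placed downstream sees emit suspend whenever the collector itself suspends (consumer side), so it may be worth measuring both.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.intrinsics.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Helper from the thread: runs block and invokes onSuspend if block suspends.
suspend inline fun <T> onSuspended(
    crossinline onSuspend: () -> Unit,
    noinline block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn { cont ->
  val result = block.startCoroutineUninterceptedOrReturn(cont)
  if (result == COROUTINE_SUSPENDED) {
    onSuspend()
  }
  result
}

// Operator from the thread: reports every emit that has to suspend.
fun <T> Flow<T>.onBackpressure(onBackpressure: () -> Unit): Flow<T> = flow {
  collect { value ->
    onSuspended(onBackpressure) { emit(value) }
  }
}

// Hypothetical demo: fast producer, slow consumer, instrumented on both sides.
// Returns (suspensions seen upstream of buffer, suspensions seen downstream).
fun countSuspensions(bufferSize: Int): Pair<Int, Int> = runBlocking {
  val upstream = AtomicInteger()   // emit into the buffer had to wait
  val downstream = AtomicInteger() // emit into the collector had to wait

  flow { repeat(10) { emit(it) } }
      .onBackpressure { upstream.incrementAndGet() }
      .buffer(bufferSize)
      .onBackpressure { downstream.incrementAndGet() }
      .collect { delay(10) } // simulated slow consumer

  upstream.get() to downstream.get()
}

fun main() {
  val (upstream, downstream) = countSuspensions(bufferSize = 2)
  println("suspended upstream of buffer: $upstream, downstream of buffer: $downstream")
}
```

Running this with different bufferSize values should show the upstream count change as the buffer grows, which is roughly the signal needed for tuning. The exact counts depend on timing and dispatchers, so treat them as relative indicators rather than precise measurements.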
f
You can also write a custom implementation of the buffer operator that is instrumented: https://pl.kotl.in/_Zv8CoyzX However, it doesn't fuse and I wouldn't use it in production. But for debugging it could work.
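In case the playground link goes away, here is a rough sketch of what an instrumented buffer could look like. This is not the code behind the link, just one possible shape of the idea: the names instrumentedBuffer, onProducerSuspended, onConsumerSuspended, BufferStats and runInstrumentedDemo are all made up for illustration. It puts a plain Channel between the upstream and the downstream and uses trySend / tryReceive to notice when the regular send / receive would have to wait. Like any hand-rolled buffer, it does not fuse with neighbouring operators, and the callbacks are an approximation (the channel state can change between the try call and the suspending call).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical debugging-only buffer: reports when either side has to wait.
fun <T> Flow<T>.instrumentedBuffer(
    capacity: Int,
    onProducerSuspended: () -> Unit,
    onConsumerSuspended: () -> Unit
): Flow<T> {
  val upstream = this
  return flow {
    coroutineScope {
      val channel = Channel<T>(capacity)

      // Producer side: collect upstream in a child coroutine.
      launch {
        try {
          upstream.collect { value ->
            // trySend fails when the buffer is full, so send would wait.
            if (channel.trySend(value).isFailure) {
              onProducerSuspended()
              channel.send(value)
            }
          }
        } finally {
          channel.close() // lets the consumer loop below terminate
        }
      }

      // Consumer side: drain the channel into the downstream collector.
      while (true) {
        var result = channel.tryReceive()
        if (result.isFailure && !result.isClosed) {
          // Buffer is empty but not closed, so receive would wait.
          onConsumerSuspended()
          result = channel.receiveCatching()
        }
        if (result.isClosed) break
        emit(result.getOrThrow())
      }
    }
  }
}

// Hypothetical holder for the demo results.
data class BufferStats(val items: List<Int>, val producerWaits: Int, val consumerWaits: Int)

// Fast producer, slow consumer: the producer should hit a full buffer.
fun runInstrumentedDemo(capacity: Int): BufferStats = runBlocking {
  var producerWaits = 0
  var consumerWaits = 0
  val items = flow { repeat(10) { emit(it) } }
      .instrumentedBuffer(
          capacity = capacity,
          onProducerSuspended = { producerWaits++ },
          onConsumerSuspended = { consumerWaits++ }
      )
      .onEach { delay(10) } // simulated slow consumer
      .toList()
  BufferStats(items, producerWaits, consumerWaits)
}

fun main() {
  println(runInstrumentedDemo(capacity = 2))
}
```

Many producer waits suggest the buffer is too small or the consumer too slow; many consumer waits suggest the producer is the bottleneck and a larger buffer will not help.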
s
@Zach Klippenstein (he/him) [MOD] loved the approach Zach, thank you so much