Is there a way to know how "full" a buffered chann...
# coroutines
d
Is there a way to know how "full" a buffered channel is? I am adapting a callback-based asynchronous API to a
Flow
and I'd like to introduce backpressure (the API has a
request
mechanic, with which I can request more elements from upstream). I'd like to only request more elements from upstream if the channel (created by
callbackFlow
) doesn't already have a full buffer (i.e. my consumer is slower than the producer).
t
Flow
and `Channel`s already have backpressure by design. You can use the
buffer
operator to change the buffer size. Producers will suspend as long as the buffer is full.
d
Yes, but the API is not suspending, that's why I am adapting it. I give a (non-suspending) callback to the API, which has
onNext
and
onCompleted
methods. I can also call
request
if I want more elements to arrive in
onNext
.
And I only want to call
request
if my buffer is not full, i.e. I can actually take more elements in without just storing them in memory.
So I can't just suspend in
onNext
. I could block the thread, but that defeats the point.
Current code (simplified):
Copy code
class CallbackImpl<V>(private val scope: ProducerScope<V>) : CallbackIface<V> {
  override fun onNext(value: V) {
    check(scope.channel.offer(value)) // channel is unlimited, to not loose messages
    request(1) // <= I only want to do this if the channel actually has room
  }
  override fun onCompleted() {
    scope.channel.close()
  }
}
fun adaptApi(): Flow<V> {
  return callbackFlow<V> {
    api.doStuff(CallbackImpl(this))
    awaitClosed()
  }.buffer(Channel.UNLIMITED)
}
Now that I write this out in a simplified form I also noticed that I then also need to know if the channel becomes exhausted, so I need to request more elements again...
So what I want (pseudocode):
channel.runWhenBufferLessThan(2) { api.request(1) }
This would be "easy" if the backpressure was enforced. However
request
is only a hint, and the API may call
onNext
at any time, in which case I need to buffer the elements.
If I do the equivalent, I am just launching potentially infinite coroutines if my consumer is slow and the producer doesn't adhere to the backpressure
d
What if you called
sendBlocking
?
d
Yes, then I am blocking whichever thread is calling
onNext
, making things no longer non-blocking... In this case
onNext
is called from a network thread actually (netty), so that's just a no-go.
d
If you're producer does't support back-pressure and you don't want to lose any emissions, then there isn't anything much more efficient than
.buffer(Channel.UNLIMITED)
. It's better than launching unlimited coroutines.
d
I know. But in the case where it does support backpressure I still want to adhere to that and not just buffer forever...
d
In theory I guess you could check if
scope.channel.offer(value)
returned false, then start a coroutine to do the suspension for you, then call
request(1)
at the end of the coroutine. Even for this case, I guess you could send the excess oncoming elements to the coroutine via another channel...
d
In theory I guess you could check if
scope.channel.offer(value)
returned false, then start a coroutine to do the suspension for you, then call
request(1)
at the end of the coroutine.
Yes, this is what I am doing currently and it's the example I linked above. But if the producer is not adhering to the backpressure and just keeps producing elements,
offer
keeps returning false and I keep launching new coroutines. Then I am not buffering in the channel, but I am buffering using suspended coroutines (which are all waiting to add one element to the channel, which is full).
Current idea:
callbackFlow
with
buffer(RENDEZVOUS)
. Inside create another channel, which has unlimited buffering. Launch a coroutine which polls the buffered channel. If it finds an element, it sends it on to the
callbackFlow
channel (likewise with close/exception). If
poll
returns null that means we need to request more elements, so then we call
request
. Then in my callback
onNext
I just do
offer
on the buffered channel (which will always succeed without blocking, because it's an unlimited channel). Does this sound even remotely reasonable? It feels very weird having two channels...
Time for bed. Maybe my brain will come up with something better while I sleep.
d
Yeah, that seems somewhat reasonable, it's what I described. As as optimisation you can replace callbackFlow with just
flow
. Then you'd only have one channel. I feel a nice solution coming along here.
d
As as optimisation you can replace callbackFlow with just
flow
.
No, because you are not allowed to call
emit
from a different coroutine
d
Refactor it to allow you to emit?
Have a
flow { }
which creates a channel and spawns a coroutine, then simply read from the channel and emit directly in
flow
.
d
Hm, yeah, that could work. Will try that tomorrow. Thanks for your input throughout my confused ramblings
This is my code now:
Copy code
return flow {
        val bufferedChannel = Channel<Result>(Channel.UNLIMITED)

        val callback = CallbackAdapter(bufferedChannel)
        apiCall(observer)

        // request initial element
        observer.request(1)

        // wait for first element, suspending
        while (bufferedChannel.transferOneSuspend(this@flow)) {
            // keep transferring while the buffer has elements
            while (bufferedChannel.transferOneIfBuffered(this@flow)) {}

            // buffer ran dry, request more elements
            callback.request(requestSize)
            // and now go back to suspending transfer until more arrive
        }
    }
It uses two helper functions:
Copy code
/**
 * Transfer one element from this `ReceiveChannel` into the given `FlowCollector` if it is possible to do so
 * without suspending to receive.
 *
 * If this channel has failed, this method will throw the causing exception.
 * If no element is currently buffered in this channel or the channel is
 * [closed for receive][ReceiveChannel.isClosedForReceive], this method will return false.
 * Otherwise this method returns `true`.
 */
suspend fun <V : Any> ReceiveChannel<V>.transferOneIfBuffered(target: FlowCollector<V>): Boolean {
    val element = poll()
    return if (element == null) {
        false
    } else {
        target.emit(element)
        true
    }
}

/**
 * Transfer one element from this `ReceiveChannel` into the given `FlowCollector`.
 * If this channel has failed, this method will throw the causing exception.
 * If this channel is closed, this method will return `false`.
 * If an element was successfully transferred, this method will return `true`.
 */
@ExperimentalCoroutinesApi
suspend fun <V : Any> ReceiveChannel<V>.transferOneSuspend(target: FlowCollector<V>): Boolean {
    val element = receiveOrNull()
    return if (element == null) {
        // we are done
        false
    } else {
        target.emit(element)
        true
    }
}
I am quite happy with it.
d
Nice