diesieben07
01/06/2020, 8:59 PM
[...] `Flow` and I'd like to introduce backpressure (the API has a `request` mechanic, with which I can request more elements from upstream). I'd like to only request more elements from upstream if the channel (created by `callbackFlow`) doesn't already have a full buffer (i.e. my consumer is slower than the producer).
tseisel
01/06/2020, 9:02 PM
`Flow` and `Channel`s already have backpressure by design. You can use the `buffer` operator to change the buffer size. Producers will suspend as long as the buffer is full.
diesieben07
01/06/2020, 9:03 PM
[...] `onNext` and `onCompleted` methods. I can also call `request` if I want more elements to arrive in `onNext`.
[...] `request` if my buffer is not full, i.e. I can actually take more elements in without just storing them in memory.
[...] `onNext`. I could block the thread, but that defeats the point.
class CallbackImpl<V>(private val scope: ProducerScope<V>) : CallbackIface<V> {
    override fun onNext(value: V) {
        check(scope.channel.offer(value)) // channel is unlimited, so that no messages are lost
        request(1) // <= I only want to do this if the channel actually has room
    }
    override fun onCompleted() {
        scope.channel.close()
    }
}
fun <V> adaptApi(): Flow<V> {
    return callbackFlow<V> {
        api.doStuff(CallbackImpl(this))
        awaitClose()
    }.buffer(Channel.UNLIMITED)
}
channel.runWhenBufferLessThan(2) { api.request(1) }
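No `runWhenBufferLessThan` hook exists on `Channel`, so one way to approximate it is to count buffered elements by hand. The following is only a sketch under that assumption: `BufferGauge` and its methods are hypothetical names, not part of kotlinx.coroutines, and it only sees elements that the adapter itself enqueues and dequeues.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not a kotlinx.coroutines API): counts how many elements
// currently sit in the adapter's buffer so the consumer side can decide when
// to ask upstream for more.
class BufferGauge(private val threshold: Int) {
    private val buffered = AtomicInteger(0)

    /** Call from onNext, right before the element is offered to the channel. */
    fun onEnqueued() {
        buffered.incrementAndGet()
    }

    /**
     * Call after the consumer has taken one element out of the channel.
     * Returns true when fewer than [threshold] elements remain buffered,
     * i.e. when it is a good moment to request more from upstream.
     */
    fun onDequeued(): Boolean = buffered.decrementAndGet() < threshold

    /** Current number of buffered elements, as seen by this gauge. */
    val size: Int
        get() = buffered.get()
}
```

With a gauge created as `BufferGauge(2)`, the callback would call `onEnqueued()` in `onNext`, and the consuming side would call `api.request(1)` whenever `onDequeued()` returns true. An initial `request` is still needed to get the first element flowing.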
[...] `request` is only a hint, and the API may call `onNext` at any time, in which case I need to buffer the elements.
Dominaezzz
01/06/2020, 10:01 PM
[...] `sendBlocking`?
diesieben07
01/06/2020, 10:03 PM
[...] `onNext`, making things no longer non-blocking...
In this case `onNext` is actually called from a network thread (Netty), so that's just a no-go.
Dominaezzz
01/06/2020, 10:10 PM
[...] `.buffer(Channel.UNLIMITED)`. It's better than launching unlimited coroutines.
diesieben07
01/06/2020, 10:12 PM
Dominaezzz
01/06/2020, 10:22 PM
In theory I guess you could check if `scope.channel.offer(value)` returned false, then start a coroutine to do the suspension for you, then call `request(1)` at the end of the coroutine.
Even for this case, I guess you could send the excess incoming elements to the coroutine via another channel...
diesieben07
01/06/2020, 10:25 PM
"In theory I guess you could check if `scope.channel.offer(value)` returned false, then start a coroutine to do the suspension for you, then call `request(1)` at the end of the coroutine."
Yes, this is what I am doing currently and it's the example I linked above. But if the producer is not adhering to the backpressure and just keeps producing elements, `offer` keeps returning false and I keep launching new coroutines. Then I am not buffering in the channel, but I am buffering using suspended coroutines (which are all waiting to add one element to the channel, which is full).
[...] `callbackFlow` with `buffer(RENDEZVOUS)`. Inside, create another channel, which has unlimited buffering.
Launch a coroutine which polls the buffered channel. If it finds an element, it sends it on to the `callbackFlow` channel (likewise with close/exception). If `poll` returns null, that means we need to request more elements, so then we call `request`.
Then in my callback `onNext` I just do `offer` on the buffered channel (which will always succeed without blocking, because it's an unlimited channel).
Does this sound even remotely reasonable? It feels very weird having two channels...
Dominaezzz
01/06/2020, 11:08 PM
As an optimisation you can replace `callbackFlow` with just `flow`. Then you'd only have one channel. I feel a nice solution coming along here.
diesieben07
01/06/2020, 11:13 PM
"As an optimisation you can replace `callbackFlow` with just `flow`."
No, because you are not allowed to call `emit` from a different coroutine.
Dominaezzz
01/06/2020, 11:18 PM
[...] `flow { }` which creates a channel and spawns a coroutine, then simply read from the channel and emit directly in `flow`.
diesieben07
01/06/2020, 11:20 PM
return flow {
    val bufferedChannel = Channel<Result>(Channel.UNLIMITED)
    val callback = CallbackAdapter(bufferedChannel)
    apiCall(callback)
    // request initial element
    callback.request(1)
    // wait for first element, suspending
    while (bufferedChannel.transferOneSuspend(this@flow)) {
        // keep transferring while the buffer has elements
        while (bufferedChannel.transferOneIfBuffered(this@flow)) {}
        // buffer ran dry, request more elements
        callback.request(requestSize)
        // and now go back to suspending transfer until more arrive
    }
}
It uses two helper functions:
/**
 * Transfer one element from this `ReceiveChannel` into the given `FlowCollector` if it is possible to do so
 * without suspending to receive.
 *
 * If this channel has failed, this method will throw the causing exception.
 * If no element is currently buffered in this channel or the channel is
 * [closed for receive][ReceiveChannel.isClosedForReceive], this method will return false.
 * Otherwise this method returns `true`.
 */
suspend fun <V : Any> ReceiveChannel<V>.transferOneIfBuffered(target: FlowCollector<V>): Boolean {
    val element = poll()
    return if (element == null) {
        false
    } else {
        target.emit(element)
        true
    }
}
/**
 * Transfer one element from this `ReceiveChannel` into the given `FlowCollector`.
 * If this channel has failed, this method will throw the causing exception.
 * If this channel is closed, this method will return `false`.
 * If an element was successfully transferred, this method will return `true`.
 */
@ExperimentalCoroutinesApi
suspend fun <V : Any> ReceiveChannel<V>.transferOneSuspend(target: FlowCollector<V>): Boolean {
    val element = receiveOrNull()
    return if (element == null) {
        // we are done
        false
    } else {
        target.emit(element)
        true
    }
}
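The approach in the last two snippets can be tried end to end with a small stand-in for the callback API. The sketch below is an illustration under stated assumptions, not the code from this thread: `Callback`, `FakeSource` and `adapt` are hypothetical names, the fake source delivers elements synchronously inside `request`, and the newer `trySend`, `tryReceive` and `receiveCatching` functions (kotlinx.coroutines 1.5 and later) are used in place of `offer`, `poll` and `receiveOrNull`.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the callback interface discussed in this thread.
interface Callback<V> {
    fun onNext(value: V)
    fun onCompleted()
}

// Hypothetical source: each request(n) synchronously delivers up to n further
// integers, then signals completion once all `total` items have been delivered.
class FakeSource(private val total: Int) {
    private var next = 0
    private var completed = false
    private var callback: Callback<Int>? = null

    fun subscribe(cb: Callback<Int>) {
        callback = cb
    }

    fun request(n: Int) {
        val cb = checkNotNull(callback) { "subscribe() must be called first" }
        repeat(n) {
            if (next < total) cb.onNext(next++)
        }
        if (next == total && !completed) {
            completed = true
            cb.onCompleted()
        }
    }
}

fun adapt(source: FakeSource, requestSize: Int): Flow<Int> = flow {
    val buffered = Channel<Int>(Channel.UNLIMITED)
    source.subscribe(object : Callback<Int> {
        // trySend does not suspend; on an open unlimited channel it always succeeds.
        override fun onNext(value: Int) {
            buffered.trySend(value)
        }
        override fun onCompleted() {
            buffered.close()
        }
    })
    try {
        source.request(1) // initial demand
        while (true) {
            // Suspend until an element arrives or the channel is closed.
            val result = buffered.receiveCatching()
            result.exceptionOrNull()?.let { throw it } // channel failed
            val first = result.getOrNull() ?: break    // channel closed normally
            emit(first)
            // Drain whatever is already buffered without suspending to receive.
            while (true) {
                val more = buffered.tryReceive().getOrNull() ?: break
                emit(more)
            }
            // Buffer ran dry: ask upstream for more.
            source.request(requestSize)
        }
    } finally {
        // Drop anything still buffered if the collector stops early.
        buffered.cancel()
    }
}
```

Collecting `adapt(FakeSource(5), requestSize = 2).toList()` inside `runBlocking` yields the five integers 0 through 4 in order. Because `emit` suspends while the collector is busy, `request` is only called again once the collector has caught up with everything buffered so far.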
Dominaezzz
01/07/2020, 11:29 AM