Looking for help. I need an Rx analogue of `Flow.b...
# rx
e
Looking for help. I need an Rx analogue of
Flow.buffer
operator that lets the publisher produce faster than consumer, but puts a backpressure on it when buffer overflows. So like
onBackpressureBuffer
, but without an error on overflow. What can I do?
a
Copy code
Flowable.create(BackpressureStrategy.BUFFER)
Its not an operator though but
RxJava2
introduced a different kind of Observable for the purpose. Or a
buffer
operator with
timespan
overload might also help. http://reactivex.io/documentation/operators/buffer.html
a
@elizarov To recap, you're looking for something that buffers up until a certain buffer size, at which point it stops adding new items to the buffer and instead uses the built in backpressure support to indicate to the producer that it should stop producing items until the buffer is no longer full - is that right?
e
Yes
a
onBackPressureBuffer
is the closest I can imagine to that use case but obviously it doesn't let you signal the backpressure up to the producer. Actually, is this possible in Rx? I could be wrong but I think there are factory
Flowable
functions that just don't support listening for backpressure signals. Like, I don't think the
Flowable.interval
factory method can do anything about backpressure signals other than trigger an error...
p
You can change
BackpressureOverflowStrategy
in
onBackPressureBuffer
from error to drop lastest / oldest, but it won't fit your needs. Unless we don't express backpressure in end consumers
DisposableSubscriber
by manually requesting more data, it looks like this case is not possible in Rx as @alexsullivan114 said, but I can be wrong
k
It sounds like your consumer should just stop consuming when the consumer's buffer is full? If it's a Flowable, that should cause backpressure upstream...