I looked at how the adapters for RxJava work and.....
# coroutines
d
I looked at how the adapters for RxJava work and... hooray they just ignore the backpressure problem. Even for
Flow.asObservable
it just dumps the flow into the emitter as fast it can, without regard for the backpressure mechanisms 😞 (https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt#L80-L107). The opposite (convert Observable into ReceiveChannel) does the same: https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt#L65-L96
I could at least use the 2nd example (
SubscriptionChannel
) as a basis, but even that I can't do, because there are no channel implementations in the coroutines library that are subclassable (it's all internal). You have to always implement everything from scratch... is this by design? If so, why?
a
d
observable doesn't do backpressure
Huh? https://github.com/ReactiveX/RxJava/wiki/Backpressure
https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt might be a more useful example
A
Flow
is not what I want. The context is gRPC, so a remote procedure call. It has to be predictable when and how often it happens. I don't want the call to happen twice if you accidentally consume the flow twice. That is why I specifically want a
Channel
, which can only be consumed once.
In fact the RxJava backpressure is the perfect example, because it works exactly like in gRPC-Java.
z
The Rx/Flow adapters definitely honor backpressure, but they only do for
Flowable
because
Observable
doesn’t have backpressure. But you’re talking about sharing, which is a separate issue from backpressure?
d
How does
Observable
in RxJava not have backpressure? I am not a user of RxJava, but reading the above link it seems it very much has (by calling
request
if you want more items).
And yes, I also know sharing is a separate issue.
z
Are you using RxJava 1 or 2? In Rx1,
Observable
does have backpressure, but in Rx2 there are two parallel types,
Flowable
and
Observable
, and the only difference is
Flowable
has backpressure and
Observable
does not.
d
I am not using either. I was trying to find examples for how to adapt a callback-based backpressure API to coroutines. And RxJava seemd like a good place to look.
z
it’s much more complex than the
Observable
one, but you’ll notice request tracking and stuff
(note that
Publisher
is the generic “reactive streams” version of RxJava’s
Flowable
, and
Flowable
actually implements the
Publisher
interface)
d
z
yep!
d
thank you very much for your help
Now on to bidirectional streaming. I already regret everything,
z
I’m not sure if you’d find this helpful, but Wire recently got a Kotlin API for gRPC, might want to take a look at that for ideas: https://square.github.io/wire/wire_grpc/
d
First time I hear of it 🙂 I am working on a protobuf plugin that generates extensions on the usual grpc-java generated stubs. But I might be able to use Wire instead 😄
z
If you have questions, check out the #squarelibraries channel
👍 1