https://kotlinlang.org logo
Title
d

diesieben07

01/09/2020, 9:43 AM
I looked at how the adapters for RxJava work and... hooray they just ignore the backpressure problem. Even for
Flow.asObservable
it just dumps the flow into the emitter as fast it can, without regard for the backpressure mechanisms 😞 (https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt#L80-L107). The opposite (convert Observable into ReceiveChannel) does the same: https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt#L65-L96
I could at least use the 2nd example (
SubscriptionChannel
) as a basis, but even that I can't do, because there are no channel implementations in the coroutines library that are subclassable (it's all internal). You have to always implement everything from scratch... is this by design? If so, why?
a

Adam Powell

01/09/2020, 9:38 PM
d

diesieben07

01/09/2020, 9:43 PM
observable doesn't do backpressure
Huh? https://github.com/ReactiveX/RxJava/wiki/Backpressure
https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt might be a more useful example
A
Flow
is not what I want. The context is gRPC, so a remote procedure call. It has to be predictable when and how often it happens. I don't want the call to happen twice if you accidentally consume the flow twice. That is why I specifically want a
Channel
, which can only be consumed once.
In fact the RxJava backpressure is the perfect example, because it works exactly like in gRPC-Java.
z

Zach Klippenstein (he/him) [MOD]

01/09/2020, 10:34 PM
The Rx/Flow adapters definitely honor backpressure, but they only do for
Flowable
because
Observable
doesn’t have backpressure. But you’re talking about sharing, which is a separate issue from backpressure?
d

diesieben07

01/09/2020, 10:35 PM
How does
Observable
in RxJava not have backpressure? I am not a user of RxJava, but reading the above link it seems it very much has (by calling
request
if you want more items).
And yes, I also know sharing is a separate issue.
z

Zach Klippenstein (he/him) [MOD]

01/09/2020, 10:37 PM
Are you using RxJava 1 or 2? In Rx1,
Observable
does have backpressure, but in Rx2 there are two parallel types,
Flowable
and
Observable
, and the only difference is
Flowable
has backpressure and
Observable
does not.
d

diesieben07

01/09/2020, 10:38 PM
I am not using either. I was trying to find examples for how to adapt a callback-based backpressure API to coroutines. And RxJava seemd like a good place to look.
z

Zach Klippenstein (he/him) [MOD]

01/09/2020, 10:39 PM
it’s much more complex than the
Observable
one, but you’ll notice request tracking and stuff
(note that
Publisher
is the generic “reactive streams” version of RxJava’s
Flowable
, and
Flowable
actually implements the
Publisher
interface)
d

diesieben07

01/09/2020, 10:41 PM
z

Zach Klippenstein (he/him) [MOD]

01/09/2020, 10:42 PM
yep!
d

diesieben07

01/09/2020, 10:42 PM
thank you very much for your help
Now on to bidirectional streaming. I already regret everything,
z

Zach Klippenstein (he/him) [MOD]

01/09/2020, 10:44 PM
I’m not sure if you’d find this helpful, but Wire recently got a Kotlin API for gRPC, might want to take a look at that for ideas: https://square.github.io/wire/wire_grpc/
d

diesieben07

01/09/2020, 10:47 PM
First time I hear of it 🙂 I am working on a protobuf plugin that generates extensions on the usual grpc-java generated stubs. But I might be able to use Wire instead 😄
z

Zach Klippenstein (he/him) [MOD]

01/09/2020, 10:58 PM
If you have questions, check out the #squarelibraries channel
👍 1