Is there anything that is equivalent to an RxJava ...
# coroutines
s
Is there anything that is equivalent to an RxJava
Subject
with kotlinx.coroutines? There are too many edge cases with
Channel
where I risk losing events, and having to deal with them all the time is exhausting.
a
The closest thing is probably BroadcastChannel
How are you losing events? I'm not sure broadcast channels will solve anything, as they just allow you to subscribe more than once.
s
Sorry, when I say
Channel
, I mean
BroadcastChannel
.
I just responded to a comment on github issues where I describe losing events with BroadcastChannels: https://github.com/Kotlin/kotlinx.coroutines/issues/736#issuecomment-460743436
s
I saw your examples. What if you call
launch { channel.send(item) }
instead of
channel.offer(item)
? Would that remember the last value sent?
s
Because someone on my team is going to forget and instead call offer. Is that the best way to ensure all events are received?
s
Yep,
offer
is not guaranteed to deliver the message (it will return
false
if the message wasn’t delivered)
s
is
send
guaranteed to deliver the message on all forms of
BroadcastChannel
?
s
Yep, but it could suspend until there is room in the channel.
(of course, if the channel is closed, then the message doesn’t get sent)
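A minimal sketch of the difference described above, using a plain rendezvous `Channel` rather than a `BroadcastChannel`. It assumes a newer kotlinx.coroutines release, where `trySend` is the replacement for `offer`; the function name `offerVersusSend` is made up for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns (did the non-suspending attempt succeed?, value delivered by send).
fun offerVersusSend(): Pair<Boolean, Int> = runBlocking {
    val channel = Channel<Int>() // rendezvous: no buffer at all

    // Non-suspending attempt (the modern spelling of offer). Nobody is
    // receiving yet, so the element is rejected, and the only sign of that
    // is the returned result.
    val accepted = channel.trySend(1).isSuccess

    // Suspending send: waits inside the coroutine until a receiver arrives.
    launch { channel.send(2) }
    val delivered = channel.receive()

    channel.close()
    accepted to delivered
}
```

Here the first element is silently dropped unless the caller checks the result, while the second one waits for the receiver.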
s
right
s
Also, using
launch
will guarantee that the
send
call happens in the Dispatcher associated with the CoroutineContext of the launch’s scope
s
actually, I think I found that for ConflatedBroadcastChannel, even with
send
, we would only receive the most recent event, and would still lose events
j
I was just reading the comment on the GitHub issue. Would creating a non-broadcast channel with unlimited capacity, plus a dedicated coroutine dispatching events from that channel to a BroadcastChannel, fix your issue? It's a bit hackish, but your producer would never block, and your consumers can be as slow as they need to be
And you wouldn't lose events until an OOM
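The relay idea could look roughly like the sketch below. It is an illustration under assumptions: the downstream side is a plain rendezvous `Channel` standing in for the broadcast channel, `trySend` is used as the newer replacement for `offer`, and the function name `relayThroughUnlimitedInbox` is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun relayThroughUnlimitedInbox(): List<Int> = runBlocking {
    val inbox = Channel<Int>(Channel.UNLIMITED) // producer side, never full
    val downstream = Channel<Int>()             // stand-in for the broadcast channel

    // Dedicated coroutine that forwards every queued event downstream.
    launch {
        for (event in inbox) downstream.send(event)
        downstream.close()
    }

    // Producer: a non-suspending send into an unlimited buffer only fails
    // if the channel has been closed.
    for (i in 1..4) inbox.trySend(i)
    inbox.close()

    // Deliberately slow consumer.
    val received = mutableListOf<Int>()
    for (event in downstream) {
        delay(10)
        received += event
    }
    received
}
```

The trade-off is the one mentioned above: nothing is dropped, but the inbox grows without bound if the consumer never catches up.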
s
Thanks for the suggestion. I considered something similar, but I'd have to route events through another thread before they'd be queued onto the proper Dispatcher
s
Yep, with a ConflatedChannel, only the latest event is remembered. Use a rendezvous channel or a channel with an (unlimited) buffer if you want all events to be received.
If you are already in a coroutine (or a suspend func), you can just call
send
without wrapping it in a
launch
.
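To make the conflated versus buffered behaviour concrete, here is a small sketch with plain (non-broadcast) channels. The helper name `sendFourThenDrain` is made up, and it should only be called with `Channel.CONFLATED` or `Channel.UNLIMITED`, because with a rendezvous channel the first `send` would wait forever for a receiver.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Sends 1..4 before anyone receives, then reads whatever is left.
fun sendFourThenDrain(capacity: Int): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity)
    // send never suspends for CONFLATED or UNLIMITED capacity.
    for (i in 1..4) channel.send(i)
    channel.close()

    val received = mutableListOf<Int>()
    for (event in channel) received += event
    received
}
```

With `Channel.CONFLATED` only the last element survives; with `Channel.UNLIMITED` all four are kept in order.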
s
The problem with non-broadcast channel is that it's very weird only being allowed to have a single listener
s
True… For some things, Rx like functionality is much better suited.
a
If you need to handle events before a route, you can always pass info between multiple channels
s
I love the readability coroutines offer; it's in the details that I find I miss Rx's consistency.
a
If what you need is just thread switching, I find channels to be sufficient. Rx was nicer for things like timeouts and debouncing (which you can still do with channels through your own implementations), though I've probably never used Rx at its full potential
s
You could write your own
Channel
subclass, to which other (regular) channels can subscribe. Using
select
on that channel, you could receive incoming events and call
send
on all subscribed channels.
(btw: Debounce/throttle is not that hard to do with a Channel, I wrote one)
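A rough sketch of the fan-out idea, simplified on purpose: instead of a `Channel` subclass with `select`, it is a small hypothetical class (`FanOut`) that reads one source channel and calls `send` on every subscribed channel. Subscribers get unlimited buffers here so that a slow subscriber does not stall the others; that choice is an assumption, not something stated in the thread.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class FanOut<T>(private val source: ReceiveChannel<T>) {
    private val subscribers = mutableListOf<Channel<T>>()

    // In this sketch, subscribe before calling start(); the list is not thread-safe.
    fun subscribe(): ReceiveChannel<T> =
        Channel<T>(Channel.UNLIMITED).also { subscribers += it }

    // Forwards every source event to every subscriber, then closes them.
    fun start(scope: CoroutineScope): Job = scope.launch {
        for (event in source) subscribers.forEach { it.send(event) }
        subscribers.forEach { it.close() }
    }
}

suspend fun <T> readAll(channel: ReceiveChannel<T>): List<T> {
    val out = mutableListOf<T>()
    for (event in channel) out += event
    return out
}

fun fanOutDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val source = Channel<Int>(Channel.UNLIMITED)
    val fanOut = FanOut(source)
    val first = fanOut.subscribe()
    val second = fanOut.subscribe()
    fanOut.start(this)

    for (i in 1..3) source.send(i)
    source.close()

    readAll(first) to readAll(second)
}
```

Both subscribers see every event, which addresses the "single listener" limitation of a regular channel at the cost of one buffer per subscriber.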
a
It's not hard but it's just not there by default. And if you wrote it with
produce
, the API is going to become obsolete as the library changes
s
FYI, I swapped the broadcast channel for a rendezvous channel, and I still lose events with offer. I guess we should just make sure we use send all the time.
I wish
offer
was renamed to something with
unsafe
in the name though.
s
With
offer
you may get into race-conditions, depending on the calling thread (at least it will return
false
if it failed). To ensure delivery, use
send
.
offer
is kinda like
onNext
for Rx. You have to serialize calls to it, otherwise your Subject/Publisher may fail.
a
Offer: to propose or put forward for consideration. It seems to fit the definition well.
s
Only use
offer
if you don’t mind losing an event here and there…. Or monitor the return value of
offer
and implement a retry of sorts.
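One possible shape for such a retry, as a hedged sketch: try the non-suspending path first and fall back to a launched `send` when it fails. The helper `offerOrSend` is hypothetical, and it uses `trySend`, the replacement for `offer` in newer kotlinx.coroutines releases. Note that the fallback does not preserve ordering relative to later successful fast-path calls, and that `send` can still throw if the channel is closed in the meantime.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns true if the fast path worked; otherwise schedules a suspending send
// (unless the channel is already closed, in which case the item is dropped).
fun <T> CoroutineScope.offerOrSend(channel: SendChannel<T>, item: T): Boolean {
    val result = channel.trySend(item)
    if (result.isFailure && !result.isClosed) {
        launch { channel.send(item) }
    }
    return result.isSuccess
}

fun offerOrSendDemo(): Pair<Boolean, List<Int>> = runBlocking {
    val channel = Channel<Int>() // rendezvous, so the fast path fails here
    val fastPath = offerOrSend(channel, 1)
    val received = listOf(channel.receive()) // delivered by the fallback send
    fastPath to received
}
```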
a
Not sure what your code is, but also make sure you open a subscription to the broadcast channel outside of the launch used to receive events. If you open it inside a separate launch, the subscription might be opened after you've sent your first event.
s
I'm running into an issue with not using
ConflatedBroadcastChannel
.
private val broadcastChannel: BroadcastChannel<T> by lazy { ConflatedBroadcastChannel(performGet()) }
If I swap the above to an
ArrayBroadcastChannel
, I have no way to always emit the current value to a new subscriber.
Do you guys know of a way?
a
It may help to read the docs for conflated, array based, and unlimited channels. In most cases, like PublishSubject I believe, subscribing will only yield you responses after your subscription
s
exactly
There is no BehaviorSubject equivalent, and I can't find a way to fake it either.
g
Isn't ConflatedBroadcastChannel equivalent to BehaviorSubject? What kind of issue do you have?
a
I don't think they are exactly equivalent (http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html). I'm assuming BehaviorSubject will emit duplicates. But they both will emit the message before subscription if it exists
g
Will emit duplicates? What do you mean?
BehaviorSubject is like an observable property: it holds 0 or 1 values and always emits the current value to any new subscriber, followed by future updates.
This is exactly what ConflatedBroadcastChannel does
And this is what the marble diagram in the BehaviorSubject documentation shows.
a
Looking at the docs I realize I'm wrong
g
But they both will emit the message before subscription if it exists
They emit nothing if there are no subscribers; they just save the value.
s
@gildor If I call
behaviorSubject.onNext(1)
behaviorSubject.onNext(2)
behaviorSubject.onNext(3)
behaviorSubject.onNext(4)
I'm guaranteed to receive all 4 events to all subscribers. If I call
conflatedBroadcastChannel.send(1)
conflatedBroadcastChannel.send(2)
conflatedBroadcastChannel.send(3)
conflatedBroadcastChannel.send(4)
I'm only guaranteed to receive the 4th event on my subscribers. It's very likely that with the above code, I'll actually only receive the 1st and 4th event, and the 2nd and 3rd event will be lost. In fact, that's what I'm seeing consistently in my code running only on my Android main thread.
s
@spierce7 You are not guaranteed to receive all 4 events sent on your
behaviorSubject
. That largely depends on your observer’s/subscriber’s scheduler (
observeOn(scheduler)
) whether you’ll lose some or not.
g
I’m guaranteed to receive all 4 events to all subscribers.
@spierce7 That’s not true. As Anton said, it also depends on your schedulers and backpressure strategy. There is no way to guarantee that you get all the updates with BehaviorSubject.
s
@gildor
BehaviorSubject
has no backpressure strategy. They don't allow for backpressure. They guarantee all events. I think you are thinking of Publisher, as that is the backpressure enabled form of Subject.
g
@spierce7 BehaviorSubject doesn’t have one, but you can do:
subject.toFlowable(SOME_BACKPRESSURE)
also just
subject.hide()
, which converts it to an Observable: no backpressure, but also no guarantee that you will receive all the events
They guarantee all events
Is there anything in the docs about this? If so, why do you need all the events from a BehaviorSubject? What is the use case for that?
s
@gildor then you are transferring it to a Flowable, which is a backpressure supported item. And yes, when backpressure isn't involved, you always receive all events.
I'm positive.
This is why the documentation warns against using Observables when you have a LOT of events:
You have a flow of no more than 1000 elements at its longest: i.e., you have so few elements over time that there is practically no chance for OOME in your application.
https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#observable-and-flowable
g
Yes, but BehaviorSubject is not even a plain Observable; it is a Subject
s
BehaviorSubject inherits from Observable, I believe. Anyway, whether I'm right or wrong about BehaviorSubject is irrelevant. I need a Channel that doesn't lose events but still emits the current value to a new subscriber.
g
Yes, but it has its own implementation
So you need an ArrayBroadcastChannel, but one that also emits the latest item on subscription? It's probably easier to just implement it yourself.
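For what it's worth, a do-it-yourself version might look something like the sketch below. Everything here is an assumption for illustration: the class name `ReplayLatestBroadcaster`, the use of one unlimited plain `Channel` per subscriber, and JVM `synchronized` for thread safety. It is not part of kotlinx.coroutines, it never removes subscribers, and its buffers grow without bound if a subscriber stops reading.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

class ReplayLatestBroadcaster<T : Any>(initial: T? = null) {
    private val lock = Any()
    private var latest: T? = initial
    private val subscribers = mutableListOf<Channel<T>>()

    // Remembers the value and queues it for every current subscriber.
    fun publish(value: T): Unit = synchronized(lock) {
        latest = value
        // Unlimited buffers: trySend only fails if a channel was closed.
        subscribers.forEach { it.trySend(value) }
    }

    // A new subscriber first gets the current value (if any), then all later ones.
    fun openSubscription(): ReceiveChannel<T> = synchronized(lock) {
        val channel = Channel<T>(Channel.UNLIMITED)
        latest?.let { channel.trySend(it) }
        subscribers += channel
        channel
    }

    fun close(): Unit = synchronized(lock) { subscribers.forEach { it.close() } }
}

suspend fun <T> readAll(channel: ReceiveChannel<T>): List<T> {
    val out = mutableListOf<T>()
    for (event in channel) out += event
    return out
}

fun replayDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val broadcaster = ReplayLatestBroadcaster<Int>(initial = 0)
    val early = broadcaster.openSubscription()
    for (i in 1..4) broadcaster.publish(i)
    val late = broadcaster.openSubscription() // joins after 4 was published
    broadcaster.publish(5)
    broadcaster.close()
    readAll(early) to readAll(late)
}
```

In this sketch the early subscriber sees every value from the initial one onward, while the late subscriber starts from the value that was current when it subscribed.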