https://kotlinlang.org logo
#coroutines
Title
# coroutines
s

spierce7

02/05/2019, 6:14 PM
Is there anything that is equivalent to an RxJava
Subject
with kotlinx.corotuines? There are too many edge cases with
Channel
where I risk losing events, and having to deal with them all the time is exhausting.
a

Allan Wang

02/05/2019, 6:14 PM
The closest thing is probably BroadcastChannel
how are you losing events? I'm not sure if broadcast channels will solve anything as they just allow you to subscribe more than once
s

spierce7

02/05/2019, 6:15 PM
Sorry, when I say
Channel
, I mean
BroadcastChannel
.
I just responded to a comment on github issues where I describe losing events with BroadcastChannels: https://github.com/Kotlin/kotlinx.coroutines/issues/736#issuecomment-460743436
s

streetsofboston

02/05/2019, 6:22 PM
I saw your examples. What if you call
launch { channel.send(item) }
instead of
channel.offer(item)
? Would that remember the last value sent?
s

spierce7

02/05/2019, 6:23 PM
Because someone on my team is going to forget and instead call offer. Is that the best way to ensure all events are received?
s

streetsofboston

02/05/2019, 6:25 PM
Yep,
offer
is not guaranteed to deliver the message (it will return
false
if the message wasn’t delivered)
s

spierce7

02/05/2019, 6:25 PM
is
send
guaranteed to deliver the message on all forms of
BroadcastChannel
?
s

streetsofboston

02/05/2019, 6:25 PM
Yep, but it could suspend until there is room in the channel.
(of course, if the channel is closed, then the message doesn’t get sent)
s

spierce7

02/05/2019, 6:26 PM
right
s

streetsofboston

02/05/2019, 6:27 PM
Also, using
launch
will guarantee that the
send
call happens in the Dispatcher associated with the CoroutineContext of the launch’s scope
s

spierce7

02/05/2019, 6:27 PM
actually, I think I found that for ConflatedBroadcastChannel, even with
send
, we would only receive the most recent event, and would still lose events
j

Joris PZ

02/05/2019, 6:27 PM
I was just reading the comment on the github issues. Would creating a non-broadcast channel with unlimited capacity, and a dedicated co-routine dispatching events on this channel to a broadcastchannel fix your issue? It's a bit hackish, but your producer would never block, and your consumers can be as slow as they need to be
And you wouldn't lose events until an OOM
s

spierce7

02/05/2019, 6:29 PM
Thanks for the suggestion. I considered something similar, but I'd have to route events through another thread before they'd be queued onto the proper Dispatcher
s

streetsofboston

02/05/2019, 6:29 PM
Yep, with a ConflatedChannel, only the latest event is remembered. Use a Rendevous or a channel with a (unlimited) buffer if you want all events to be received
If you are already in a coroutine (or a suspend func), you can just call
send
without wrapping it in a
launch
.
s

spierce7

02/05/2019, 6:31 PM
The problem with non-broadcast channel is that it's very weird only being allowed to have a single listener
s

streetsofboston

02/05/2019, 6:32 PM
True… For some things, Rx like functionality is much better suited.
a

Allan Wang

02/05/2019, 6:32 PM
If you need to handle events before a route, you can always pass info between multiple channels
s

spierce7

02/05/2019, 6:32 PM
I love the readability coroutines offer, it's in the details though that I find that I miss Rx's consistency.
a

Allan Wang

02/05/2019, 6:33 PM
If what you need is just thread switching, I find channels to be sufficient. Rx was nicer for things like timeouts and debouncing (which you can still do with channels through your own implementations), though I've probably never used Rx at its full potential
s

streetsofboston

02/05/2019, 6:34 PM
You could write your own
Channel
subclass, to which other (regular) channels can subscribe. Using
select
on that channel, you could receive incoming events and call
send
on all subscribed channels.
(btw: Debounce/throttle is not that hard to do with a Channel, I wrote one)
a

Allan Wang

02/05/2019, 6:35 PM
It's not hard but it's just not there by default. And if you wrote it with
produce
, the api is going to be obsolete with changes
s

spierce7

02/05/2019, 6:36 PM
FYI, I swapped broadcast channel to rondezvous channel, and I still lose events with offer. I guess we should just ensure we use send all the time.
1
I wish
offer
was renamed to something with
unsafe
in the name though.
s

streetsofboston

02/05/2019, 6:37 PM
With
offer
you may get into race-conditions, depending on the calling thread (at least it will return
false
if it failed). To ensure, use
send
.
☝️ 1
offer
is kinda like
onNext
for Rx. You have to serialize calls to it, otherwise your Subject/Publisher may fail.
a

Allan Wang

02/05/2019, 6:38 PM
Offer: to propose or put forward for consideration It seems to fit the definition well.
☝️ 2
s

streetsofboston

02/05/2019, 6:40 PM
Only use
offer
if you don’t mind losing an event here and there…. Or monitor the return value of
offer
and implement a sort-of-a retry.
a

Allan Wang

02/05/2019, 6:41 PM
Not sure what your code is, but also make sure you open a subscription to the broadcast channel outside of the launch used to receive events. If you open in a separate launch it might occur after you've sent your first event
s

spierce7

02/05/2019, 6:59 PM
I'm running into an issue with not using
ConflatedBroadcastChannel
.
private val broadcastChannel: BroadcastChannel<T> by lazy { ConflatedBroadcastChannel(performGet()) }
If I swap the above to an
ArrayBroadcastChannel
, I have no way to always emit the current value to a new subscriber.
Do you guys know of a way?
a

Allan Wang

02/05/2019, 7:01 PM
It may help to read the docs for conflated, array based, and unlimited channels. In most cases, like PublishSubject I believe, subscribing will only yield you responses after your subscription
s

spierce7

02/05/2019, 7:09 PM
exactly
There is no BehaviorSubject equivalent, and I don't see a way to fake it that I can find either
g

gildor

02/06/2019, 12:58 AM
Isn't ConflatedBroadcastChannel is equivalent to BehaviorSubject? What kind issue do you have?
a

Allan Wang

02/06/2019, 1:38 AM
I don't think they are exactly equivalent http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html I'm assuming behaviour subject will emit duplicates. But they both will emit the message before subscription if it exists
g

gildor

02/06/2019, 2:00 AM
Will emit duplicates? What do you mean?
Behavior Subject is like observable property, has 0 or 1 value and always emit current value to any new subscriber and future updates
This is exactly what ConflatedBroadcastChannel does
And this is what marble diagram shows in Behavior Subject documentation
a

Allan Wang

02/06/2019, 2:03 AM
Looking at the docs I realize I'm wrong
g

gildor

02/06/2019, 2:29 AM
But they both will emit the message before subscription if it exists
They emiy nothing if there are no subscribers, just save value
s

spierce7

02/06/2019, 9:10 PM
@gildor If I call
Copy code
behaviorSubject.onNext(1)
behaviorSubject.onNext(2)
behaviorSubject.onNext(3)
behaviorSubject.onNext(4)
I'm guaranteed to receive all 4 events to all subscribers. If I call
Copy code
conflatedBroadcastChannel.send(1)
conflatedBroadcastChannel.send(2)
conflatedBroadcastChannel.send(3)
conflatedBroadcastChannel.send(4)
I'm only guaranteed to receive the 4th event on my subscribers. It's very likely that with the above code, I'll actually only receive the 1st and 4th event, and the 2nd and 3rd event will be lost. In fact, that's what I'm seeing consistently in my code running only on my Android main thread.
s

streetsofboston

02/06/2019, 9:13 PM
@spierce7 You are not guaranteed to receive all 4 events sent on your
behaviorSubject
. That largely depends on your observer’s/subscriber’s scheduler (
observeOn(scheduler)
) whether you’ll loose some or not.
g

gildor

02/07/2019, 6:17 AM
I’m guaranteed to receive all 4 events to all subscribers.
@spierce7 It’s not true, exactly what Anton said, it depends also on your schedulers and back pressure strategy. there is no way to guarantee that with BehaviorSubject you get all the updates
s

spierce7

02/07/2019, 7:09 AM
@gildor
BehaviorSubject
has no backpressure strategy. They don't allow for backpressure. They guarantee all events. I think you are thinking of Publisher, as that is the backpressure enabled form of Subject.
g

gildor

02/07/2019, 7:09 AM
@spierce7 BehaviorSubject doesn’t have, but you can do:
subject.toFlowable(SOME_BACKPRESSURE)
also just
subject.hide()
, which convert to observable, no backpressure, but also no warranty that you will receive all the events
They guarantee all events
Is there anything in docs about this? If so, why do you need all the events for BehaviorSubject, what is use case of that?
s

spierce7

02/07/2019, 7:13 AM
@gildor then you are transferring it to a Flowable, which is a backpressure supported item. And yes, when backpressure isn't involved, you always receive all events.
I'm positive.
This is why the documentation warns against using Observables when you have a LOT of events:
Copy code
You have a flow of no more than 1000 elements at its longest: i.e., you have so few elements over time that there is practically no chance for OOME in your application.
https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#observable-and-flowable
g

gildor

02/07/2019, 7:17 AM
Yes, but BehaviorSubject is not even Observable, this is Subject
s

spierce7

02/07/2019, 8:36 AM
BehaviorSubject inherits from Observable I believe. Anyways, I'm right or wrong about BehaviorSubject is irrelevant. I need a Channel that doesn't lose events but still emits the current value to a subscriber
g

gildor

02/07/2019, 8:42 AM
Yes, but it has own implementation
So you need ArrayBroadcastSubject, but which emits also latest item on subscription? Probably easier just implement it yourself
3 Views