# rx
g
Hi guys! We have this weird scenario where we have to subscribe to one stream (let’s call it “notification stream”), then - before any item appears - trigger another completable, which immediately returns empty response but - as a side effect - starts emitting on the “notification” stream. And we simply want to return this “notification” stream from our function... Is there a nice RX way of doing this?
k
I'm not exactly sure I understand the request, but I'll try my best; correct me if this isn't what you expect! Here the side effect will emit events downstream:
Observable.merge(
   sideEffectObservable,
   notification
 )
Or if you want the SideEffect to trigger and finish before the Notifications:
notifications.startWith(sideEffectObservable)
g
Hi. I probably didn’t describe it correctly 🙂 I’ll try again…
We need something like this:
1. Subscribe to notifications
2. Send START completable (which activates notifications)
3. Return notifications stream
So, in other words, we only need the notification stream, but we need to send this START command/completable to activate it.
k
`startWith` helps you with that: `sideEffectObservable` will be executed first, triggering the notifications, and then you continue listening to the notification events.
a
I was thinking
notifications.mergeWith(trigger)
for example
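// Assumed dependencies: RxJava 2.2+, RxKotlin 2, RxRelay 2
import com.jakewharton.rxrelay2.PublishRelay
import io.reactivex.Completable
import io.reactivex.rxkotlin.subscribeBy
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

fun main() {
    // Hot stream: items are only seen by observers subscribed at emission time
    val notifications = PublishRelay.create<Int>()

    // Side effect: starts a background thread that pushes items into the relay
    val trigger = Completable.fromAction {
        thread {
            for (i in 1..10) {
                notifications.accept(i)
                Thread.sleep(1000)
            }
        }
    }

    val latch = CountDownLatch(1)

    // mergeWith subscribes to notifications first, then runs the trigger
    notifications.mergeWith(trigger)
            .take(5)
            .subscribeBy(
                    onNext = { println(it) },
                    onComplete = { latch.countDown() }
            )

    // Block until five items have been printed
    latch.await()
}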
g
Yeah, we’ve experimented with merge, but… The thing with `startWith` and `merge` is that both streams have to be of the same type, which is not the case here. Also, even if they were, we don’t want the emission of START being present in the returned stream.
a
`mergeWith` takes a `Completable`, as seen above.
g
Right, sorry for that confusion... I got used to Rx2.0 naming. But in this case it’s actually an Observable that emits a single item. It’s a really bad API in one of our libraries 😕
a
So `notifications.mergeWith(trigger.ignoreElements())` then?
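For reference, a minimal sketch of how the `mergeWith(trigger.ignoreElements())` idea could look as a function. It assumes RxJava 2.2+ (where `Observable.mergeWith` accepts a `Completable`) and uses a `PublishSubject` in place of the relay to avoid an extra dependency. The names `startCommand`, `activatedNotifications` and `collectDemo` are hypothetical, and the START stream is a stand-in for the real library call, which is not shown in this thread.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Hypothetical stand-in for the library call: an Observable that emits a single
// item and, as a side effect, makes the hot notification stream start emitting.
fun startCommand(notifications: PublishSubject<Int>): Observable<String> =
    Observable.fromCallable {
        // Side effect; synchronous here only to keep the demo deterministic
        (1..3).forEach { notifications.onNext(it) }
        "STARTED" // the single item the caller does not care about
    }

// mergeWith subscribes to notifications first, then to the START stream.
// ignoreElements() drops the START item and keeps only its completion.
fun activatedNotifications(
    notifications: Observable<Int>,
    start: Observable<String>
): Observable<Int> = notifications.mergeWith(start.ignoreElements())

// Collects whatever the merged stream delivers during subscription.
fun collectDemo(): List<Int> {
    val notifications = PublishSubject.create<Int>()
    val received = mutableListOf<Int>()
    activatedNotifications(notifications, startCommand(notifications))
        .subscribe { received.add(it) }
    return received
}

fun main() {
    println(collectDemo())
}
```

Because the notification stream is subscribed before the START stream runs, the items emitted by the side effect should not be lost, and the START item itself never appears in the returned stream.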
k
You can silence any stream emission with a
flatMap { Observable.empty<MyType>() }
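A small sketch of why the `flatMap` variant can be handy: mapping every item to an empty `Observable` of the target type also changes the stream's element type, so it can be passed to `Observable.merge` together with the notifications. This assumes RxJava 2; the `String` and `Int` element types and the name `mergedDemo` are made up for illustration.

```kotlin
import io.reactivex.Observable

fun mergedDemo(): List<Int> {
    // Hypothetical streams: START emits one String, notifications carry Int
    val start: Observable<String> = Observable.just("STARTED")
    val notifications: Observable<Int> = Observable.just(1, 2, 3)

    // flatMap replaces each START item with an empty Observable<Int>,
    // so the silenced stream has the same type as notifications
    val silencedStart: Observable<Int> = start.flatMap { Observable.empty<Int>() }

    // Only notification items reach the merged stream
    return Observable.merge(notifications, silencedStart)
        .toList()
        .blockingGet()
}

fun main() {
    println(mergedDemo())
}
```

Compared with `ignoreElements()`, which returns a `Completable`, this keeps everything as an `Observable`, which may matter on versions where `mergeWith` does not accept a `Completable`.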
g
Hmm, interesting. Seems a bit hacky to deliberately silence a stream, but ok :) Thanks guys! I appreciate your help.
a
I thought you’re not interested in that element, and just need to subscribe to that stream? In that case I’m not sure what else is there, other than ignoring the elements. What should be done with the elements for it not to seem hacky? 🤔