
gregd

12/11/2019, 7:55 AM
Hi guys! We have this weird scenario where we have to subscribe to one stream (let’s call it the “notification stream”), then, before any item appears, trigger another completable, which immediately returns an empty response but, as a side effect, starts emissions on the “notification” stream. We simply want to return this “notification” stream from our function. Is there a nice Rx way of doing this?

kioba

12/11/2019, 8:14 AM
I’m not exactly sure I understand the request, but I’ll do my best; correct me if this isn’t what you expect! Here the side effect will emit events downstream:
Observable.merge(
    sideEffectObservable,
    notifications
)
Or if you want the SideEffect to trigger and finish before the Notifications:
notifications.startWith(sideEffectObservable)

gregd

12/11/2019, 8:20 AM
Hi. I probably didn’t describe it correctly 🙂 I’ll try again…
We need something like this:
1. Subscribe to notifications
2. Send START completable (which activates notifications)
3. Return notifications stream
So, in other words, we only need the notification stream, but we need to send this START command/completable to activate it.

kioba

12/11/2019, 8:26 AM
startWith helps you with that: sideEffectObservable will be executed first, triggering the notifications, and then the stream continues listening to the notification events.

arekolek

12/11/2019, 8:29 AM
I was thinking notifications.mergeWith(trigger), for example:
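// Assumes RxJava 2, RxKotlin 2 and RxRelay 2 are on the classpath.
import com.jakewharton.rxrelay2.PublishRelay
import io.reactivex.Completable
import io.reactivex.rxkotlin.subscribeBy
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

fun main() {
    val notifications = PublishRelay.create<Int>()

    // Subscribing to the trigger starts a background thread that feeds the relay.
    val trigger = Completable.fromAction {
        thread {
            for (i in 1..10) {
                notifications.accept(i)
                Thread.sleep(1000)
            }
        }
    }

    val latch = CountDownLatch(1)

    // mergeWith subscribes to notifications first, then to the trigger.
    notifications.mergeWith(trigger)
        .take(5)
        .subscribeBy(
            onNext = { println(it) },
            onComplete = { latch.countDown() }
        )

    // Block until the first five notifications have been printed.
    latch.await()
}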

gregd

12/11/2019, 8:39 AM
Yeah, we’ve experimented with merge, but… The thing with startWith and merge is that both streams have to be of the same type, which is not the case here. Also, even if they were, we don’t want the emission of START to be present in the returned stream.

arekolek

12/11/2019, 8:44 AM
mergeWith takes a Completable, as seen above.
g

gregd

12/11/2019, 8:48 AM
Right, sorry for that confusion... I got used to Rx2.0 naming. But in this case it’s actually an Observable that emits a single item. It’s a really bad API in one of our libraries 😕

arekolek

12/11/2019, 8:52 AM
So notifications.mergeWith(trigger.ignoreElements()) then?
👍 1
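
For reference, the three steps discussed above (subscribe to notifications, send START, return only the notifications) can be sketched roughly as follows. This is a minimal sketch assuming RxJava 2; FakeDevice, start() and notificationsAfterStart are hypothetical names standing in for the real library, which is not shown in this thread.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Hypothetical stand-in for the library: a hot notification stream plus a
// START call that emits a single item and, as a side effect, starts the feed.
class FakeDevice {
    private val subject = PublishSubject.create<Int>()
    val notifications: Observable<Int> = subject.hide()

    fun start(): Observable<String> = Observable.fromCallable {
        // Side effect: notifications are pushed once START is subscribed.
        (1..3).forEach { subject.onNext(it) }
        "started"
    }
}

// Subscribes to notifications first, then to START, and drops START's item.
fun notificationsAfterStart(device: FakeDevice): Observable<Int> =
    device.notifications.mergeWith(device.start().ignoreElements())

fun main() {
    val received = mutableListOf<Int>()
    val disposable = notificationsAfterStart(FakeDevice()).subscribe { received += it }
    println(received) // prints [1, 2, 3]
    disposable.dispose()
}
```

Because mergeWith subscribes to the source before the other stream, the notifications pushed during START are not missed, and the "started" item never reaches the caller.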

kioba

12/11/2019, 8:52 AM
You can silence any stream’s emissions with flatMap { Observable.empty<MyType>() }
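
A small sketch comparing the two ways of silencing a stream mentioned here, assuming RxJava 2. The function names and the String/Int types are made up for illustration; the START observable is simulated with Observable.just.

```kotlin
import io.reactivex.Observable

// Replaces every item with an empty stream, so nothing is forwarded.
fun silencedViaFlatMap(start: Observable<String>): Observable<Int> =
    start.flatMap { Observable.empty<Int>() }

// Drops all items via ignoreElements, then converts back to an Observable.
fun silencedViaIgnore(start: Observable<String>): Observable<Int> =
    start.ignoreElements().toObservable<Int>()

fun main() {
    // Hypothetical START call: an Observable that emits exactly one item.
    val start = Observable.just("started")

    println(silencedViaFlatMap(start).toList().blockingGet()) // prints []
    println(silencedViaIgnore(start).toList().blockingGet())  // prints []
}
```

Both variants still subscribe to the source and still propagate its completion and errors; ignoreElements arguably states the intent more directly.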

gregd

12/11/2019, 9:04 AM
Hmm, interesting. Seems a bit hacky to deliberately silence a stream, but ok :) Thanks guys! I appreciate your help.

arekolek

12/11/2019, 9:18 AM
I thought you weren’t interested in that element and just needed to subscribe to that stream? In that case I’m not sure what else there is, other than ignoring the elements. What should be done with the elements for it not to seem hacky? 🤔