gregd
12/11/2019, 7:55 AMkioba
12/11/2019, 8:14 AMObservable.merge(
sideEffectObservable,
notification
)
Or if you want the SideEffect to trigger and finish before the Notifications:
notifications.startWith(sideEffectObservable)
gregd
12/11/2019, 8:20 AMkioba
12/11/2019, 8:26 AMstartWith
helps you with that, sideEffectObservable
will be executed first triggering the notification and than continue listening to the notification events.arekolek
12/11/2019, 8:29 AMnotifications.mergeWith(trigger)
val notifications = PublishRelay.create<Int>()
val trigger = Completable.fromAction {
thread {
for (i in 1..10) {
notifications.accept(i)
Thread.sleep(1000)
}
}
}
val latch = CountDownLatch(1)
notifications.mergeWith(trigger)
.take(5)
.subscribeBy(
onNext = { println(it) },
onComplete = { latch.countDown() }
)
latch.await()
gregd
12/11/2019, 8:39 AMstartWith
and merge
is that both streams have to be of the same type, which is not the case here.
Also, even if they were, we don’t want the emission of START being present in the returned stream,arekolek
12/11/2019, 8:44 AMmergeWith
takes a Completable
gregd
12/11/2019, 8:48 AMarekolek
12/11/2019, 8:52 AMnotifications.mergeWith(trigger.ignoreElements())
then?kioba
12/11/2019, 8:52 AMflatmap{ Observable.empty<MyType>() }
gregd
12/11/2019, 9:04 AMarekolek
12/11/2019, 9:18 AM