# reaktive
b
Hi! I have a problem. I have an Observable that observes changes in the database:
override fun observeAll(): Observable<List<RosterPresenceItem>> = queries.query(RosterDatabaseQueries::loadAll)
		.observe {
			it.executeAsList().map { e -> … }
		}
Sometimes I need to push new content from the database to all subscribers, whenever something is published to
val updateSubject = PublishSubject<Unit>()
I thought that repeatWhen() should be used for that, but I don't know how to use it. Any hints?
a
Is it correct that you want to requery the database when updateSubject emits an item, in addition to the already observed database query?
b
Yes! This is exactly what I need!
Maybe this is a design flaw on my part, but the component that displays and manages the roster also needs to show presence information, so in my case the database query pulls data from several tables (including one managed by a different component).
a
Got it. Something like this should work, I guess, off the top of my head:
override fun observeAll(): Observable<List<RosterPresenceItem>> = 
    merge(
        queries
            .query(RosterDatabaseQueries::loadAll)
            .observe {
                it.executeAsList().map { e -> … }
            },
        updateSubject.flatMapSingle {
            queries
                .query(RosterDatabaseQueries::loadAll)
                .single {
                    it.executeAsList().map { e -> … }
                }
        }
    )
b
Thanks. I'll try it. A moment ago I wrote this function (powtorzGdy is Polish for "repeat when"):
fun <T> Observable<T>.powtorzGdy(observable: Observable<*>): Observable<T> = observable { emitter ->
	var listener: Disposable? = this@powtorzGdy.subscribe(
		onNext = emitter::onNext, onError = emitter::onError, onComplete = emitter::onComplete
	)

	observable.subscribe {
		listener?.dispose()
		listener = this@powtorzGdy.subscribe(
			onNext = emitter::onNext, onError = emitter::onError, onComplete = emitter::onComplete
		)
	}
}
and it works, but I have no idea if it is done correctly 🙂
a
Great idea! You can actually use BehaviorSubject + switchMap:
private val updateSubject = BehaviorSubject(Unit)

override fun observeAll(): Observable<List<RosterPresenceItem>> =
    updateSubject.switchMap {
        queries
            .query(RosterDatabaseQueries::loadAll)
            .observe {
                it.executeAsList().map { e -> … }
            }
    }
Here switchMap replaces your powtorzGdy function.
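To see the behaviour in isolation, here is a rough, self-contained sketch. FakeRoster, its in-memory rows list, and runDemo are made up for illustration, and observableFromFunction only stands in for the real queries.query(...).observe { … } call (it emits once instead of staying live). It assumes Reaktive is on the classpath. The point is that BehaviorSubject(Unit) replays its current value to every new subscriber, so the first query runs immediately, and each later onNext(Unit) makes switchMap drop the previous inner subscription and start a fresh one.

```kotlin
import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.observableFromFunction
import com.badoo.reaktive.observable.subscribe
import com.badoo.reaktive.observable.switchMap
import com.badoo.reaktive.subject.behavior.BehaviorSubject

// Hypothetical stand-in for the roster component; not part of the original code.
class FakeRoster {
    // Pretend database table.
    private val rows = mutableListOf("alice")

    // Holds an initial Unit, so every new subscriber triggers a first load.
    private val updateSubject = BehaviorSubject(Unit)

    // Assumption: a one-shot load replaces the real observed database query.
    private fun loadAll(): Observable<List<String>> =
        observableFromFunction { rows.toList() }

    // Each emission of updateSubject cancels the previous load and starts a new one.
    fun observeAll(): Observable<List<String>> =
        updateSubject.switchMap { loadAll() }

    // Simulates a change made by another component, followed by a manual refresh.
    fun addAndRefresh(name: String) {
        rows += name
        updateSubject.onNext(Unit)
    }
}

// Subscribes, forces one refresh, and returns every list the subscriber received.
fun runDemo(): List<List<String>> {
    val roster = FakeRoster()
    val seen = mutableListOf<List<String>>()
    val disposable = roster.observeAll().subscribe(onNext = { seen += it })
    roster.addAndRefresh("bob")
    disposable.dispose()
    return seen
}

fun main() {
    println(runDemo())
}
```

A PublishSubject would not work here on its own: it has no initial value, so switchMap would not run the first query until something was published.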
b
Thanks!