Hildebrandt Tobias
04/11/2024, 10:53 AM
PublishSubject (older):
/**
 * Singleton hub for repository change notifications.
 *
 * Every [RepoUpdate] pushed into [trigger] is offered to all subscribers; [getRepoObserver]
 * narrows that shared stream down to a single payload type.
 */
object DatabaseObserver {
    val logger = LoggerFactory.getLogger(this.javaClass.simpleName)

    /** Shared subject carrying every repository update, whatever its payload type. */
    val trigger = PublishSubject.create<RepoUpdate<*>>()

    /**
     * Returns the slice of [trigger] whose payloads are of type [T].
     *
     * The stream is derived from [trigger] with [BackpressureStrategy.LATEST] and then:
     * 1. keeps only events whose `old` and `new` values are each either `null` or a [T]
     *    (an event with both values `null` therefore matches every [T]);
     * 2. drops events of type [UpdateType.UPDATE] whose `old` and `new` are equal, while
     *    letting every other update type through untouched;
     * 3. suppresses consecutive equal events via `distinctUntilChanged()`
     *    (NOTE(review): relies on [RepoUpdate] equality, which is defined elsewhere);
     * 4. logs any upstream error through [logger]; the error is still propagated downstream.
     *
     * @param T payload type the caller is interested in.
     * @return a [Flowable] of updates for [T].
     */
    @Suppress("UNCHECKED_CAST")
    inline fun <reified T : Any> getRepoObserver(): Flowable<RepoUpdate<T>> =
        trigger
            .toFlowable(BackpressureStrategy.LATEST)
            .filter { update ->
                // Ensure both sides are of the requested type (a missing side is acceptable).
                (update.new is T || update.new == null) &&
                    (update.old is T || update.old == null)
            }
            .filter { update ->
                // We don't need updates without changes. Only UPDATE events are subject to
                // this check; the previous `type == UPDATE && old != new` condition also
                // discarded every event of any other update type.
                update.type != UpdateType.UPDATE || update.old != update.new
            }
            // Safe after the type filter above: both payloads are T or null.
            .map { it as RepoUpdate<T> }
            .distinctUntilChanged()
            .doOnError { logger.error("RepoUpdate Error:", it) }
}
Multiple (newer):
/**
 * Registry of per-type MQTT update streams.
 *
 * Each payload type gets its own [PublishSubject], keyed by the reified [KType] and created
 * on first use by either [publish] or [updater].
 */
object MqttParity {
    val logger = KotlinLogging.logger { }

    /**
     * Subjects keyed by payload type; entries are added lazily.
     *
     * NOTE(review): this is a plain mutable map with no synchronization visible here —
     * confirm that [publish] and [updater] are not invoked concurrently.
     */
    val triggerMap = mutableMapOf<KType, PublishSubject<MqttUpdate<*>>>()

    /**
     * Emits [update] on the subject registered for [T], creating that subject if it does
     * not exist yet.
     *
     * @param update the update to hand to subscribers of [T].
     */
    inline fun <reified T : Any> publish(update: MqttUpdate<T>) {
        val subject = triggerMap.getOrPut(typeOf<T>()) { PublishSubject.create() }
        subject.onNext(update)
    }

    /**
     * Returns the update stream for [T], creating the backing subject if it does not exist yet.
     *
     * The stream uses [BackpressureStrategy.LATEST], drops [MqttUpdate.Update] events whose
     * `old` and `new` values are equal (all other events pass), suppresses consecutive equal
     * events, and logs upstream errors through [logger] without swallowing them.
     *
     * @param T payload type the caller is interested in.
     * @return a [Flowable] of updates for [T].
     */
    @Suppress("UNCHECKED_CAST")
    inline fun <reified T : Any> updater(): Flowable<MqttUpdate<T>> {
        val subject = triggerMap.getOrPut(typeOf<T>()) { PublishSubject.create() }
        return subject
            .toFlowable(BackpressureStrategy.LATEST)
            // We don't need updates without changes; anything that is not an Update passes.
            .filter { update -> update !is MqttUpdate.Update || update.old != update.new }
            // The subject is looked up by typeOf<T>(), so its events are treated as MqttUpdate<T>.
            .map { update -> update as MqttUpdate<T> }
            .distinctUntilChanged()
            .doOnError { cause -> logger.error("MqttUpdate Error on Trigger ${typeOf<T>()}:", cause) }
    }
}