Is there a significant difference in these two imp...
# getting-started
h
Is there a significant difference in these two implementations in terms of performance and also is this even a good way of doing it? The purpose is that the service layer just signals the change of the specified type and elsewhere the it's implemented how the change is handled for each different type, because some changes have many cascading MQTT changes and some have very little. Single
PublishSubject
(older):
Copy code
object DatabaseObserver {

    val logger = LoggerFactory.getLogger(this.javaClass.simpleName)
    val trigger = PublishSubject.create<RepoUpdate<*>>()

    @Suppress("UNCHECKED_CAST")
    inline fun <reified T : Any> getRepoObserver(): Flowable<RepoUpdate<T>> =
        trigger
            .toFlowable(BackpressureStrategy.LATEST)
            .filter {
                // Ensure type is of requested type
                (it.new is T || it.new == null) &&
                (it.old is T || it.old == null)
            }
            .filter {
                // We don't need updates without changes.
                (it.type == UpdateType.UPDATE) &&
                (it.old != it.new)
            }
            .map { it as RepoUpdate<T> }
            .distinctUntilChanged()
            .doOnError { logger.error("RepoUpdate Error:", it) }
}
Multiple(newer):
Copy code
object MqttParity {

    val logger = KotlinLogging.logger { }
    val triggerMap = mutableMapOf<KType, PublishSubject<MqttUpdate<*>>>()

    inline fun <reified T : Any>publish(update: MqttUpdate<T>) {
        triggerMap
            .getOrPut(typeOf<T>()) { PublishSubject.create() }
            .onNext(update)
    }

    @Suppress("UNCHECKED_CAST")
    inline fun <reified T : Any> updater(): Flowable<MqttUpdate<T>> {
        return triggerMap
            .getOrPut(typeOf<T>()) { PublishSubject.create() }
            .toFlowable(BackpressureStrategy.LATEST)
            .filter {
                // We don't need updates without changes.
                when(it) {
                    is MqttUpdate.Update -> it.old != it.new
                    else -> true
                }
            }
            .map { it as MqttUpdate<T> }
            .distinctUntilChanged()
            .doOnError { logger.error("MqttUpdate Error on Trigger ${typeOf<T>()}:", it) }
    }
}