```class BatteryLevelObservable(...) : ObservableO...
# rx
m
Copy code
class BatteryLevelObservable(...) : ObservableOnSubscribe<...> {
    override fun subscribe(emitter: ObservableEmitter...) {
        val listener = object : ProtocolCallbackSimple() {
            override fun onDeviceInfo(p0: BasicInfos) {
                if (!emitter.isDisposed) {
                    emitter.onNext(...)
                    emitter.onComplete()
                }
            }
        }

        emitter.setDisposable(Disposables.fromAction {
            ...
        })

        try {
            ... // trigger callback
        } catch (e: Exception) {
            if (!emitter.isDisposed) {
                emitter.onError(e)
            } else {
                RxJavaPlugins.onError(e)
            }
        }
    }
}