https://kotlinlang.org logo
#rx
Title
# rx
j

Joe

07/25/2019, 10:02 PM
I currently have a slow, synchronous, io-bound method that returns a
Publisher<T>
, and am doing
buildPublisher().subscribe(subscriber)
from an "outside of rx" thread/context. To free up that thread, I'd like to move the
buildPublisher()
call "into rx". Currently have the following, but not sure if there's a clearer, less error prone, or otherwise better way:
Copy code
Flowable.create<T>({ emitter -> 
  try {
    buildPublisher().subscribe(
        object : Subscriber<T> {
            override fun onComplete() {emitter.onComplete() }
            override fun onSubscribe(s: Subscription) { s.request(Long.MAX_VALUE) }
            override fun onNext(t: T) { emitter.onNext(t) }
            override fun onError(t: Throwable) { emitter.onError(t) }
        }
  } catch (e: Exception) { emitter.onError(e) }
}, BackpressureStrategy.BUFFER)
    .subscribeOn(<http://Schedulers.io|Schedulers.io>())
    .subscribe(subscriber)
s

Sergey Chelombitko

07/26/2019, 9:34 AM
Flowable.fromPublisher(buildPublisher())
j

Joe

07/26/2019, 2:09 PM
that's what I'm starting from, but then buildPublisher() gets called from the entry thread. came up with this instead:
Copy code
Single.fromCallable(::getPublisher)
    .flatMapPublisher { it }
    .subscribeOn(<http://Schedulers.io|Schedulers.io>())
    .subscribe(subscriber)
s

Sergey Chelombitko

07/26/2019, 3:00 PM
You can wrap that in defer to switch thread where buildPublisher would get called:
Flowable.defer { Flowable.fromPublisher(buildPublisher()) }
j

Joe

07/26/2019, 3:32 PM
cool, thanks! is there a material difference between the
defer
and the
flatMap
implementations?
hmm, one difference is that in the
flatMap
version, if the publisher returned has a
doOnFinally()
registered, and the subscription gets cancelled before the publisher returns, that never gets called. switching over to the
defer
for that reason, thanks for the suggestion!