Maybe I shouldn't even be using PublishSubject her...
# rx
g
Maybe I shouldn't even be using PublishSubject here, not sure
g
Copy code
what are you trying to do? Can't you do Single.fromCallable {  <code> }.subscribeOn(Schedulers.computation())?

(Sorry for the markdown)
g
It's basically a volume indicator. I have a very fast stream of raw audio data in the form of bytes, I do a CPU intensive calculation to turn those bytes into a Double of the volume level, and then I adjust the UI accordingly
I have this one callback
renderSample()
which is providing me with raw audio data 1000s of times per second
If your suggestion is to put
Single.fromCallable {  <code> }.subscribeOn(Schedulers.computation())
in the callback, that won't work AFAIK since I need to define the observable outside of the callback and then only pass data through the observable in the callback
g
check for something like: Flowable.range(0, N) .map { renderSample() } .subscribeOn(Schedulers.computation())
Sounds like you might need some back pressure handling
g
I can't call renderSample() on my own, it's a callback from a third party library. I do turn it into a flowable later on for the backpressure handling
All I can do is define what is to be done when the library sends samples through renderSample()
g
I'm confused then. You can enqueue things to a subject. Then whatever subscribes, that's where you subscribeOn(computation).
g
If subscribingOn(computation) downstream would affect which thread the computation is done here then that would work
s
In the current form of the code, there’s no use of observe/subscribeOn that will make
calculateAudioVolume
run on a different thread than the one
renderSample
is being called on. What you can do is have a subject of ByteBuffer instead of Double then you can handle
audioSample
on another thread.
g
Ahhhhhhhh thank you so much. That feels like a step in the right direction. And then I can do something like
Copy code
val observable = publishSubject.hide().observeOn(Schedulers.computation())
                .flatMap { Observable.just(AudioProcessingHelper.calculateAudioVolume(it)) }
s
yup. I would get rid of
hide()
, since the other operators will do that for you anyway. And anytime you see
.flatMap { Observable.just(foo) }
, you can use
.map { foo }
instead. Also now you’ll have to pay attention to whether or not you can actually still trust the content of ByteBuffer to be usable after renderSample has completed, or does the SDK recycle it, making it not-thread-safe.
g
Brilliant, thank you thank you thank you