#rx

Gabe Kauffman

11/18/2020, 11:45 PM
Maybe I shouldn't even be using PublishSubject here, not sure

Guillermo Alcantara

11/18/2020, 11:47 PM
what are you trying to do? Can't you do Single.fromCallable {  <code> }.subscribeOn(Schedulers.computation())?

(Sorry for the markdown)

Gabe Kauffman

11/18/2020, 11:48 PM
It's basically a volume indicator. I have a very fast stream of raw audio data in the form of bytes, I do a CPU intensive calculation to turn those bytes into a Double of the volume level, and then I adjust the UI accordingly
I have this one callback, renderSample(), which is providing me with raw audio data 1000s of times per second.
If your suggestion is to put Single.fromCallable {  <code> }.subscribeOn(Schedulers.computation()) in the callback, that won't work AFAIK, since I need to define the observable outside of the callback and then only pass data through the observable in the callback.

Guillermo Alcantara

11/18/2020, 11:54 PM
Check for something like: Flowable.range(0, N).map { renderSample() }.subscribeOn(Schedulers.computation())
Sounds like you might need some back pressure handling

Gabe Kauffman

11/18/2020, 11:55 PM
I can't call renderSample() on my own, it's a callback from a third party library. I do turn it into a flowable later on for the backpressure handling
All I can do is define what is to be done when the library sends samples through renderSample()
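For readers following along, here is a rough sketch of the "define the stream outside, only push data inside the callback" setup described above. It assumes RxJava 3 package names. `SampleBridge` and `sampleStream` are made-up names for illustration, and `BackpressureStrategy.LATEST` is just one plausible choice for a volume meter, not something confirmed in the thread.

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.subjects.PublishSubject
import java.nio.ByteBuffer

// Hypothetical holder for the callback; the real library interface is not shown in the thread.
class SampleBridge {
    // The subject is created once, outside the callback.
    // toSerialized() is a precaution in case the library calls back from more than one thread.
    private val samples = PublishSubject.create<ByteBuffer>().toSerialized()

    // Invoked by the third-party library; it only forwards the data into the stream.
    fun renderSample(audioSample: ByteBuffer) {
        samples.onNext(audioSample)
    }

    // Assumption: when the consumer falls behind, only the newest sample matters,
    // so LATEST drops older pending samples instead of buffering them.
    fun sampleStream(): Flowable<ByteBuffer> =
        samples.toFlowable(BackpressureStrategy.LATEST)
}
```

Subscribers attach to `sampleStream()` once; the callback itself never creates or subscribes to anything.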

Guillermo Alcantara

11/18/2020, 11:57 PM
I'm confused then. You can enqueue things to a subject. Then whatever subscribes, that's where you subscribeOn(computation).

Gabe Kauffman

11/18/2020, 11:59 PM
If subscribingOn(computation) downstream would affect which thread the computation is done here then that would work

StavFX

11/19/2020, 6:24 PM
In the current form of the code, there’s no use of observe/subscribeOn that will make calculateAudioVolume run on a different thread than the one renderSample is being called on. What you can do is have a subject of ByteBuffer instead of Double; then you can handle audioSample on another thread.
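A minimal sketch of that idea, assuming RxJava 3 package names. The `calculateAudioVolume` body below is a placeholder (a simple RMS over the bytes), not the real calculation from the thread, and `volumeStream` is a made-up helper name.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.nio.ByteBuffer
import kotlin.math.sqrt

// Placeholder for the CPU-intensive calculation (assumption: RMS of the raw bytes).
fun calculateAudioVolume(sample: ByteBuffer): Double {
    val view = sample.duplicate() // read through a duplicate so the caller's position is untouched
    var sumOfSquares = 0.0
    var count = 0
    while (view.hasRemaining()) {
        val value = view.get().toDouble()
        sumOfSquares += value * value
        count++
    }
    return if (count == 0) 0.0 else sqrt(sumOfSquares / count)
}

// The subject carries raw ByteBuffers, so no heavy work happens on the callback thread.
// observeOn moves everything below it (the map) onto the computation scheduler.
fun volumeStream(rawSamples: Observable<ByteBuffer>): Observable<Double> =
    rawSamples
        .observeOn(Schedulers.computation())
        .map { calculateAudioVolume(it) }

fun main() {
    val rawSamples = PublishSubject.create<ByteBuffer>()
    volumeStream(rawSamples).subscribe { volume ->
        println("volume=$volume on ${Thread.currentThread().name}")
    }
    // Stands in for the library invoking renderSample(audioSample).
    rawSamples.onNext(ByteBuffer.wrap(byteArrayOf(3, 4)))
    Thread.sleep(200) // give the computation thread time to run before the JVM exits
}
```

The key point is the position of `observeOn`: operators above it still run on whatever thread calls `onNext`, operators below it run on the scheduler.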

Gabe Kauffman

11/19/2020, 9:02 PM
Ahhhhhhhh thank you so much. That feels like a step in the right direction. And then I can do something like
val observable = publishSubject.hide().observeOn(Schedulers.computation())
                .flatMap { Observable.just(AudioProcessingHelper.calculateAudioVolume(it)) }

StavFX

11/19/2020, 10:10 PM
Yup. I would get rid of hide(), since the other operators will do that for you anyway. And anytime you see .flatMap { Observable.just(foo) }, you can use .map { foo } instead. Also, now you’ll have to pay attention to whether you can still trust the content of the ByteBuffer to be usable after renderSample has completed, or whether the SDK recycles it, making it not thread-safe.
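If it turns out the SDK does reuse the buffer (that is an assumption to verify against the library's docs, nothing in the thread confirms it either way), one option is to copy the bytes before handing them to the subject. A small sketch, with `copyOf` as a made-up helper name:

```kotlin
import java.nio.ByteBuffer

// Copies the readable bytes of `source` into a fresh heap buffer.
// The source's position and limit are left unchanged.
fun copyOf(source: ByteBuffer): ByteBuffer {
    val view = source.duplicate()                    // shares content, independent position/limit
    val copy = ByteBuffer.allocate(view.remaining())
    copy.order(source.order())                       // keep the original byte order
    copy.put(view)                                   // bulk-copy the remaining bytes
    copy.flip()                                      // make the copy readable from index 0
    return copy
}
```

With that in place, renderSample would call publishSubject.onNext(copyOf(audioSample)), and the downstream chain becomes publishSubject.observeOn(Schedulers.computation()).map { AudioProcessingHelper.calculateAudioVolume(it) }. The copy costs an allocation per sample, so it is only worth doing if the buffer really is recycled.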

Gabe Kauffman

11/19/2020, 10:44 PM
Brilliant, thank you thank you thank you