https://kotlinlang.org logo
#rx
Title
# rx
t

thanh

08/27/2019, 6:40 PM
Do I need to write a custom operator for it? Or we already have some kind of combination to get that function?
1
d

darkmoon_uk

08/28/2019, 12:48 AM
I'm interested in a 'Reactive Coroutines' / Flow-based equivalent to the Rx
scan
function as well. It could be implemented by having a
var
share some limited scope with a
Flow.map
function of course; but like you I'd prefer something idiomatic if it exists.
@thanh Close to your topic, in RxJava it's very common to work with `Observable`s that provide an initial 'last known' value upon subscription, followed by 'hot' updates. This somewhat merges the concept of hot and cold observables and is (not so intuituively) called a
ConflatedBroadcastChannel
in Kotlin coroutines. See: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/
t

tseisel

08/28/2019, 6:21 AM
RxJava
Observable
has
scanWith(Callable<R>, BiFunction<R, T, R>)
, which is
scan
with a deferred first value.
t

thanh

08/28/2019, 8:02 AM
@tseisel My scan function is very similar with
scanWith
, the different is I want to produce the first value from the first emitted item with
initialValueProvider
.
i

ilyagulya

08/28/2019, 9:03 AM
@thanh Hi! You simulate such behavior like this:
Copy code
observable
  .startWith(Observable.fromCallable({YOUR_CALLABLE))
  .scan({YOUR_ACCUMULATOR})
d

darkmoon_uk

08/28/2019, 9:06 AM
Pretty sure @thanh is looking for a coroutines equivalent answer using `Flow`/`Channel`/`ConflatedBroadcastChannel` operators. People keep responding with RxJava answers... correct if I'm wrong @thanh
t

tseisel

08/28/2019, 9:16 AM
Sorry @thanh, I didn't noticed the difference ! I wrote something like this :
Copy code
fun <T : Any, R : Any> Observable<T>.scanFirst(firstValueMapper: (T) -> R, accumulator: (R, T) -> R): Observable<R> {
    val first = this.firstElement().map(firstValueMapper)
    val rest = this.skip(1)
    return rest.scanWith({ first.blockingGet() }, accumulator)
}
Not sure it performs well regarding the Rx spec, but that's way simpler than writing a custom operator.
@darkmoon_uk, does the
<T, R> Flow<T>.scan(initial: R, operation: (accumulator: R, value: T) -> R): Flow<R>
function available in
coroutines-core
match your need ? I think it performs the same as Rx's
scan
.
d

darkmoon_uk

08/28/2019, 9:21 AM
Sure does! Thanks 😄
i

ilyagulya

08/28/2019, 9:52 AM
@darkmoon_uk Then it’s really strange to ask questions about coroutines in #rx channel.
🤦‍♂️ 1
t

thanh

08/28/2019, 10:56 AM
@darkmoon_uk My question is about RxJava 😉
✔️ 1
thanks @ilyagulya and @tseisel. Based on your answers I came up with this:
Copy code
fun <T, R> Observable<T>.scan(initialValueProvider: (T) -> R, accumulator: (R, R) -> R): Observable<R> {
    return this.map(initialValueProvider).scan(accumulator)
}
It is identical with my original function but it works in my case.
3 Views