Do I need to write a custom operator for it? Or we...
# rx
t
Do I need to write a custom operator for it? Or we already have some kind of combination to get that function?
1
d
I'm interested in a 'Reactive Coroutines' / Flow-based equivalent to the Rx
scan
function as well. It could be implemented by having a
var
share some limited scope with a
Flow.map
function of course; but like you I'd prefer something idiomatic if it exists.
@thanh Close to your topic, in RxJava it's very common to work with `Observable`s that provide an initial 'last known' value upon subscription, followed by 'hot' updates. This somewhat merges the concept of hot and cold observables and is (not so intuituively) called a
ConflatedBroadcastChannel
in Kotlin coroutines. See: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/
t
RxJava
Observable
has
scanWith(Callable<R>, BiFunction<R, T, R>)
, which is
scan
with a deferred first value.
t
@tseisel My scan function is very similar with
scanWith
, the different is I want to produce the first value from the first emitted item with
initialValueProvider
.
i
@thanh Hi! You simulate such behavior like this:
Copy code
observable
  .startWith(Observable.fromCallable({YOUR_CALLABLE))
  .scan({YOUR_ACCUMULATOR})
d
Pretty sure @thanh is looking for a coroutines equivalent answer using `Flow`/`Channel`/`ConflatedBroadcastChannel` operators. People keep responding with RxJava answers... correct if I'm wrong @thanh
t
Sorry @thanh, I didn't noticed the difference ! I wrote something like this :
Copy code
fun <T : Any, R : Any> Observable<T>.scanFirst(firstValueMapper: (T) -> R, accumulator: (R, T) -> R): Observable<R> {
    val first = this.firstElement().map(firstValueMapper)
    val rest = this.skip(1)
    return rest.scanWith({ first.blockingGet() }, accumulator)
}
Not sure it performs well regarding the Rx spec, but that's way simpler than writing a custom operator.
@darkmoon_uk, does the
<T, R> Flow<T>.scan(initial: R, operation: (accumulator: R, value: T) -> R): Flow<R>
function available in
coroutines-core
match your need ? I think it performs the same as Rx's
scan
.
d
Sure does! Thanks 😄
i
@darkmoon_uk Then it’s really strange to ask questions about coroutines in #rx channel.
🤦‍♂️ 1
t
@darkmoon_uk My question is about RxJava 😉
✔️ 1
thanks @ilyagulya and @tseisel. Based on your answers I came up with this:
Copy code
fun <T, R> Observable<T>.scan(initialValueProvider: (T) -> R, accumulator: (R, R) -> R): Observable<R> {
    return this.map(initialValueProvider).scan(accumulator)
}
It is identical with my original function but it works in my case.