dimsuz
11/13/2021, 12:44 PM
scan operator:
Observable.just(1)
    .doOnSubscribe { println("Hello") }
    .scan(0) { v1, v2 -> v1 + v2 }
    .blockingSubscribe { println("got $it") }
println("===")
flowOf(1)
    .onStart { println("Hello") }
    .scan(0) { v1, v2 -> v1 + v2 }
    .collect { println("got $it") }
results and question in the thread →
Hello
got 0
got 1
===
got 0
Hello
got 1
I can see why the scan operator does this, but do you have any ideas on how I should bring this on par?
Write a custom scan operator?
Joffrey
11/13/2021, 12:48 PM
onStart after scan?
dimsuz
11/13/2021, 12:49 PM
The flowOf().onStart() part in my case comes from the library user. Above is a simplified example.
I have something more like:
merge(userFlow1, userFlow2).scan(...)
I don't control those flows. What I need is to make sure that scan emits its first item after the flows are subscribed to.
Joffrey
11/13/2021, 12:52 PM
scan theoretically doesn't need to subscribe to the upstream flow before emitting its initial element.
What is onStart for in this case? And why do you need scan to behave this way?
dimsuz
11/13/2021, 12:56 PM
subscribe will chain-trigger doOnSubscribe upstream, even if no item is emitted. This doesn't happen in this case, nor in the workaround you've suggested.
It'd be interesting to know your use case for this.
Basically I'm implementing a state machine where scan is folding "state" + "side effect" pairs: scan(initialState to { /* lambda */ }, ...)
which then get rendered and executed respectively. Due to the way this whole scan-containing chain is set up, it is important that upstream producers have their subscribe side effects started before any emission starts, as is generally expected and works for Rx.
ephemient
11/13/2021, 10:28 PM
dimsuz
11/13/2021, 11:02 PM
merge(
    flowOf(1).onStart { println("one") },
    flowOf(2).onStart { println("two") }
)
    .scan(0) { a, v -> a + v }
    .collect { println("got $it") }
prints "got 0" and only after that prints "one","two".
As an API user I wouldn't expect that.
And the bug in my case happens because subscription to the flows happens after first item emission, so they see a wrong state during initialization.
Oh well, I've thought about this all day, and so far I can't find an elegant solution...
ephemient
11/13/2021, 11:59 PM
val upstream = flowOf(2).onStart { emit(1) }
val scan = upstream.onStart { emit(0) }
scan.collect() // [0, 1, 2]
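For what it's worth, one possible shape for the "custom scan operator" idea from earlier in the thread is sketched below. It is only a sketch under assumptions: scanDeferringInitial is a hypothetical name, not part of kotlinx.coroutines, and demo is just a harness for this example. Instead of emitting the initial value up front, it holds it back until the upstream produces its first value or completes, so the upstream's onStart blocks have already run by then.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not a library operator): behaves like scan, but the
// initial value is emitted only once upstream has produced its first value
// or has completed without emitting anything.
fun <T, R> Flow<T>.scanDeferringInitial(
    initial: R,
    operation: suspend (accumulator: R, value: T) -> R,
): Flow<R> = flow {
    var accumulator = initial
    var initialEmitted = false
    collect { value ->
        if (!initialEmitted) {
            // Upstream is already being collected here, so its onStart has run.
            initialEmitted = true
            emit(accumulator)
        }
        accumulator = operation(accumulator, value)
        emit(accumulator)
    }
    // Empty upstream: still emit the initial value, as scan would.
    if (!initialEmitted) emit(accumulator)
}

// Records the order of events for the simplified example from the thread.
fun demo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1)
        .onStart { events += "Hello" }
        .scanDeferringInitial(0) { a, v -> a + v }
        .collect { events += "got $it" }
    events
}

fun main() {
    demo().forEach(::println) // Hello, got 0, got 1
}
```

The trade-off is that the initial value is delayed for as long as the upstream stays silent, which may not suit a state machine that has to render its initial state immediately. With merge(userFlow1, userFlow2) it also only guarantees that the flow producing the first value has started, not that every merged flow has.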
dimsuz
11/14/2021, 1:04 AM