dimsuz
11/13/2021, 12:44 PMscan
operator:
Observable.just(1)
.doOnSubscribe { println("Hello") }
.scan(0) { v1, v2 -> v1 + v2 }
.blockingSubscribe { println("got $it") }
println("===")
flowOf(1)
.onStart { println("Hello") }
.scan(0) { v1, v2 -> v1 + v2 }
.collect { println("got $it") }
results and question in the thread →dimsuz
11/13/2021, 12:44 PMHello
got 0
got 1
===
got 0
Hello
got 1
dimsuz
11/13/2021, 12:46 PMscan
operator I can see why it does this, but do you have any ideas how should I bring this on-par?
Write a custom scan operator?Joffrey
11/13/2021, 12:48 PMonStart
after scan
?dimsuz
11/13/2021, 12:49 PMflowOf().onStart()
part in my case comes from the library user. Above is a simplified example.
I have something more like:
merge(userFlow1, userFlow2).scan(...)
I don't control those flows. What I need is to make sure that scan
emits it's first item after flows are subscribed to.Joffrey
11/13/2021, 12:52 PMscan
theoretically doesn't need to subscribe to the upstream flow before emitting its initial elementJoffrey
11/13/2021, 12:55 PMonStart
for in this case? And why do you need scan
to behave this way?dimsuz
11/13/2021, 12:56 PMdimsuz
11/13/2021, 12:57 PMsubscribe
will chain-trigger doOnSubscribe upstream. Evem of no item is emmitted. This doesn't happen in this case and in the workaround you've suggested.dimsuz
11/13/2021, 1:01 PMIt'd be interesting to know your use case for this.Basically I'm implementing a state machine where
scan
is folding "state" + "side effect" pairs: scan(initialState to { /* lambda */ }, ...)
which then get rendered and executed respectively. And due to the way this whole "scan"-containing chain is set up, it is important that upstream producers have their subscribe side effects started before any emission starts, as generally is expected and works for Rx.ephemient
11/13/2021, 10:28 PMdimsuz
11/13/2021, 11:02 PMmerge(
flowOf(1).onStart { println("one") },
flowOf(2).onStart { println("two") }
)
.scan(0, { a, v -> a + v})
.collect { println("got $it")}
prints "got 0" and only after that prints "one","two".
As an API user I wouldn't expect that.
And the bug in my case happens because subscription to the flows happens after first item emission, so they see a wrong state during initialization.
Oh well, thought about this for all day, so far I can't find an elegant solution...ephemient
11/13/2021, 11:59 PMval upstream = flowOf(2).onStart { emit(1) }
val scan = upstream.onStart { emit(0) }
scan.collect() // [0, 1, 2]
ephemient
11/14/2021, 12:00 AMdimsuz
11/14/2021, 1:04 AM