# coroutines
d
I am porting my library from Rx to Flow, and one bug led me to the differences in the `scan` operator:
```kotlin
Observable.just(1)
  .doOnSubscribe { println("Hello") }
  .scan(0) { v1, v2 -> v1 + v2 }
  .blockingSubscribe { println("got $it") }

println("===")

flowOf(1)
  .onStart { println("Hello") }
  .scan(0) { v1, v2 -> v1 + v2 }
  .collect { println("got $it") }
```
results and question in the thread →
This prints:
```
Hello
got 0
got 1
===
got 0
Hello
got 1
```
This difference currently leads to bugs in code that uses a coroutine-based version of my library. Looking into Flow's `scan` operator, I can see why it does this, but do you have any ideas how I should bring it on par? Write a custom `scan` operator?
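A self-contained version of the Flow half of the snippet, assuming kotlinx-coroutines-core is on the classpath, records events into a list instead of printing so the ordering can be checked. `scanOrder` is an illustrative name. The initial 0 is emitted by `scan` itself before it starts collecting its upstream, and starting that collection is the moment `onStart` runs.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Runs the Flow pipeline from the question and returns the order of events.
fun scanOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1)
        .onStart { events += "Hello" }   // runs only when scan starts collecting upstream
        .scan(0) { v1, v2 -> v1 + v2 }   // emits the initial 0 before collecting upstream
        .collect { events += "got $it" }
    events
}

fun main() {
    println(scanOrder()) // [got 0, Hello, got 1]
}
```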
j
Put `onStart` after `scan`?
d
The `flowOf().onStart()` part in my case comes from the library user. Above is a simplified example. I have something more like:
```kotlin
merge(userFlow1, userFlow2).scan(...)
```
I don't control those flows. What I need is to make sure that `scan` emits its first item after the flows are subscribed to.
j
Ah, I see. Then maybe your own `scan` operator could do the trick by first actually getting the first element and then emitting the initial value plus the value combined with that first element. This is an interesting quirk, though; I never thought of that. It makes sense that `scan` theoretically doesn't need to subscribe to the upstream flow before emitting its initial element.
It'd be interesting to know your use case for this. What do you need `onStart` for in this case? And why do you need `scan` to behave this way?
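A minimal sketch of the custom operator suggested here, assuming kotlinx-coroutines-core. The name `scanAfterFirst` is hypothetical and not part of kotlinx.coroutines. It holds back the initial value until the upstream produces its first element, so an upstream `onStart` runs before anything reaches the collector. The trade-off is that if the upstream stays silent, nothing is emitted at all; for an upstream that completes empty, this sketch falls back to emitting the initial value, as `scan` does (a design choice, not the only option).

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator, not part of kotlinx.coroutines: like scan, but the
// initial value is held back until the upstream produces its first element,
// so upstream onStart side effects run before anything is emitted downstream.
fun <T, R> Flow<T>.scanAfterFirst(
    initial: R,
    operation: suspend (acc: R, value: T) -> R
): Flow<R> = flow {
    var acc = initial
    var started = false
    collect { value ->
        if (!started) {
            started = true
            emit(initial) // delayed initial emission
        }
        acc = operation(acc, value)
        emit(acc)
    }
    // Design choice: an empty upstream still yields the initial value, as scan does.
    if (!started) emit(initial)
}

// Same pipeline as in the question, with events recorded instead of printed.
fun customOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1)
        .onStart { events += "Hello" }
        .scanAfterFirst(0) { acc, v -> acc + v }
        .collect { events += "got $it" }
    events
}

fun main() {
    println(customOrder()) // [Hello, got 0, got 1]
}
```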
d
Yeah! But "getting the first element" would delay that "onStart" side effect until item emission, while some code might rely on it being executed even before any emission, simply once the subscription has been made. That's what puzzles me additionally 🙂
In the Rx case, a call to `subscribe` chain-triggers `doOnSubscribe` upstream, even if no item is emitted. That doesn't happen here, nor in the workaround you've suggested.
> It'd be interesting to know your use case for this.
Basically, I'm implementing a state machine where `scan` folds "state" + "side effect" pairs, `scan(initialState to { /* lambda */ }, ...)`, which then get rendered and executed, respectively. Because of the way this whole `scan`-containing chain is set up, it is important that upstream producers have their subscribe side effects started before any emission starts, as is generally expected and as works in Rx.
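A minimal sketch of the state-machine shape described here, assuming kotlinx-coroutines-core. `CounterState`, `SideEffect` and `runMachine` are hypothetical names; the library's real types are not shown in the thread. Note that the initial pair is rendered before `deltas` is collected, which is exactly the point where an upstream `onStart` side effect would arrive too late.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical state and side-effect types, for illustration only.
data class CounterState(val count: Int)
typealias SideEffect = () -> Unit

// Folds (state, side effect) pairs with scan, then renders each state and runs
// its side effect downstream. Returns what was rendered and which effects ran.
fun runMachine(deltas: Flow<Int>): Pair<List<CounterState>, List<String>> = runBlocking {
    val rendered = mutableListOf<CounterState>()
    val effectLog = mutableListOf<String>()
    val noEffect: SideEffect = {}
    deltas
        .scan(CounterState(0) to noEffect) { (state, _), delta ->
            val next = CounterState(state.count + delta)
            val effect: SideEffect = { effectLog += "effect for ${next.count}" }
            next to effect
        }
        .collect { pair ->
            val (state, effect) = pair
            rendered += state // "render" the state
            effect()          // execute the side effect
        }
    rendered to effectLog
}

fun main() {
    val (states, effects) = runMachine(flowOf(1, 2))
    println(states)  // [CounterState(count=0), CounterState(count=1), CounterState(count=3)]
    println(effects) // [effect for 1, effect for 3]
}
```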
e
Side effects aren't encouraged with Flow, so to me it's unsurprising.
d
Yeah, but unfortunately I have to support old code for now... And still, it looks kind of wrong to me that this code
```kotlin
merge(
    flowOf(1).onStart { println("one") },
    flowOf(2).onStart { println("two") }
)
    .scan(0) { a, v -> a + v }
    .collect { println("got $it") }
```
prints "got 0" and only then prints "one" and "two". As an API user, I wouldn't expect that. The bug in my case happens because the subscription to the flows happens after the first item is emitted, so they see the wrong state during initialization. Oh well, I've been thinking about this all day, and so far I can't find an elegant solution...
e
If I think of the 0 as part of scan's `onStart`, this makes sense:
```kotlin
val upstream = flowOf(2).onStart { emit(1) }
val scan = upstream.onStart { emit(0) }
println(scan.toList()) // [0, 1, 2]
```
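To check this reading, a small runnable comparison, assuming kotlinx-coroutines-core (`modelled` and `scanned` are illustrative names): wrapping the upstream in an outer `onStart { emit(0) }` and applying `scan(0)` with an accumulator that keeps only the latest value produce the same sequence, because in both cases the 0 is emitted before the upstream, including its own `onStart`, is collected.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val upstream: Flow<Int> = flowOf(2).onStart { emit(1) }

// The model: scan's initial value as an outer onStart emission.
fun modelled(): List<Int> = runBlocking { upstream.onStart { emit(0) }.toList() }

// The real operator, with an accumulator that just keeps the latest value.
fun scanned(): List<Int> = runBlocking { upstream.scan(0) { _, v -> v }.toList() }

fun main() {
    println(modelled()) // [0, 1, 2]
    println(scanned())  // [0, 1, 2]
}
```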
1
"onStart" is different from "onSubscribe", which doesn't really exist (well, SharedFlow can, but not in a way that's useful to you either, I think)
d
Indeed, in this light it makes sense! Thanks to all of you for the input. I guess I really shouldn't map `onSubscribe` to `onStart`. I'll look for other ways to make the old code less brittle in this regard!