dimsuz

11/13/2021, 12:44 PM
I am porting my library from Rx to Flow and one bug led me to the differences in the scan operator:
Observable.just(1)
  .doOnSubscribe { println("Hello") }
  .scan(0) { v1, v2 -> v1 + v2 }
  .blockingSubscribe { println("got $it") }

println("===")

flowOf(1)
  .onStart { println("Hello") }
  .scan(0) { v1, v2 -> v1 + v2 }
  .collect { println("got $it") }
results and question in the thread →
this prints:
Hello
got 0
got 1
===
got 0
Hello
got 1
This difference currently leads to bugs in code which uses a coroutine-based version of my library. Looking into Flow's scan operator I can see why it does this, but do you have any ideas on how I should bring this on par? Write a custom scan operator?

Joffrey

11/13/2021, 12:48 PM
Put onStart after scan?

dimsuz

11/13/2021, 12:49 PM
The flowOf().onStart() part in my case comes from the library user. Above is a simplified example. I have something more like:
merge(userFlow1, userFlow2).scan(...)
I don't control those flows. What I need is to make sure that scan emits its first item after the flows are subscribed to.

Joffrey

11/13/2021, 12:52 PM
Ah I see. Then yeah, maybe your own scan operator could do the trick by first actually getting the first element and then emitting the initial value + the combined value with the first element. This is an interesting quirk, though. I never thought of that. It makes sense that scan theoretically doesn't need to subscribe to the upstream flow before emitting its initial element.
It'd be interesting to know your use case for this. What do you need onStart for in this case? And why do you need scan to behave this way?
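A minimal sketch of the custom operator suggested above, assuming kotlinx.coroutines is on the classpath. The name scanAfterFirst is hypothetical and not part of the library. It holds back the initial value until the upstream delivers its first element, so upstream onStart actions run before anything is emitted. Emitting the initial value when the upstream completes empty is an assumption made here to stay close to what scan produces.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical operator: like scan, but the initial value is emitted only
// once the upstream has produced its first element (or completed empty).
fun <T, R> Flow<T>.scanAfterFirst(
    initial: R,
    operation: suspend (R, T) -> R
): Flow<R> = flow {
    var acc = initial
    var initialEmitted = false
    collect { value ->
        if (!initialEmitted) {
            // The upstream is already running here, so its onStart has executed.
            emit(initial)
            initialEmitted = true
        }
        acc = operation(acc, value)
        emit(acc)
    }
    // Assumption: an empty upstream still yields the initial value, as scan does.
    if (!initialEmitted) emit(initial)
}
```

With the simplified example from the first message, flowOf(1).onStart { println("Hello") }.scanAfterFirst(0) { a, b -> a + b } should print "Hello" before "got 0". The trade-off is that the initial value is delayed until the first upstream element arrives.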

dimsuz

11/13/2021, 12:56 PM
Yeah! Although "getting the first element" would delay that "onStart" side effect until item emission, and some code might rely on it being executed even before emission, simply after the subscription has been made. That's what puzzles me additionally 🙂
In the Rx case, a call to subscribe will chain-trigger doOnSubscribe upstream, even if no item is emitted. This doesn't happen in the Flow case, nor in the workaround you've suggested.
It'd be interesting to know your use case for this.
Basically I'm implementing a state machine where scan is folding "state" + "side effect" pairs:
scan(initialState to { /* lambda */ }, ...)
which then get rendered and executed respectively. And due to the way this whole "scan"-containing chain is set up, it is important that upstream producers have their subscribe side effects started before any emission starts, as is generally expected and works for Rx.
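One possible way to approximate the Rx ordering described here is to start collecting the upstream eagerly and only then emit the initial value. The sketch below is an illustration under stated assumptions, not a library API: scanEagerStart is a hypothetical name, and it relies on CoroutineStart.UNDISPATCHED running the upstream synchronously up to its first suspension point. It therefore only helps when the upstream's start actions run synchronously in the collecting coroutine. Operators that start their sources in separately dispatched coroutines (merge appears to be one of them), or an onStart block that suspends before its side effect, would likely not be covered.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical operator: begins collecting the upstream before emitting
// the initial value, then folds upstream values like scan does.
fun <T, R> Flow<T>.scanEagerStart(
    initial: R,
    operation: suspend (R, T) -> R
): Flow<R> = flow {
    coroutineScope {
        // Rendezvous channel: the producer suspends on its first send.
        val channel = Channel<T>()
        launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                // Runs immediately, so synchronous onStart actions execute here.
                this@scanEagerStart.collect { channel.send(it) }
            } finally {
                channel.close()
            }
        }
        // Reached once the producer has suspended (or finished).
        emit(initial)
        var acc = initial
        for (value in channel) {
            acc = operation(acc, value)
            emit(acc)
        }
    }
}
```

For the simplified flowOf(1).onStart { println("Hello") } example this should print "Hello", then "got 0", then "got 1", and the start action runs even if the upstream never emits.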

ephemient

11/13/2021, 10:28 PM
side-effects aren't encouraged with flow, so to me it's unsurprising

dimsuz

11/13/2021, 11:02 PM
yeah, but unfortunately I have to support old code for now... And still, to me it looks kinda wrong that this code
merge(
  flowOf(1).onStart { println("one") },
  flowOf(2).onStart { println("two") }
)
  .scan(0) { a, v -> a + v }
  .collect { println("got $it") }
prints "got 0" and only after that prints "one", "two". As an API user I wouldn't expect that. And the bug in my case happens because subscription to the flows happens after the first item emission, so they see a wrong state during initialization. Oh well, I've thought about this all day, and so far I can't find an elegant solution...

ephemient

11/13/2021, 11:59 PM
if I think about the 0 as part of scan's onStart, this makes sense
val upstream = flowOf(2).onStart { emit(1) }
val scan = upstream.onStart { emit(0) }
scan.collect() // [0, 1, 2]
"onStart" is different from "onSubscribe", which doesn't really exist (well, SharedFlow can, but not in a way that's useful to you either, I think)
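The mental model above can be made concrete with a small stand-in for scan. The sketch below assumes kotlinx.coroutines; scanModel is a hypothetical name written for illustration, intended to mirror the observed behavior of the built-in operator rather than quote its source. The initial value is emitted before the upstream is collected, which is why an upstream onStart runs after "got 0".

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical model of scan: emit the initial value first,
// and only then start collecting the upstream flow.
fun <T, R> Flow<T>.scanModel(
    initial: R,
    operation: suspend (R, T) -> R
): Flow<R> = flow {
    var acc = initial
    emit(acc) // happens before the upstream is collected
    collect { value ->
        acc = operation(acc, value)
        emit(acc)
    }
}

fun main() = runBlocking {
    flowOf(1)
        .onStart { println("Hello") }
        .scanModel(0) { a, b -> a + b }
        .collect { println("got $it") }
}
```

Running main reproduces the order reported at the top of the thread for Flow: "got 0", "Hello", "got 1".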

dimsuz

11/14/2021, 1:04 AM
Indeed, in this light it makes sense! Thanks to all of you for the input. I guess I indeed shouldn't map onSubscribe to onStart. Will search for different ways to make the old code less brittle in this regard!