https://kotlinlang.org logo
Title
s

svenjacobs

01/21/2020, 8:33 AM
Hey, I need help with
Flow
🙂 I have a
Flow<State>
that represents an application (UI) state. Now what I want to do is convert this into a flow with certain "side effects" on parts of the state. Here's example code:
data class State(
    val string: String? = null,
    val number: Long = 0
)

val flow = flowOf(
    State(
        string = "Hello world",
        number = 0
    ),
    State(
        string = "Hello world",
        number = 1
    ),
    State(
        string = null,
        number = 1
    ),
    State(
        string = "Hello world 2",
        number = 1
    )
)

val flow2 = flowOf(
    flow.mapNotNull { it.string }
        .onEach { println("string $it") },
    flow.map { it.number }
        .onEach { println("number $it") }
).flattenMerge()

val job = flow2.launchIn(GlobalScope)
The thing is, in the isolated example this works however in my real application I have the following behaviour: 1. The first State object triggers both
onEach
, so the output is
string
and
number
. 2. Any subsequent changes to
State
will always only trigger the second
onEach
, which is
number
. (it is always the last Flow
onEach
that is triggered here. So if there are three inner Flows then only the third will be called) 3. If I use
flattenConcat()
instead of
flattenMerge()
then only the first
onEach
will be called for every State change. I'm lost here. Any ideas? Why am I doing this? Imagine that later a
distinctUntilChanged()
is added before each
onEach
so that the side effect is only triggered when the selected value of the state has changed. But first of all this should even work without
distinctUntilChanged()
. Any ideas?
t

Tash

01/21/2020, 9:07 AM
Guessing that
flattenConcat()
only results in the first
onEach
because as per the docs
Inner flows are collected by this operator _sequentially_.
. So, if in your real implementation the main
Flow<State>
never finishes emitting,
flattenConcat
will only keep collecting the first inner
Flow
with the
string
onEach
?
also would be curious to know how the main
Flow<State>
is hydrated in the real implementation
s

svenjacobs

01/21/2020, 9:24 AM
You're right, the real Flow of State never completes as it originates from UI events. So there are several layers of mapping. For instance a button click event (as a Flow) changes the state. But how come that with flattenMerge only the last inner Flow is always executed?
z

Zach Klippenstein (he/him) [MOD]

01/21/2020, 9:27 AM
In your example your source flow is a
flowOf
. How are you creating that source in production? Is it a
BroadcastChannel
? How is it configured?
s

svenjacobs

01/21/2020, 10:09 AM
@Zach Klippenstein (he/him) [MOD] I'm using a library that binds UI events to
Flow
but basically what it does is
val clickFlow = callbackFlow<Unit> {
  view.setOnClickListener { offer(Unit) }
  awaitClose { view.setOnClickListener(null) }
}
I have multiple of these Flows for different buttons for example. Then I have something like
val actionFlow = flowOf(
  view.clickFlow.map { OnButtonClickAction },
  view.anotherClickFlow.map { OnAnotherButtonClickAction }
).flattenMerge()
Then this
actionFlow
is passed to another class that transforms actions into state changes. So there is a lot of abstraction involved. It's kind of hard to extract this (and identify the source of the problem) into a small, reproducable example.
Up to the point where
Flow<State>
is created this works. So when I do a
stateFlow.onEach { println(it) }
I can see every state change for every UI action. But when I do the
flowOf(stateFlow.map { ... }, stateFlow.map { ... }).flattenMerge()
as shown above this stops working here
Apart from the original problem I just noticed that a
merge(
  state.map { ... },
  state.map { ... },
  ...
)
results in the upstream transformations to be executed multiple times since the state flow is collected multiple times. This is actually not what I want. Is there an operator that just collects once but allows a flow to be "cloned"?
Regarding "cloning" a Flow so that further operations/collections do not influence "upstream" transformations I found this solution
val copy = flow.broadcastIn(scope).asFlow()
but I'm not sure if this is a good solution 😉
d

dave08

01/21/2020, 5:56 PM
Yup, for now, there isn't any sharing operator, so it's either that or subscribing to a new flow every time... but there are libraries and code snippets that try to provide
share
... and a recent PR that hasn't gotten any attention yet... 🤔
There's a pretty long issue about this on Github and Roman said in KotlinConf that this is one of the upcoming features though...
z

Zach Klippenstein (he/him) [MOD]

01/21/2020, 6:09 PM
s

svenjacobs

01/21/2020, 6:15 PM
Thanks @dave08 & @Zach Klippenstein (he/him) [MOD] 🙂
z

Zach Klippenstein (he/him) [MOD]

01/21/2020, 6:47 PM
I’m still not sure why those other emissions are being dropped. As far as I can tell from reading the source, I think this is what happens when you launch `flow2`: 1. Coroutine is launched on
Default
dispatcher (from
GlobalScope
) and begins collecting `flow2. 2. Non-broadcast channel with capacity 16 (
Channel.BUFFERED
) is created, new coroutine is launched to send on it. 3.
ChannelFlowMerge.collectTo
is ran on this new coroutine, and launches a coroutine for each flow in the merge (up to 16 at once). 4. Each of these coroutines creates a non-broadcast,
BUFFERED
(16-capacity) channel and launches a coroutine to send on it (still on Default dispatcher). 5. Each of these coroutines runs the
callbackFlow
logic and registers their event listeners. Then when an event occurs (assuming that this single event is split out into flows, like your example): 1. Widget iterates through its listeners and sends events to each. 2. Assuming the buffers aren’t full, both the `callbackFlow`s get this event added to their buffers and are dispatched onto the Default dispatcher to process. 3. Since the default dispatcher is a pool, the two map+each operations occur concurrently (in an undefined order). However each “map+each” pair executes synchronously, without dispatching, since none of your
map
nor
onEach
blocks suspend. 4. When each map+each finishes, the result gets sent to the channel created in step 2 above. The first one to finish causes dispatch to Default dispatcher to resume the coroutine created in step 1 above and process the next value in the channel. 5. Since that channel is also buffered and the capacity (16) is > the number of flows (2), the second map to finish always immediately sends to the channel. 6. The terminal collector discards the values. So if that’s right, the only potentially weird thing is that the order of the side effects performed by
onEach
is undefined, but that shouldn’t prevent either of them from happening. I would start by putting breakpoints in your side effects and seeing if they’re actually getting hit.
🤯 1
💡 1
s

svenjacobs

01/22/2020, 6:27 AM
@Zach Klippenstein (he/him) [MOD] Thanks for the thorough investigation 😅 The problem went away once I "shared" the original flow (
broadcastIn(scope).asFlow()
workaround) before the
merge(state.map { ... }, state.map { ... }
. But I guess you're on the right track. When I added a
flowOn(Dispatchers.Default)
to the
merge
(before I added share), the emissions appeared in a random flow (random per instance, not per emission).