svenjacobs
01/21/2020, 8:33 AM
[…] `Flow` 🙂 I have a `Flow<State>` that represents an application (UI) state. Now what I want to do is convert this into a flow with certain "side effects" on parts of the state. Here's example code:
data class State(
    val string: String? = null,
    val number: Long = 0
)

val flow = flowOf(
    State(string = "Hello world", number = 0),
    State(string = "Hello world", number = 1),
    State(string = null, number = 1),
    State(string = "Hello world 2", number = 1)
)

val flow2 = flowOf(
    flow.mapNotNull { it.string }
        .onEach { println("string $it") },
    flow.map { it.number }
        .onEach { println("number $it") }
).flattenMerge()

val job = flow2.launchIn(GlobalScope)
The thing is, in the isolated example this works; however, in my real application I have the following behaviour:
1. The first `State` object triggers both `onEach`, so the output is `string` and `number`.
2. Any subsequent change to `State` will always only trigger the second `onEach`, which is `number`. (It is always the last Flow's `onEach` that is triggered here. So if there are three inner Flows, only the third will be called.)
3. If I use `flattenConcat()` instead of `flattenMerge()`, then only the first `onEach` will be called for every State change.
I'm lost here. Any ideas? Why am I doing this? Imagine that later a `distinctUntilChanged()` is added before each `onEach` so that the side effect is only triggered when the selected value of the state has changed.
But first of all this should even work without `distinctUntilChanged()`. Any ideas?
Tash
01/21/2020, 9:07 AM
[…] `flattenConcat()` only results in the first `onEach` because, as per the docs, "Inner flows are collected by this operator _sequentially_". So, if in your real implementation the main `Flow<State>` never finishes emitting, `flattenConcat` will only keep collecting the first inner `Flow` with the `string` `onEach`?
[…] `Flow<State>` is hydrated in the real implementation
svenjacobs
01/21/2020, 9:24 AM
Zach Klippenstein (he/him) [MOD]
01/21/2020, 9:27 AM
[…] `flowOf`. How are you creating that source in production? Is it a `BroadcastChannel`? How is it configured?
svenjacobs
01/21/2020, 10:09 AM
[…] `Flow` but basically what it does is
val clickFlow = callbackFlow<Unit> {
    view.setOnClickListener { offer(Unit) }
    awaitClose { view.setOnClickListener(null) }
}
I have multiple of these Flows for different buttons for example. Then I have something like
val actionFlow = flowOf(
    view.clickFlow.map { OnButtonClickAction },
    view.anotherClickFlow.map { OnAnotherButtonClickAction }
).flattenMerge()
Then this `actionFlow` is passed to another class that transforms actions into state changes.
So there is a lot of abstraction involved. It's kind of hard to extract this (and identify the source of the problem) into a small, reproducible example.
[…] `Flow<State>` is created this works. So when I do a `stateFlow.onEach { println(it) }` I can see every state change for every UI action. But when I do the `flowOf(stateFlow.map { ... }, stateFlow.map { ... }).flattenMerge()` as shown above, this stops working here.
[…]
merge(
    state.map { ... },
    state.map { ... },
    ...
)
results in the upstream transformations being executed multiple times, since the state flow is collected multiple times. This is actually not what I want. Is there an operator that just collects once but allows a flow to be "cloned"?
[…] `val copy = flow.broadcastIn(scope).asFlow()` but I'm not sure if this is a good solution 😉
dave08
01/21/2020, 5:56 PM
[…] `share` ... and a recent PR that hasn't gotten any attention yet... 🤔
Zach Klippenstein (he/him) [MOD]
01/21/2020, 6:09 PM
svenjacobs
01/21/2020, 6:15 PM
Zach Klippenstein (he/him) [MOD]
01/21/2020, 6:47 PM
1. […] `Default` dispatcher (from `GlobalScope`) and begins collecting `flow2`.
2. Non-broadcast channel with capacity 16 (`Channel.BUFFERED`) is created, new coroutine is launched to send on it.
3. `ChannelFlowMerge.collectTo` is run on this new coroutine, and launches a coroutine for each flow in the merge (up to 16 at once).
4. Each of these coroutines creates a non-broadcast, `BUFFERED` (16-capacity) channel and launches a coroutine to send on it (still on Default dispatcher).
5. Each of these coroutines runs the `callbackFlow` logic and registers their event listeners.
Then when an event occurs (assuming that this single event is split out into flows, like your example):
1. Widget iterates through its listeners and sends events to each.
2. Assuming the buffers aren’t full, both the `callbackFlow`s get this event added to their buffers and are dispatched onto the Default dispatcher to process.
3. Since the default dispatcher is a pool, the two map+each operations occur concurrently (in an undefined order). However, each “map+each” pair executes synchronously, without dispatching, since none of your `map` or `onEach` blocks suspend.
4. When each map+each finishes, the result gets sent to the channel created in step 2 above. The first one to finish causes dispatch to Default dispatcher to resume the coroutine created in step 1 above and process the next value in the channel.
5. Since that channel is also buffered and the capacity (16) is > the number of flows (2), the second map to finish always immediately sends to the channel.
6. The terminal collector discards the values.
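The event path described in the steps above can be tried out in isolation. What follows is only a minimal sketch under stated assumptions: `Widget`, `events()` and `runSideEffects()` are hypothetical names that do not appear in the original code, the widget is a plain class standing in for a real view, and `trySend` is used where the 2020-era snippet used `offer`. Everything runs on the single `runBlocking` thread, so it shows that both branches receive every event, not the thread-pool ordering.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for a widget that notifies several listeners.
class Widget {
    val listeners = mutableListOf<(Int) -> Unit>()
    fun fire(value: Int) = listeners.toList().forEach { it(value) }
}

// Every collection of this flow registers its own listener on the widget.
fun Widget.events(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { trySend(it) }
    listeners.add(listener)
    awaitClose { listeners.remove(listener) }
}

// Splits one event source into two branches with side effects and merges them.
fun runSideEffects(): List<String> = runBlocking {
    val widget = Widget()
    val effects = mutableListOf<String>()
    val events = widget.events()
    val job = launch {
        merge(
            events.map { "string $it" }.onEach { effects.add(it) },
            events.map { "number $it" }.onEach { effects.add(it) }
        ).take(4).collect() // two events times two branches
    }
    // Wait until both branches have registered their listeners.
    while (widget.listeners.size < 2) yield()
    widget.fire(1)
    widget.fire(2)
    job.join()
    effects
}

fun main() {
    // The order between branches is not guaranteed, so sort before printing.
    println(runSideEffects().sorted())
}
```

If a branch's side effect never shows up in a setup like this, the listener registration is a reasonable first place to look: a widget that keeps only one listener (as `setOnClickListener` does) would let the last registered collector replace the earlier ones.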
So if that’s right, the only potentially weird thing is that the order of the side effects performed by `onEach` is undefined, but that shouldn’t prevent either of them from happening. I would start by putting breakpoints in your side effects and seeing if they’re actually getting hit.
svenjacobs
01/22/2020, 6:27 AM
[…] `broadcastIn(scope).asFlow()` workaround) before the `merge(state.map { ... }, state.map { ... })`. But I guess you're on the right track. When I added a `flowOn(Dispatchers.Default)` to the `merge` (before I added share), the emissions appeared in a random flow (random per instance, not per emission).
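
On the earlier question about collecting the upstream once while still "cloning" the flow: later kotlinx.coroutines releases (1.4 and up, as far as I know) added a `shareIn` operator, which did not exist when this thread was written. Below is a minimal sketch of how it might be combined with `merge` and `distinctUntilChanged()`. `runShared` and `upstreamRuns` are hypothetical names used only for illustration, and `replay = 4` is an assumption chosen so that a late subscriber still sees all four states in this toy example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

data class State(val string: String? = null, val number: Long = 0)

// Returns how often the upstream was collected, plus the recorded side effects.
fun runShared(): Pair<Int, List<String>> = runBlocking {
    var upstreamRuns = 0
    val effects = mutableListOf<String>()
    val states = flow {
        upstreamRuns++ // incremented once per upstream collection
        emit(State("Hello world", 0))
        emit(State("Hello world", 1))
        emit(State(null, 1))
        emit(State("Hello world 2", 1))
    }
    // Separate scope so the never-ending sharing coroutine can be cancelled.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = states.shareIn(sharingScope, SharingStarted.Lazily, replay = 4)
    merge(
        shared.mapNotNull { it.string }
            .distinctUntilChanged()
            .onEach { effects.add("string $it") },
        shared.map { it.number }
            .distinctUntilChanged()
            .onEach { effects.add("number $it") }
    ).take(4).collect() // two distinct strings plus two distinct numbers
    sharingScope.cancel()
    upstreamRuns to effects
}

fun main() {
    val (runs, effects) = runShared()
    println(runs)
    println(effects.sorted())
}
```

A shared flow never completes, which is why the sketch bounds the collection with `take(4)` and cancels the sharing scope itself. In an application the scope would more likely be tied to a lifecycle.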