Gabriel Feo
07/17/2020, 12:40 AM
private val state = MutableStateFlow(initialState)
fun getState(events: Flow<Event>): Flow<State> =
    merge(
        state,
        events.transformLatest {
            emit(state.value.copy(...))
            ...
            emit(state.value.copy(...))
        }
    ).onEach { state.value = it }
Think I'm missing something here: the second `emit` calling `state.value` gets the same value as the first one.
octylFractal
07/17/2020, 12:41 AM
`merge` is a concurrent operation if I recall correctly, so the `emit`s are buffered by default.
Gabriel Feo
07/17/2020, 2:46 PM
1. `state` and `events.transform` will run in two separate coroutines here; this is not a sequential flow ("All flows are merged concurrently").
2. `emit` suspends until the value is emitted, not until it is collected downstream. That's why I get a race condition by expecting my state (`state.value`) to have been updated already.
I don't get, however, what this statement in the docs means:
Operator fusion
Applications of flowOn, buffer, produceIn, and broadcastIn after this operator are fused with its concurrent merging so that only one properly configured channel is used for execution of merging logic.
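The fusion statement quoted above can be illustrated with a small sketch. It assumes kotlinx.coroutines is on the classpath; `mergedRendezvous` is a hypothetical helper name, and the sample values are made up. It only shows where `buffer` sits relative to `merge`. The fusion itself happens inside the library and is not observable from this output.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: merges two flows and requests a rendezvous hand-off.
// Per the quoted docs, buffer() applied right after merge() is fused with it,
// so it reconfigures merge's own channel instead of adding a second one.
fun mergedRendezvous(a: Flow<Int>, b: Flow<Int>): Flow<Int> =
    merge(a, b).buffer(Channel.RENDEZVOUS)

fun main() = runBlocking {
    val values = mergedRendezvous(flowOf(1, 2), flowOf(10, 20)).toList()
    // merge does not preserve order across upstreams, so sort before printing.
    println(values.sorted()) // [1, 2, 10, 20]
}
```

The same placement rule applies to `flowOn`, `produceIn`, and `broadcastIn` according to the quoted passage; only `buffer` is shown here.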
octylFractal
07/17/2020, 5:28 PM
If you apply `.buffer(RENDEZVOUS)` after `merge`, it knows that `merge` uses a channel and will change `merge`'s channel to be a RENDEZVOUS channel, rather than introduce a second channel.
Gabriel Feo
07/17/2020, 6:34 PM
julian
07/17/2020, 6:49 PM
With `merge`, is `RENDEZVOUS` synchronizing between the merged upstreams on the one hand and downstream collection on the other? Or does it also affect how upstreams are merged together?
octylFractal
07/17/2020, 6:55 PM
The way `merge` works is it launches a coroutine for each Flow that collects it and sends it to the same channel via `send`. Then the downstream flow on the original coroutine just receives from that channel and calls `emit`, which does the whole in-place processing as usual (inside this `emit` is where the `onEach` would be called).
julian
07/17/2020, 7:33 PM
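The description of `merge` above can be sketched with `channelFlow`. This is a simplified stand-in under the assumption that kotlinx.coroutines is available, not the library's actual implementation; `simpleMerge` is a hypothetical name. Each upstream gets its own coroutine that only sends into the shared channel, which is why an upstream can move on to its next `emit` before the downstream `onEach` has processed the previous value.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for merge, not the library's real code.
fun <T> simpleMerge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    for (upstream in flows) {
        // One coroutine per upstream; each forwards its values into the shared channel.
        launch { upstream.collect { value -> send(value) } }
    }
    // channelFlow closes the channel once this block and all launched children finish.
}

fun main() = runBlocking {
    val seen = mutableListOf<Int>()
    simpleMerge(flowOf(1, 2), flowOf(10, 20))
        .onEach { seen += it } // runs in the collecting coroutine, as part of the downstream emit
        .collect()
    // Order across upstreams is not guaranteed, so sort before printing.
    println(seen.sorted()) // [1, 2, 10, 20]
}
```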