# coroutines

Gabriel Feo

07/17/2020, 12:40 AM
Would this onEach work to keep the current state of the flow?
private val state = MutableStateFlow(initialState)

fun getState(events: Flow<Event>): Flow<State> =
    merge(
        state,
        events.transformLatest {
            emit(state.value.copy(...))
            ...
            emit(state.value.copy(...))
        }
    ).onEach { state.value = it }
I think I'm missing something here: the second emit's call to `state.value` gets the same value as the first one.

octylFractal

07/17/2020, 12:41 AM
`merge` is a concurrent operation if I recall correctly, so the `emit`s are buffered by default.
You may still be able to get what you're looking for by changing the channel to RENDEZVOUS, but it still has a small race that could occur.
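A minimal sketch of the effect described above, assuming a plain `var` (the hypothetical `latest`) stands in for `state.value`. The producer records what it sees right after each `emit`. Because `merge` puts a buffered channel between producer and collector, the `onEach` update may not have run yet when the producer reads the variable. The exact values observed depend on scheduling, so none are asserted here.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns what the producer observed in `latest`
// immediately after each emit.
fun observeAfterEmit(): List<Int> = runBlocking {
    var latest = 0                      // stands in for state.value
    val seen = mutableListOf<Int>()
    val producer = flow {
        emit(1); seen += latest         // may still see the old value
        emit(2); seen += latest
    }
    // merge collects `producer` in its own coroutine and hands values
    // over through a channel, so onEach runs on the collecting side.
    merge(producer).onEach { latest = it }.collect()
    seen
}

fun main() {
    println(observeAfterEmit())
}
```

If the producer sees a stale value after an `emit`, that is the same race as in the question above.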

Gabriel Feo

07/17/2020, 2:46 PM
Oh, I see. So just to make sure I got this right:
1. `state` and `events.transformLatest` run in two separate coroutines here; this is not a sequential flow. All flows are merged concurrently.
2. `emit` suspends until the value is emitted, not until it is collected downstream. That's why I get a race condition by expecting my state (`state.value`) to have been updated already.
What I don't get, however, is what this statement in the docs means:
Operator fusion
Applications of flowOn, buffer, produceIn, and broadcastIn after this operator are fused with its concurrent merging so that only one properly configured channel is used for execution of merging logic.

octylFractal

07/17/2020, 5:28 PM
1 & 2 are correct. Operator fusion means that if you put a `.buffer(RENDEZVOUS)` after `merge`, it knows that `merge` uses a channel and will change merge's channel to be a RENDEZVOUS channel, rather than introduce a second channel.
the other methods also use channels, which is why they are mentioned
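As a rough illustration of the fused form, `buffer(Channel.RENDEZVOUS)` can be applied directly after `merge`. The function name `mergeWithRendezvous` and the sample values are made up for this sketch. Fusion itself is internal to the library and is not observable from this code; the sketch only shows where the operator goes.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical example: merge two small flows and request a rendezvous
// channel for the merging step.
fun mergeWithRendezvous(): List<String> = runBlocking {
    val a = flowOf("a1", "a2")
    val b = flowOf("b1", "b2")
    // Per the docs quoted above, buffer applied after merge is fused with
    // merge's own channel instead of adding a second one.
    merge(a, b).buffer(Channel.RENDEZVOUS).toList()
}

fun main() {
    // Interleaving between the two upstreams is not guaranteed, so the
    // result is sorted before printing.
    println(mergeWithRendezvous().sorted())
}
```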

Gabriel Feo

07/17/2020, 6:34 PM
Got it. Thank you very much! :)

julian

07/17/2020, 6:49 PM
@octylFractal When used with `merge`, is `RENDEZVOUS` synchronizing between merged upstreams on the one hand and downstream collection on the other? Or does it also affect how upstreams are merged together?

octylFractal

07/17/2020, 6:55 PM
it might affect how they merge, I'm not sure of ordering guarantees for `offer`/`send` on those types of channels
Basically, how `merge` works is it launches a coroutine for each Flow that collects it and sends it to the same channel via `send`.
Then the downstream flow on the original coroutine just receives from that channel and calls `emit`, which does the whole in-place processing as usual (inside this `emit` is where the `onEach` would be called).
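The description above can be sketched with `channelFlow`. This is not the library's actual implementation, and `simpleMerge` is a hypothetical name: it launches one collecting coroutine per upstream flow, each calling `send` on the same channel, while the downstream collector receives from that channel.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper illustrating the idea, not the real merge.
fun <T> simpleMerge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    for (flow in flows) {
        // One coroutine per upstream; all of them send into the single
        // channel that backs this channelFlow.
        launch {
            flow.collect { value -> send(value) }
        }
    }
    // channelFlow waits for the launched children before completing.
}

fun main() = runBlocking {
    val merged = simpleMerge(flowOf(1, 2), flowOf(3, 4)).toList()
    // Order across upstreams is not guaranteed, so sort before printing.
    println(merged.sorted())
}
```

Any `onEach` placed after `simpleMerge` would run on the collecting side, after a value has been received from the channel.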

julian

07/17/2020, 7:33 PM
Got it. Thanks @octylFractal