# coroutines
k
I’m having trouble understanding the relation between two pieces of documentation:
Send and receive operations to channels are fair with respect to the order of their invocation from multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive` gets the element. (<https://kotlinlang.org/docs/channels.html#channels-are-fair|Channels are fair docs>)
and
Merges the given flows into a single flow without preserving an order of elements. All flows are merged concurrently, without limit on the number of simultaneously collected flows. (<https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html|Iterable<Flow>.merge>)
Does this mean that it’s possible for a merged flow to emit elements in an unfair fashion? Doesn’t merging these flows just send elements to a single underlying channel?
z
I’m guessing the merge operator doesn’t have enough information about ordering across flows to preserve order across them. So if you are merging 2 flows, and 2 threads running on separate processors emit into them at the same exact instant, the operator has to pick one to send first.
🤔 2
k
I suppose my question is in what scenario would merging flows not preserve the order in which the emission to a channelFlow is invoked?
Yeah that is maybe one scenario, and that’s just nondeterministic behavior. Makes sense.
I wonder if there’s any others?
z
And the definition of “same instant” is at least to some degree determined by hardware+OS
👍 1
Just look at the impl. It’s probably pretty straightforward
k
It is
/**
 * Merges the given flows into a single flow without preserving an order of elements.
 * All flows are merged concurrently, without limit on the number of simultaneously collected flows.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 */
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> {
    /*
     * This is a fuseable implementation of the following operator:
     * channelFlow {
     *    forEach { flow ->
     *        launch {
     *            flow.collect { send(it) }
     *        }
     *    }
     * }
     */
    return ChannelLimitedFlowMerge(this)
}
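For reference, the unfused operator described in that comment can be written out as a runnable sketch. The name `mergeSketch` is made up for illustration and is not part of kotlinx.coroutines; the real `merge` returns the fuseable `ChannelLimitedFlowMerge` instead. It assumes `kotlinx-coroutines-core` is on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper mirroring the comment inside merge(), without operator fusion.
fun <T> mergeSketch(flows: Iterable<Flow<T>>): Flow<T> = channelFlow {
    flows.forEach { flow ->
        // One coroutine per source flow; all of them send into the same channel.
        launch {
            flow.collect { send(it) }
        }
    }
}

fun main(): Unit = runBlocking {
    val merged = mergeSketch(
        listOf(flowOf("a1", "a2", "a3"), flowOf("b1", "b2", "b3"))
    ).toList()
    // Each source keeps its own relative order; the interleaving of "a" and "b"
    // elements depends on how the launched coroutines are scheduled.
    println(merged)
}
```

Each source flow is collected by a single coroutine that sends sequentially, so order within one source is kept. Only the interleaving across sources is left open.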
z
So make that hardware+OS+coroutine dispatcher
k
But it quickly bottoms out in the synchronization of Channels which made me question this
c
Yeah, I think the difference comes from the fact that the flows are running in parallel to each other. There’s a race between the moment one flow emits a value and the moment it’s actually placed into the channel. With respect to each item that does get placed into the channel, it should be fair since that’s the Channel’s contract, but this is only guaranteed from the moment of `channel.send`. Basically, there’s no synchronization among the flows themselves; the synchronization comes from the values being placed into a channel.
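One way to look for that race, as a rough sketch: have each flow take a ticket from a shared counter right before `emit`, merge on a multi-threaded dispatcher, and count how often tickets arrive out of order. `ticketFlow` is a made-up helper, and ticket order is only an approximation of "emission order". The result is nondeterministic and may well be zero on a given run.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: every emitted value is a ticket taken from a shared
// counter immediately before emit(), so ticket order approximates emission order.
fun ticketFlow(tickets: AtomicInteger, count: Int): Flow<Int> = flow {
    repeat(count) { emit(tickets.incrementAndGet()) }
}

fun main(): Unit = runBlocking {
    val tickets = AtomicInteger()
    val received = merge(ticketFlow(tickets, 1000), ticketFlow(tickets, 1000))
        .flowOn(Dispatchers.Default) // run the merging coroutines on a thread pool
        .toList()
    // A pair (a, b) with a > b means a later ticket reached the collector first.
    val outOfOrder = received.zipWithNext().count { (a, b) -> a > b }
    println("received=${received.size}, out-of-order pairs=$outOfOrder")
}
```

Every ticket still arrives exactly once. What can vary is whether a ticket taken earlier in one flow reaches the channel before a ticket taken later in the other.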
k
Yup so the race is derived from the ordering of continuations on the dispatcher from the `launch`ed jobs I suppose
Makes sense
c
Yeah, I think so. For example, a single-threaded dispatcher would not be vulnerable since there shouldn’t be any suspend calls between when the Flow emits and when the value is placed into the channel
👍 1
☝🏻 1