# coroutines
s
Is there a standard operator for combining asynchronous flows that share a supertype? As each value comes in from the source Flows, I want them to immediately join into the same downstream flow. At the moment, I have the following implementation:
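```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch

// Collects every source flow in its own coroutine and forwards each value downstream as soon as it arrives.
fun <T> List<Flow<T>>.join(): Flow<T> = channelFlow {
    for (flow in this@join) {
        launch {
            flow.collect { send(it) }
        }
    }
}
```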
v
`list.asFlow().flattenConcat()` or `flattenMerge()` (depending on whether you want to collect the flows sequentially or concurrently)
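A minimal sketch of the difference, assuming kotlinx.coroutines 1.6 or later on the classpath. The two timed sources and the helper names `sources`, `concatenated` and `merged` are hypothetical. `flattenConcat()` finishes one flow before starting the next, while `flattenMerge()` forwards values in the order they arrive from any source. Both opt-in markers are listed because, depending on the release, these operators may require one or the other.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flattenConcat
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Two hypothetical sources whose values arrive at different times.
fun sources(): List<Flow<String>> = listOf(
    flow { delay(100); emit("a1"); delay(200); emit("a2") }, // a1 at ~100 ms, a2 at ~300 ms
    flow { delay(200); emit("b1") },                         // b1 at ~200 ms after it starts
)

// Sequential: the second source is only collected after the first one completes.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun concatenated(): List<String> = runBlocking { sources().asFlow().flattenConcat().toList() }

// Concurrent: both sources are collected at once and values are forwarded as they are emitted.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun merged(): List<String> = runBlocking { sources().asFlow().flattenMerge().toList() }

fun main() {
    println(concatenated()) // [a1, a2, b1]
    println(merged())       // [a1, b1, a2]
}
```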
a
the default for `flattenMerge`'s concurrency parameter is quite surprising
e
Why surprising? What would you expect the default to be?
a
Unlimited. Limiting how many input flows are collected concurrently by default makes an assumption that is almost never true for my own common use cases: it assumes that the input flows are relatively short, i.e. that they will complete quickly (or at all), and that flows later in the list of inputs can wait to start collecting. It's especially troublesome given the `list.asFlow().flattenMerge()` recommendation above, where it is not at all obvious that if `list` is long enough, some part of the tail of the list will simply be ignored, and "long enough" is defined by an opaque property that can be changed out from under other code.
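A sketch reproducing this behaviour, assuming the `kotlinx.coroutines.flow.defaultConcurrency` system property is left unset so that `DEFAULT_CONCURRENCY` is 16. `neverEndingFlow` and `sourcesHeardFrom` are hypothetical helpers: each input emits once and then never completes, so once 16 inputs are being collected, the remaining ones never start.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.DEFAULT_CONCURRENCY
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for a long-lived input flow: emits its id once, then never completes.
fun neverEndingFlow(id: Int): Flow<Int> = flow {
    emit(id)
    awaitCancellation()
}

// Merges `sourceCount` never-ending flows with the default concurrency and
// reports how many of them emitted anything within half a second.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun sourcesHeardFrom(sourceCount: Int): Int = runBlocking {
    val received = mutableListOf<Int>()
    withTimeoutOrNull(500) {
        List(sourceCount) { neverEndingFlow(it) }
            .asFlow()
            .flattenMerge() // concurrency defaults to DEFAULT_CONCURRENCY
            .collect { received += it }
    }
    received.size
}

@OptIn(FlowPreview::class)
fun main() {
    // With the property unset, only 16 of the 20 sources are ever collected.
    println("DEFAULT_CONCURRENCY = $DEFAULT_CONCURRENCY")
    println("heard from ${sourcesHeardFrom(20)} of 20 sources")
}
```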
The case that I encountered some weeks ago was that I have a shared flow of MQTT events that I process into outbound commands to a home automation hub. I have a list of devices enumerated by the hub, which I `.map` to a list of flows that accept the MQTT events affecting each device and emit commands for this specific hub to control those devices. The input flows all have the same effective lifetime; none completes before any other. I was quite surprised when over half of those devices stopped accepting commands after I switched from a custom operator to `flattenMerge`, and it took a while to discover why. 🙂
Any limit at all as a default seems subject to these same sorts of problems. It doesn't compose well across the many possible combinations of an outer source of flows and the inner flows it emits. You have to know a lot about the nature of both for a limit to be the correct choice at each usage site, and I doubt that there is a good universal limit to use as a default. That the default is a configurable property today seems to point to the same conclusion. In many ways my case was easier to debug than many others I could imagine, where some operations could experience "random" latency until a relatively unrelated merged flow completes and allows the next one to start.
e
Unlimited by default is scary, too. You might easily run out of resources trying to do too many concurrent things. Maybe the concurrency limit should just be explicit?
a
I think that might be the least surprising option.
I think the recommendation of `list.asFlow().flattenMerge()` is a different enough case to deserve some consideration as well. An unlimited-by-default `Iterable<Flow<T>>.mergeFlows(): Flow<T>` can more reasonably assume that its input is bounded than an operator on `Flow<Flow<T>>` can (or `List<Flow<T>>.merge...` if a little extra assurance of that bounded nature is warranted).
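A sketch of the bounded-input idea, using a hypothetical `mergeFlows` name (an illustration of the argument, not a library API): because a `Collection` has a known size, the operator can pass that size as the concurrency limit and collect every input at once. `firstValues` is likewise a hypothetical demo helper.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: the input is finite, so its size is a safe concurrency limit.
// flattenMerge requires concurrency > 0, hence coerceAtLeast(1) for an empty collection.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T> Collection<Flow<T>>.mergeFlows(): Flow<T> =
    asFlow().flattenMerge(concurrency = size.coerceAtLeast(1))

// Merges `count` flows that each emit their id once and never complete.
// take(count) can only finish if every input is being collected at the same time.
fun firstValues(count: Int): List<Int> = runBlocking {
    val inputs = List(count) { id -> flow<Int> { emit(id); awaitCancellation() } }
    inputs.mergeFlows().take(count).toList().sorted()
}

fun main() {
    println(firstValues(20)) // all ids from 0 through 19
}
```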
e
Indeed, we need a separate `merge` operator.
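For reference, current kotlinx.coroutines releases provide such an operator: `merge(vararg flows)` and `Iterable<Flow<T>>.merge()`, documented as collecting all flows concurrently with no limit on the number of simultaneously collected flows. A short check, assuming a recent release; `firstValuesViaMerge` is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Merges `count` never-ending flows with the library's Iterable<Flow<T>>.merge().
// Since merge() has no concurrency limit, every input gets to emit its id.
fun firstValuesViaMerge(count: Int): List<Int> = runBlocking {
    val inputs = List(count) { id -> flow<Int> { emit(id); awaitCancellation() } }
    inputs.merge().take(count).toList().sorted()
}

fun main() {
    println(firstValuesViaMerge(20)) // all ids from 0 through 19
}
```

Unlike `flattenMerge()`, nothing here depends on `DEFAULT_CONCURRENCY`, so the tail of the list is never starved.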