Seri
09/17/2019, 7:53 PMfun <T> List<Flow<T>>.join() = channelFlow {
for (flow in this@join) {
launch { flow.collect {
send(it)
}}
}
}
Vsevolod Tolstopyatov [JB]
09/17/2019, 9:24 PMlist.asFlow().flattenConcat()
or flattenMerge
(depends on whether you want to do it sequentially or concurrently)Adam Powell
09/17/2019, 10:44 PMelizarov
09/18/2019, 2:18 AMAdam Powell
09/18/2019, 2:00 PMlist.asFlow().flattenMerge()
recommendation above, where it is not at all obvious that if list
is long enough, where, "long enough" is defined by an opaque property that can be changed out from under other code, that some part of the tail of the list will just be ignored.Adam Powell
09/18/2019, 2:05 PM.map
to a list of flows that accept MQTT events that affect each device and emit commands for this specific hub to control those devices. The input flows all have the same effective lifetime; one won't complete before any other. I was quite surprised when over half of those devices were not accepting commands when I switched from a custom operator to flattenMerge
and it took a while to discover why. 🙂Adam Powell
09/18/2019, 2:26 PMelizarov
09/18/2019, 2:39 PMAdam Powell
09/18/2019, 2:41 PMAdam Powell
09/18/2019, 2:44 PMlist.asFlow().flattenMerge()
is a different enough case to get some consideration as well. An unlimited-by-default Iterable<Flow<T>>.mergeFlows(): Flow<T>
can more reasonably assume that the input is bounded than an operator on Flow<Flow<T>>
can.Adam Powell
09/18/2019, 2:46 PMList<Flow<T>>.merge...
if a little extra assurance of that bounded nature is warranted)elizarov
09/18/2019, 3:35 PMmerge
operator.elizarov
09/18/2019, 3:36 PM