I have `Flow<List<Flow<E>>>`, ho...
# coroutines
r
I have
Flow<List<Flow<E>>>
, how can I convert that to
Flow<List<E>>
, where the
Flow
is updated with the lastest info of each inner flow?
s
You can use
flatMapLatest
together with `combine`:
Copy code
val input: Flow<List<Flow<E>>> = …

val output: Flow<List<E>> = input.flatMapLatest { flows ->
  combine(flows) { latestItems ->
    latestItems.toList()
  }
}
The
combine
turns each
List<Flow<E>>
into a
Flow<List<E>>
that emits a new list when any of the flows emits a new item. The
flatMapLatest
means we throw that away and call
combine
again if the original flow emits a new
List<Flow<E>>
.
y
Are there optimised versions of this pattern that only unsubscribe and subscribe to the deltas? I've hit this before and it's potentially noisy for cold flows with history
r
Slap a
distinct()
on top?
r
Copy code
fun <T> Flow<Collection<Flow<T>>>.flattenContents() = channelFlow {

    val outer = produceIn(this)

    val inners = mutableMapOf<Flow<T>, ReceiveChannel<T>>()

    var latestItems = mapOf<Flow<T>, T>()

    whileSelect {
        outer.onReceive { flows ->
            val known = inners.keys
            val removed = (known - flows)
            val inserted = (flows - known)

            removed.forEach { removedFlow ->
                inners.remove(removedFlow)?.cancel()
            }
            inserted.forEach { insertedFlow ->
                inners[insertedFlow] = insertedFlow.produceIn(this@channelFlow)
            }

            true
        }
        inners.forEach { (key, value) ->
            value.onReceiveCatching { result ->
                result.onSuccess { event ->

                    latestItems = latestItems.toMutableMap().apply {
                        put(key, event)
                    }

                    send(latestItems.values)
                }
                true
            }
            // TODO cleanup on failure?
        }
    }
}
🎉 1
very rough
also untested 🙂
persistentHashMap from https://github.com/Kotlin/kotlinx.collections.immutable would be better than that map copy