is updated with the latest info of each inner flow?
Sam
02/20/2024, 11:07 AM
You can use `flatMapLatest` together with `combine`:
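import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flatMapLatest

val input: Flow<List<Flow<E>>> = …
val output: Flow<List<E>> = input.flatMapLatest { flows ->
    combine(flows) { latestItems ->
        latestItems.toList()
    }
}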
The `combine` turns each `List<Flow<E>>` into a `Flow<List<E>>` that emits a new list when any of the flows emits a new item. The `flatMapLatest` means we throw that away and call `combine` again if the original flow emits a new `List<Flow<E>>`.
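To make that behaviour concrete, here is a minimal sketch that fixes `E` to `Int` and drives everything with `MutableStateFlow`s. The names `latestOfEach` and `demo` are made up for illustration and are not from the thread, and the sketch assumes kotlinx.coroutines is on the classpath. Note that `combine` only emits once every flow in the list has produced at least one value.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the flatMapLatest + combine pattern with E fixed to Int.
@OptIn(ExperimentalCoroutinesApi::class)
fun latestOfEach(input: Flow<List<Flow<Int>>>): Flow<List<Int>> =
    input.flatMapLatest { flows ->
        combine(flows) { latestItems -> latestItems.toList() }
    }

// Hypothetical demo: samples the output after each change.
fun demo(): List<List<Int>> = runBlocking {
    val a = MutableStateFlow(1)
    val b = MutableStateFlow(10)
    val c = MutableStateFlow(100)
    val input = MutableStateFlow<List<Flow<Int>>>(listOf(a, b))
    val output = latestOfEach(input)

    val seen = mutableListOf<List<Int>>()
    seen.add(output.first())      // latest of a and b
    b.value = 20                  // an inner flow changes
    seen.add(output.first())
    input.value = listOf(a, b, c) // the list of flows itself changes
    seen.add(output.first())
    seen
}

fun main() {
    println(demo()) // [[1, 10], [1, 20], [1, 20, 100]]
}
```

Each `first()` call here starts a fresh collection, which is fine for state flows because they always replay their current value.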
yschimke
02/20/2024, 1:17 PM
Are there optimised versions of this pattern that only unsubscribe and subscribe to the deltas? I've hit this before and it's potentially noisy for cold flows with history
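The cost being described can be observed with a small experiment. This is only a sketch: `source` and `countCollections` are hypothetical names, and the cold sources simply record how often they get collected. When the outer list grows from `[a, b]` to `[a, b, c]`, the `flatMapLatest` + `combine` pattern cancels and re-collects `a` and `b` even though only `c` is new.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical experiment: returns how many times each cold source was collected.
@OptIn(ExperimentalCoroutinesApi::class)
fun countCollections(): Map<String, Int> = runBlocking {
    // Everything runs on the single runBlocking thread, so a plain map is enough.
    val starts = mutableMapOf<String, Int>()

    // Cold source that records each collection, emits once, then stays open.
    fun source(name: String): Flow<String> = flow {
        starts[name] = (starts[name] ?: 0) + 1
        emit(name)
        awaitCancellation()
    }

    val a = source("a")
    val b = source("b")
    val c = source("c")
    val input = MutableStateFlow(listOf(a, b))
    val outputs = Channel<List<String>>(Channel.UNLIMITED)

    val job = launch {
        input.flatMapLatest { flows -> combine(flows) { it.toList() } }
            .collect { outputs.send(it) }
    }

    outputs.receive()             // wait until a and b have both emitted
    input.value = listOf(a, b, c) // only c is new
    outputs.receive()             // wait until a, b and c have all emitted
    job.cancel()
    starts
}

fun main() {
    println(countCollections()) // {a=2, b=2, c=1}
}
```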
reactormonk
02/20/2024, 1:18 PM
Slap a `distinct()` on top?
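As far as I know, kotlinx.coroutines has no `distinct()` operator on `Flow`; the closest built-in is `distinctUntilChanged()`, which drops consecutive equal emissions. A minimal sketch, with `dedupe` as a made-up name and a `flowOf` standing in for the combined output:

```kotlin
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the combined output: the second list is repeated.
fun dedupe(): List<List<Int>> = runBlocking {
    flowOf(listOf(1, 10), listOf(1, 20), listOf(1, 20), listOf(2, 20))
        .distinctUntilChanged() // drops the consecutive duplicate
        .toList()
}

fun main() {
    println(dedupe()) // [[1, 10], [1, 20], [2, 20]]
}
```

This only filters what comes out downstream. It does not stop `flatMapLatest` from cancelling and re-collecting the inner flows, so any side effects of re-subscribing to a cold flow would still happen.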
ross_a
02/20/2024, 1:45 PM
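import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.whileSelect

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> Flow<Collection<Flow<T>>>.flattenContents(): Flow<List<T>> = channelFlow {
    // Collect the outer flow into a channel so it can take part in the select.
    val outer = this@flattenContents.produceIn(this)
    var outerOpen = true
    // One open channel per inner flow that is currently being collected.
    val inners = mutableMapOf<Flow<T>, ReceiveChannel<T>>()
    var latestItems = mapOf<Flow<T>, T>()
    whileSelect {
        if (outerOpen) outer.onReceiveCatching { result ->
            if (result.isClosed) {
                result.exceptionOrNull()?.let { throw it } // outer flow failed
                outerOpen = false // outer flow completed; keep serving the inner flows
            } else {
                val wanted = result.getOrThrow().toSet()
                // Cancel only the flows that disappeared and drop their stale values.
                (inners.keys - wanted).forEach { inners.remove(it)?.cancel() }
                latestItems = latestItems.filterKeys { it in wanted }
                // Start collecting only the flows that are new.
                (wanted - inners.keys).forEach { inners[it] = it.produceIn(this@channelFlow) }
            }
            outerOpen || inners.isNotEmpty() // false ends the select loop
        }
        // Iterate over a snapshot because a handler may remove entries from `inners`.
        inners.toList().forEach { (flow, channel) ->
            channel.onReceiveCatching { result ->
                if (result.isClosed) {
                    result.exceptionOrNull()?.let { throw it } // inner flow failed
                    inners.remove(flow) // completed; its last value stays in latestItems
                } else {
                    // Unlike combine, this emits before every flow has produced a value.
                    latestItems = latestItems + (flow to result.getOrThrow())
                    send(latestItems.values.toList())
                }
                outerOpen || inners.isNotEmpty()
            }
        }
    }
}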