reactormonk
02/20/2024, 10:37 AM
`Flow<List<Flow<E>>>`, how can I convert that to `Flow<List<E>>`, where the `Flow` is updated with the latest info of each inner flow?
Sam
02/20/2024, 11:07 AM
`flatMapLatest` together with `combine`:
val input: Flow<List<Flow<E>>> = …
val output: Flow<List<E>> = input.flatMapLatest { flows ->
    combine(flows) { latestItems ->
        latestItems.toList()
    }
}
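For anyone who wants to try this out, here is a minimal runnable sketch of the same pattern. It assumes `Int` elements and `MutableStateFlow` inner flows. The `latestOfEach` name and the sample values are made up for illustration and are not part of the snippet above.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the flatMapLatest + combine pattern, fixed to Int for the demo.
@OptIn(ExperimentalCoroutinesApi::class)
fun latestOfEach(input: Flow<List<Flow<Int>>>): Flow<List<Int>> =
    input.flatMapLatest { flows ->
        // Re-run combine for every new list of inner flows.
        combine(flows) { latestItems -> latestItems.toList() }
    }

fun main() = runBlocking {
    // Two inner flows that always hold a current value.
    val a = MutableStateFlow(1)
    val b = MutableStateFlow(2)
    val input: Flow<List<Flow<Int>>> = flowOf(listOf(a, b))

    println(latestOfEach(input).first()) // [1, 2]
    a.value = 10
    println(latestOfEach(input).first()) // [10, 2]
}
```

`first()` is used here only to take one snapshot and stop collecting. In real code you would more likely `collect` the resulting flow.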
The `combine` turns each `List<Flow<E>>` into a `Flow<List<E>>` that emits a new list when any of the flows emits a new item. The `flatMapLatest` means we throw that away and call `combine` again if the original flow emits a new `List<Flow<E>>`.
yschimke
02/20/2024, 1:17 PM
reactormonk
02/20/2024, 1:18 PM
`distinct()` on top?
ross_a
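02/20/2024, 1:45 PM
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.whileSelect

fun <T> Flow<Collection<Flow<T>>>.flattenContents() = channelFlow {
    // Channel of the outer flow's emissions (each one is the current collection of inner flows).
    val outer = produceIn(this)
    // One receive channel per inner flow that is currently being collected.
    val inners = mutableMapOf<Flow<T>, ReceiveChannel<T>>()
    // Latest item seen from each inner flow.
    var latestItems = mapOf<Flow<T>, T>()
    whileSelect {
        // Note: onReceive fails the select if the outer flow completes.
        outer.onReceive { flows ->
            val known = inners.keys
            val removed = (known - flows)
            val inserted = (flows - known)
            removed.forEach { removedFlow ->
                inners.remove(removedFlow)?.cancel()
                // Also drop the stale item so it is not sent again.
                latestItems = latestItems - removedFlow
            }
            inserted.forEach { insertedFlow ->
                inners[insertedFlow] = insertedFlow.produceIn(this@channelFlow)
            }
            true
        }
        inners.forEach { (key, value) ->
            value.onReceiveCatching { result ->
                result.onSuccess { event ->
                    latestItems = latestItems.toMutableMap().apply {
                        put(key, event)
                    }
                    send(latestItems.values)
                }
                true
            }
            // TODO cleanup on failure?
        }
    }
}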
ross_a
02/20/2024, 1:45 PM
ross_a
02/20/2024, 1:46 PM
ross_a
02/20/2024, 1:51 PM