Nikita
03/21/2024, 4:56 AMfun list() : Flow<List<ID>>
and fun propertiesById(id: ID): Flow<List<Prop>>
Essentially, I want to combine them, so that I have the full information about ID + All properties
that ID can have (props can be updated)
I can do that by doing something like this:
val result = list().flatMapLatest { ids ->
val propFlow = ids.map { id ->
propertiesById(id).map { props ->
MyClass(id, props)
}
}
combine(propFlow) { it.toList() }
}
This works, and gives me the Flow<List<MyClass>>
which has all the information, the problem is: anytime there is a new ID added/removed - Flow propertiesById
is cancelled and started again. which is quite inefficient since propertiesById is doing network calls over and over again.
Is there a way to somehow cache the my propertiesById
flow, so when we get a new list of IDs, we can just re-use the flow which has all the information? (i.e avoid cancel and create flow over and over?)
ps. flatMapConcat
is not suitable, since the propertiesById
does not finish, as it keeps listening for new properties.Stylianos Gakis
03/21/2024, 9:25 AMfun propertiesById(id: ID): Flow<List<Prop>>
I can’t see how it’d be possible to have a new ID but then still keep the same flow active. A new ID will need to call this again and you get back a new Flow.
If you keep all flows hot and running for all IDs that you may ever get, wouldn’t that be even worse since it’d keep all of them alive at the same time instead of tearing the old one down and starting the new one?ephemient
03/21/2024, 1:12 PMinline fun <T, reified U> Flow<Iterable<T>>.flatMapCombine(crossinline transform: (T) -> Flow<U>): Flow<List<U>> = channelFlow {
val flows = mutableMapOf<T, Flow<U>>()
collectLatest { keys ->
combine(keys.map { key -> flows.getOrPut(key) { transform(key).stateIn(this) } }) { it.toList() }.collect { send(it) }
}
}
list().flatMapCombine(::propertiesById)
would potentially work, if you don't ever need to cancel any of the propertyById
flows even if the ID is no longer in the listephemient
03/21/2024, 1:14 PMfun <T, U> Flow<Iterable<T>>.flatMapCombine(transform: (T) -> Flow<U>): Flow<List<U>> = channelFlow {
val cache = mutableMapOf<T, U>()
val subscribers = mutableMapOf<T, Job>()
val updates = Channel<Pair<T, U>>()
collectLatest {
val keys = it.toList()
val outdated = subscribers - keys
withContext(NonCancellable) {
for (job in outdated.values) job.cancelAndJoin()
cache.keys.removeAll(outdated.keys)
subscribers.keys.removeAll(outdated.keys)
}
for (key in keys) if (key !in subscribers) subscribers[key] = launch {
transform(key).collect { value -> updates.send(key to value) }
}
run { send(keys.map { key -> if (key in cache) cache.getValue(key) else return@run }) }
for ((key, value) in updates) {
cache[key] = value
run { send(keys.map { key -> if (key in cache) cache.getValue(key) else return@run }) }
}
}
}
ephemient
03/21/2024, 1:15 PMuli
03/21/2024, 4:19 PMFlow.scan
is your friend, storing your propertiesById(id) as SharedFlows with buffer 1 in the accumulator and then combining themephemient
03/21/2024, 8:45 PMCoroutineScope
to perform stateIn
, which I've borrowed from channelFlow
but you won't get with just scan
Sam
03/22/2024, 8:26 AMflow { coroutineScope { … } }
will work too; it doesn't violate the context preservation rule unless you actually start launching new coroutinesephemient
03/22/2024, 2:05 PMNikita
03/24/2024, 9:33 PMreturn flow {
val cache = HashMap<DeviceId, Flow<Device>>()
val res = list().flatMapMerge(concurrency = Int.MAX_VALUE) { deviceIds: List<DeviceId> ->
val flows: List<Flow<Device>> = deviceIds.map { id ->
cache.getOrPut(id) {
properties(id).map { prop -> Device(id.id, prop) }.stateIn(
scope = CoroutineScope(coroutineContext),
)
}
}
combine(flows) { it.toList() }
}
emitAll(res)
}
Nikita
03/24/2024, 9:39 PM