https://kotlinlang.org logo
#coroutines
Title
# coroutines
n

Nikita

03/21/2024, 4:56 AM
I have two functions which look something like this (exposed by a library, so I can't change it):
fun list() : Flow<List<ID>>
and
fun propertiesById(id: ID): Flow<List<Prop>>
Essentially, I want to combine them, so that I have the full information about
ID + All properties
that ID can have (props can be updated) I can do that by doing something like this:
Copy code
val result = list().flatMapLatest { ids ->
    val propFlow = ids.map { id ->
        propertiesById(id).map { props ->
            MyClass(id, props)
        }
    }
    combine(propFlow) { it.toList() }
}
This works, and gives me the
Flow<List<MyClass>>
which has all the information, the problem is: anytime there is a new ID added/removed - Flow
propertiesById
is cancelled and started again. which is quite inefficient since propertiesById is doing network calls over and over again. Is there a way to somehow cache the my
propertiesById
flow, so when we get a new list of IDs, we can just re-use the flow which has all the information? (i.e avoid cancel and create flow over and over?) ps.
flatMapConcat
is not suitable, since the
propertiesById
does not finish, as it keeps listening for new properties.
s

Stylianos Gakis

03/21/2024, 9:25 AM
With the signature
fun propertiesById(id: ID): Flow<List<Prop>>
I can’t see how it’d be possible to have a new ID but then still keep the same flow active. A new ID will need to call this again and you get back a new Flow. If you keep all flows hot and running for all IDs that you may ever get, wouldn’t that be even worse since it’d keep all of them alive at the same time instead of tearing the old one down and starting the new one?
e

ephemient

03/21/2024, 1:12 PM
Copy code
inline fun <T, reified U> Flow<Iterable<T>>.flatMapCombine(crossinline transform: (T) -> Flow<U>): Flow<List<U>> = channelFlow {
    val flows = mutableMapOf<T, Flow<U>>()
    collectLatest { keys ->
        combine(keys.map { key -> flows.getOrPut(key) { transform(key).stateIn(this) } }) { it.toList() }.collect { send(it) }
    }
}

list().flatMapCombine(::propertiesById)
would potentially work, if you don't ever need to cancel any of the
propertyById
flows even if the ID is no longer in the list
if you do care about that, I think the only way would be to drop down into the lower level channels API, e.g. something like
Copy code
fun <T, U> Flow<Iterable<T>>.flatMapCombine(transform: (T) -> Flow<U>): Flow<List<U>> = channelFlow {
    val cache = mutableMapOf<T, U>()
    val subscribers = mutableMapOf<T, Job>()
    val updates = Channel<Pair<T, U>>()
    collectLatest {
        val keys = it.toList()
        val outdated = subscribers - keys
        withContext(NonCancellable) {
            for (job in outdated.values) job.cancelAndJoin()
            cache.keys.removeAll(outdated.keys)
            subscribers.keys.removeAll(outdated.keys)
        }
        for (key in keys) if (key !in subscribers) subscribers[key] = launch {
            transform(key).collect { value -> updates.send(key to value) }
        }
        run { send(keys.map { key -> if (key in cache) cache.getValue(key) else return@run }) }
        for ((key, value) in updates) {
            cache[key] = value
            run { send(keys.map { key -> if (key in cache) cache.getValue(key) else return@run }) }
        }
    }
}
I haven't thought it through to see if that might be missing some corner cases though, and the overall flow never completes even if the individual flows do
u

uli

03/21/2024, 4:19 PM
maybe
Flow.scan
is your friend, storing your propertiesById(id) as SharedFlows with buffer 1 in the accumulator and then combining them
💯 1
👍 1
very nice 1
e

ephemient

03/21/2024, 8:45 PM
you still need some
CoroutineScope
to perform
stateIn
, which I've borrowed from
channelFlow
but you won't get with just
scan
👍 1
s

Sam

03/22/2024, 8:26 AM
flow { coroutineScope { … } }
will work too; it doesn't violate the context preservation rule unless you actually start launching new coroutines
e

ephemient

03/22/2024, 2:05 PM
I actually did try that and it failed
n

Nikita

03/24/2024, 9:33 PM
at the moment got this to work like this:
Copy code
return flow {
            val cache = HashMap<DeviceId, Flow<Device>>()
            val res = list().flatMapMerge(concurrency = Int.MAX_VALUE) { deviceIds: List<DeviceId> ->
                val flows: List<Flow<Device>> = deviceIds.map { id ->
                    cache.getOrPut(id) {
                        properties(id).map { prop -> Device(id.id, prop) }.stateIn(
                            scope = CoroutineScope(coroutineContext),
                        )
                    }
                }
                combine(flows) { it.toList() }
            }
            emitAll(res)
        }
also, tried @ephemient your flatMapCombine, which is also seem to work, and covers my edge cases. will test it more