https://kotlinlang.org logo
#flow
Title
# flow
a

Andrew K

02/23/2024, 12:34 AM
What is the best way to dynamically add/remove flows to another flow? I have this class so far...
Copy code
class MergedFlowRegistry<T>(
    private val dispatcher: CoroutineContext,
) {
    private val _registeredFlows: MutableStateFlow<List<Flow<T>>> = MutableStateFlow(emptyList())

    val registeredFlows = _registeredFlows.flatMapLatest { flows ->
        merge(*flows.toTypedArray())
    }.shareIn(
        CoroutineScope(dispatcher),
        SharingStarted.WhileSubscribed()
    )

    fun register(flow: Flow<T>) =
        _registeredFlows.update { it + flow }

    fun unregister(flow: Flow<T>) =
        _registeredFlows.update { it - flow }
}
Sometimes I have an issue of missed emissions when adding or removing rapidly... I believe due to flatMapLatest canceling the previously added/removed flows.
d

darkmoon_uk

02/23/2024, 5:59 AM
Interesting, I often
flatMapLatest
& `combine`/`merge` but am not dealing with fast enough emissions that I had considered dropping elements due to the
-Latest
🤔 To keep the `Flow`s continuous I guess you'd want a custom operator that diff's the incoming collections of `Flow`s and starts new
emit()
jobs for added `Flow`s (maintaining a map of downstream emission `Job`s and explicitly cancels those `Job`s for removed `Flow`s.
2 Views