Andrew K
02/23/2024, 12:34 AMAndrew K
02/23/2024, 12:34 AMclass MergedFlowRegistry<T>(
private val dispatcher: CoroutineContext,
) {
private val _registeredFlows: MutableStateFlow<List<Flow<T>>> = MutableStateFlow(emptyList())
val registeredFlows = _registeredFlows.flatMapLatest { flows ->
merge(*flows.toTypedArray())
}.shareIn(
CoroutineScope(dispatcher),
SharingStarted.WhileSubscribed()
)
fun register(flow: Flow<T>) =
_registeredFlows.update { it + flow }
fun unregister(flow: Flow<T>) =
_registeredFlows.update { it - flow }
}
Andrew K
02/23/2024, 12:36 AMdarkmoon_uk
02/23/2024, 5:59 AMflatMapLatest
& `combine`/`merge` but am not dealing with fast enough emissions that I had considered dropping elements due to the -Latest
🤔
To keep the `Flow`s continuous I guess you'd want a custom operator that diff's the incoming collections of `Flow`s and starts new emit()
jobs for added `Flow`s (maintaining a map of downstream emission `Job`s and explicitly cancels those `Job`s for removed `Flow`s.