natario1
08/14/2023, 10:28 AMconsumeAsFlow().flatMapConcat { … }
• It must be collected by a few collectors concurrently and I don’t want the expensive processing to run again (consumeAsFlow wouldn’t allow it anyway). So I added shareIn(scope)
• But I still want the flow to end when the channel is closed. So one trick is to emit an “end” signal like null
before sharing, and after sharing, for each collector, do takeWhile { it != null }.filterNotNull()
• Also I still want collectors to receive all elements regardless of timing, so I add something like replay = 50
to the shareIn operator. This is annoying because it’s a randomly high number with no guarantees. At the same time I don’t think shareIn is designed to handle unlimited replay?
It feels wrong to me but I can’t think of any other solution.franztesca
08/14/2023, 11:21 AMconsumeAsFlow().flatMapConcat { … }.buffer(UNLIMITED).shareIn(...)
Joffrey
08/14/2023, 1:07 PMnatario1
08/14/2023, 2:11 PMyou have to put replay cache to unlimitedI will do, thanks.
but not older valuesNo, I do want older values. Subscription will start right after sharing but I might miss the first values depending on the dispatcher.
So you want every collector to get all values, right?Correct. My solution works, it just looks ugly - the fact that I’m using
shareIn
to cache and broadcast the results, but then I have to undo the shareIn
with the emit(null)
trick so that the flow can end, and add infinite replay to mock a cold flow. See some code below.
scope.launch {
val shared = channel
.consumeAsFlow()
.flatMapLatest { ... }
.map { it as X? } // "cast" to emit null on completion
.onCompletion { emit(null) }
.shareIn(this, Eagerly, 500) // random high enough number
coroutineScope {
launch(IO) {
shared.takeWhile { it != null }
.filterNotNull()
.collect { ... }
}
launch {
shared.takeWhile { it != null }
.filterNotNull()
.collect { ... }
}
}
}