Eric Martori
08/05/2019, 1:17 PM
val flow get() = channel.asFlow()
private val channel = BroadcastChannel<T>(???)
I want each flow.collect {} call made by a user of the API to receive the last value as soon as it starts collecting, but without losing any values that arrive in quick succession; the sender should suspend if needed to guarantee this.
With CONFLATED I get the last value emitted on each new collect, and with BUFFERED I get the "suspend if needed, don't lose any values" behaviour.
Is there a way to achieve what I want?
Eric Martori
08/05/2019, 2:27 PM
val flow get() = channel.asFlow().buffer()
private val channel = BroadcastChannel<T>(CONFLATED)
Making the original channel conflated lets new collect calls receive the last value, while the buffer on the flow buffers the values that follow.
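As a rough illustration of the buffer half of this setup only: the sketch below pushes values from a plain flow through a small buffer into a deliberately slow collector. The helper name collectSlowly and the sizes are made up for the example. With the default overflow behaviour, a full buffer suspends the emitter instead of dropping values. It does not model the conflated channel upstream, whose sender never suspends, so it says nothing about what happens there once the buffer is full.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits 1..count as fast as possible through a small
// buffer and collects the values slowly.
suspend fun collectSlowly(count: Int, capacity: Int): List<Int> =
    flow { for (i in 1..count) emit(i) }   // fast producer
        .buffer(capacity)                   // default overflow strategy: suspend the emitter
        .onEach { delay(5) }                // slow consumer
        .toList()

fun main() = runBlocking {
    // More elements than the buffer can hold; none are lost.
    println(collectSlowly(count = 10, capacity = 2))
    // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}
```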
What I am not sure about is whether this will still work when the number of elements exceeds the flow's buffer.
Zach Klippenstein (he/him) [MOD]
08/05/2019, 4:21 PM
BroadcastChannel and just caching the last-sent value yourself. Something like:
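private val channel = BroadcastChannel<Foo>(capacity = 1)
// Nullable var so that send() can update it; null until the first send.
private var lastValue: Foo? = null
val flow get() = channel.asFlow()
    // Replay the cached value, if there is one, to each new collector.
    .onStart { lastValue?.let { emit(it) } }
// channel.send() suspends, so this function has to be suspend as well.
suspend fun send(value: Foo) {
    lastValue = value
    channel.send(value)
}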
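If send can be called from several coroutines or threads at once, the cache update and the channel send have to happen as one step. Below is a minimal sketch of guarding them with a Mutex. The CachingSender class is hypothetical, and a plain Channel stands in for the BroadcastChannel (which is deprecated in recent kotlinx.coroutines releases), so this only illustrates the locking, not the broadcast behaviour.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical wrapper; a plain Channel stands in for the BroadcastChannel.
class CachingSender<T>(private val channel: Channel<T>) {
    private val mutex = Mutex()

    // Volatile so readers on other threads see the latest cached value.
    @Volatile
    var lastValue: T? = null
        private set

    // The cache update and the send run under one lock, so two concurrent
    // callers cannot interleave them. If send() suspends on a full channel,
    // other senders wait for the lock, which keeps their order intact.
    suspend fun send(value: T) = mutex.withLock {
        lastValue = value
        channel.send(value)
    }
}

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 4)
    val sender = CachingSender(channel)
    sender.send(1)
    sender.send(2)
    println(sender.lastValue)   // prints 2
    println(channel.receive())  // prints 1
    println(channel.receive())  // prints 2
}
```

Holding the lock across a suspending send is a deliberate choice here: it trades some throughput for the guarantee that the cached value always matches the most recently sent one.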
Although caching the last value this way has a number of issues:
1. It assumes senders are synchronized. If not, you'd need to use a Mutex to ensure the two lines in send don't end up getting interleaved on different threads.
2. If the collector is slow, values sent while the first value is being processed will be dropped, since onStart won't collect the upstream flow (i.e. subscribe to the BroadcastChannel) until after the block returns. You'd need something like an onStartEager that would collect upstream before running the block, but that operator doesn't exist in the library (not hard to write yourself, but even more code).
Zach Klippenstein (he/him) [MOD]
08/05/2019, 6:21 PM
Eric Martori
08/06/2019, 8:32 AM
CONFLATED channel and a default buffer in the flow it seems to work fine and passes my current tests