I have the following setup: ``` val flow get()...
# coroutines
e
I have the following setup:
Copy code
val flow get() = channel.asFlow()
    private val channel =
        BroadcastChannel<T>(???)
I want that each time a user of the API calls
flow.collect{}
the last value is received on collect, but no values are lost if they are received in quick succession and the sender should suspend if needed for this to happen. With
CONFLATED
I get the last value emited on each new
collect
and with
BUFFERED
I get the "suspend if needed, don't loose any values" behaviour. Is there a way to achieve what I want?
I think I have a solution that seems to work but I don't know if it will brake:
Copy code
val flow get() = channel.asFlow().buffer()
    private val channel =
        BroadcastChannel<T>(CONFLATED)
making the original channel conflated allows for the new
collect
calls to have the last value available while the
buffer
on the flow allows buffer the values. What I am not sure is if this will work when the amount of elements surpasses the buffer of the flow
z
I don't think there's a simple way to replay the last value to new collectors while also keeping backpressure. You could do it manually with a non-clonflated
BroadcastChannel
and just caching the last-sent value yourself. Something like:
Copy code
private val channel = BroadcastChannel<Foo>(capacity=1)
private val lastValue? = null

val flow get() = channel.asFlow()
  .onStart { emit(lastValue) }

fun send(value: Foo) {
  lastValue = value
  channel.send(value)
}
Although this has a number of issues: 1. Assumes senders are synchronized. If not, you'd need to use a
Mutex
to ensure the two lines in
send
don't end up getting interleaved on different threads. 2. If the collector is slow, values send while the first value is being processed will be dropped, since
onStart
won't collect the upstream flow (ie. subscribe to the
BroadcastChannel
) until after the block returns. You'd need something like an
onStartEager
that would collect upstream before running the block, but that operator doesn't exist in the library (not hard to write yourself, but even more code).
I think there’s other race conditions as well… would definitely be nice if the library solved this.
e
I tried to implement it this way but aside the problems you have mentioned it breaks my tests so it seems that it is not behaving exactly like it should. With
CONFLATED
channel and a default
buffer
in the flow it seems to work fine and passes my current tests