# coroutines
I have the following setup:
val flow get() = channel.asFlow()
    private val channel =
I want that each time a user of the API calls
the last value is received on collect, but no values are lost if they are received in quick succession and the sender should suspend if needed for this to happen. With
I get the last value emitted on each new
and with
I get the "suspend if needed, don't lose any values" behaviour. Is there a way to achieve what I want?
I think I have a solution that seems to work, but I don't know if it will break:
val flow get() = channel.asFlow().buffer()
    private val channel =
Making the original channel conflated allows for the new
calls to have the last value available while the
on the flow allows buffering the values. What I am not sure about is whether this will work when the number of elements exceeds the buffer of the flow.
I don't think there's a simple way to replay the last value to new collectors while also keeping backpressure. You could do it manually with a non-conflated
and just caching the last-sent value yourself. Something like:
private val channel = BroadcastChannel<Foo>(capacity=1)
private var lastValue: Foo? = null

val flow get() = channel.asFlow()
  .onStart { lastValue?.let { emit(it) } } // skip the replay when nothing has been sent yet

suspend fun send(value: Foo) {
  lastValue = value
  channel.send(value) // assumed second line: the original message is cut off here
}
Although this has a number of issues: 1. Assumes senders are synchronized. If not, you'd need to use a
to ensure the two lines in
don't end up getting interleaved on different threads. 2. If the collector is slow, values sent while the first value is being processed will be dropped, since
won't collect the upstream flow (ie. subscribe to the
) until after the block returns. You'd need something like an
that would collect upstream before running the block, but that operator doesn't exist in the library (not hard to write yourself, but even more code).
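For issue 1, a minimal sketch of guarding the cache update and the send with a lock, assuming a `Mutex` from kotlinx.coroutines is acceptable here. `CachingSender` and `demoConcurrentSenders` are hypothetical names, and a plain unlimited `Channel` stands in for the broadcast channel so the sketch runs on its own:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical wrapper: caches the last value and forwards it to `sink`.
class CachingSender<T : Any>(private val sink: SendChannel<T>) {
    private val mutex = Mutex()

    @Volatile
    var lastValue: T? = null
        private set

    // The lock is held across both lines (including while send() is suspended),
    // so the cached value always matches the most recently sent element.
    suspend fun send(value: T) = mutex.withLock {
        lastValue = value
        sink.send(value)
    }
}

// Sends `count` values from concurrent coroutines and returns
// the received elements together with the cached last value.
fun demoConcurrentSenders(count: Int): Pair<List<Int>, Int?> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED) // stand-in for the real channel
    val sender = CachingSender(channel)
    coroutineScope {
        repeat(count) { i -> launch(Dispatchers.Default) { sender.send(i) } }
    }
    channel.close()
    val received = mutableListOf<Int>()
    for (element in channel) received += element
    received to sender.lastValue
}
```

Holding the lock while `send` is suspended means other senders queue up behind a slow receiver. That is the price of keeping the two lines atomic, and it does nothing for issue 2.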
I think there are other race conditions as well… would definitely be nice if the library solved this.
I tried to implement it this way, but aside from the problems you have mentioned, it breaks my tests, so it seems that it is not behaving exactly like it should. With
channel and a default
in the flow it seems to work fine and passes my current tests
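For anyone landing here later: kotlinx.coroutines 1.4 and newer ship `MutableSharedFlow`, which was not an option in the setup above. A rough sketch, assuming that version is available, of how `replay = 1` with the default `BufferOverflow.SUSPEND` covers both requirements (`demoReplayWithBackpressure` is a hypothetical name):

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns what a slow collector received and what a late collector sees first.
fun demoReplayWithBackpressure(): Pair<List<Int>, Int> = runBlocking {
    // replay = 1 keeps the latest value for new collectors;
    // onBufferOverflow defaults to SUSPEND, so emit() waits for slow collectors.
    val shared = MutableSharedFlow<Int>(replay = 1)

    shared.emit(0) // no collectors yet: the value just lands in the replay cache

    // Slow collector: takes 10 ms per element and stops after four elements.
    val slow = async { shared.take(4).onEach { delay(10) }.toList() }

    // Wait until the collector has actually subscribed before emitting more.
    shared.subscriptionCount.first { it > 0 }

    for (i in 1..3) shared.emit(i) // suspends when the collector falls behind

    val received = slow.await()
    val replayed = shared.first() // a new collector starts from the latest value
    received to replayed
}
```

The sender still has to call `emit` from a coroutine, and values emitted while there are no collectors at all only survive as the single replayed value.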