# coroutines
g
hmm. I have producers that will out-produce a consumer. I want the producers to fill a buffer and then over-write the oldest data when they produce on a full buffer. The consumer should always get the newest added element. I think what I'm looking for is a kind of circular linked list. Anybody know of a package with such a data structure and/or a way I can butcher java.util.concurrent or similar to give me this?
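For reference, a minimal sketch of the behaviour described above, built on java.util.concurrent.LinkedBlockingDeque. The class name NewestFirstBuffer and its methods are hypothetical, made up for illustration; only the deque calls are real JDK API. It assumes non-null elements and that dropping the oldest item silently is acceptable.

```kotlin
import java.util.concurrent.LinkedBlockingDeque

/** Hypothetical helper: a bounded buffer that evicts the oldest element when full. */
class NewestFirstBuffer<T : Any>(capacity: Int) {
    private val deque = LinkedBlockingDeque<T>(capacity)

    /** Adds [item] at the head, dropping from the tail until there is room. Never blocks. */
    fun put(item: T) {
        while (!deque.offerFirst(item)) {
            deque.pollLast() // buffer full: discard the oldest element
        }
    }

    /** Blocks until an element is available, then returns the newest one. */
    fun take(): T = deque.takeFirst()

    /** Non-blocking variant; returns null when the buffer is empty. */
    fun poll(): T? = deque.pollFirst()

    val size: Int get() = deque.size
}

fun main() {
    val buffer = NewestFirstBuffer<Int>(capacity = 3)
    (1..5).forEach(buffer::put) // 1 and 2 are evicted along the way
    println(buffer.take())      // prints 5
    println(buffer.size)        // prints 2
}
```

The retry loop in put is there because another producer may grab the freed slot between pollLast and offerFirst. take blocks a thread, so from a coroutine it would need wrapping (or the non-blocking poll).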
w
Channel.CONFLATED creates a ConflatedChannel, which always buffers the most recent item sent to the channel and emits this to the receiver. This way, the receiver always gets the most recently sent item.
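A small sketch of that behaviour, assuming kotlinx.coroutines is on the classpath. The function name latestAfterBurst is hypothetical; Channel, send, and receive are the library's own.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends a burst of values, then receives once.
fun latestAfterBurst(values: List<Int>): Int = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)
    // Sending to a conflated channel never suspends; each send replaces the previous value.
    for (v in values) channel.send(v)
    channel.receive()
}

fun main() {
    println(latestAfterBurst(listOf(1, 2, 3))) // prints 3
}
```

Note that the earlier values (1 and 2 here) are gone for good, so this only fits when a buffer of exactly one element is enough.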
g
No, I need it to buffer a fixed number of elements. I can't rely on the producers out-running the consumer. There has to be a buffer of some fixed number of elements
r
A channel can accept an arbitrary number of elements if that’s what you want
/**
 * Creates a channel with the specified buffer capacity (or without a buffer by default).
 * See [Channel] interface documentation for details.
 *
 * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
 * @throws IllegalArgumentException when [capacity] < -2
 */
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
        else -> ArrayChannel(capacity)
    }
in fact, you may just want the default buffer size (BUFFERED)
b
It sounds like he's looking for CONFLATED that holds a larger number of items rather than just a single item.
w
The whole thing is tricky. It's basically a buffered channel but instead of buffering to the tail, it buffers to the head. A LIFO with a set number of items where the items at the end of the queue get dropped
there is not currently a channel with that type of logic
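For anyone reading this later: as far as I know, newer kotlinx.coroutines releases (1.4 and up, treat the exact version as an assumption) accept an onBufferOverflow argument on the Channel factory, which covers the "fixed size, overwrite the oldest" half of this. A rough sketch, where retainedAfterBurst is a hypothetical helper name:

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends a burst, then drains whatever the channel kept.
fun retainedAfterBurst(capacity: Int, values: List<Int>): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    for (v in values) channel.send(v) // never suspends; overflow evicts the oldest element
    channel.close()                   // buffered elements can still be received after close
    val retained = mutableListOf<Int>()
    for (v in channel) retained += v
    retained
}

fun main() {
    println(retainedAfterBurst(3, listOf(1, 2, 3, 4, 5))) // prints [3, 4, 5]
}
```

The retained elements still come out oldest first (FIFO), so this is not the newest-first LIFO described above; it only bounds the buffer and drops stale data.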
r
Ah, gotcha. Hopefully this doesn’t sound convoluted, but you could use a conflated channel of type linked list, then use the scan operator to add to the queue. Use map to hand off the most recent item to your consumer, or give the entire list to your consumer to take the most recent item.
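Roughly what that could look like with Flow, as a sketch rather than a finished solution. The extension name lastN is hypothetical, and it uses an immutable List as the accumulator instead of a linked list for simplicity; scan, drop, conflate, and map are the real Flow operators.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

/** Hypothetical helper: emits the last [size] items seen so far, oldest first. */
fun <T> Flow<T>.lastN(size: Int): Flow<List<T>> =
    scan(emptyList<T>()) { window, item -> (window + item).takeLast(size) }
        .drop(1) // scan emits its initial (empty) accumulator first; skip it

fun main(): Unit = runBlocking {
    // Without conflation every intermediate window is delivered.
    println(flowOf(1, 2, 3, 4, 5).lastN(3).toList())
    // prints [[1], [1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5]]

    // With conflate(), a collector that falls behind skips stale windows;
    // map then hands off only the most recent item of each window it sees.
    val newest = flowOf(1, 2, 3, 4, 5).lastN(3).conflate().map { it.last() }.toList()
    println(newest)
}
```

Copying the list on every item is fine for a window of a few dozen elements; a larger buffer would probably want a mutable ring buffer instead.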
b
I guess I accidentally deleted my last comment. Yeah, a scan could work. Especially if the buffer ends up being fairly small (a few dozen items). Here's my swing at it https://pl.kotl.in/aDbwuPubF
The part that is lacking is the case where your consumer ends up working faster than the producer: it doesn't emit the other buffered items
Version 2 with buffer draining. This one manually manages a buffering list and still has room for improvement. https://pl.kotl.in/MMRz4iRxU
g
@bdawg.io thanks for this; I have a great deal of reading to do on Flow.