# general-advice
r
Is there a good existing data structure for a buffer where multiple producers add elements and periodically one consumer flushes the lot and resets it? Ideally non-blocking... I wrote this but it seemed like something I shouldn't be writing!
```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock

class ConcurrentBuffer<T> {

    private val lock = ReentrantReadWriteLock()
    private val addLock = lock.readLock()
    private val flushLock = lock.writeLock()

    private var items = ConcurrentLinkedQueue<T>()

    fun add(item: T) {
        addLock.withLock {
            items.add(item)
        }
    }

    fun flush(): Collection<T> {
        return flushLock.withLock {
            val flushed = items.toList()
            items = ConcurrentLinkedQueue()
            flushed
        }
    }
}
```
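[Editorial sketch: the class above does hold its no-loss/no-duplicate guarantee under concurrent use. A self-contained exercise of it, where the `exercise` harness, thread count, and message count are arbitrary choices of mine, not from the thread:]

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

class ConcurrentBuffer<T> {
    private val lock = ReentrantReadWriteLock()
    private val addLock = lock.readLock()     // shared: many adders at once
    private val flushLock = lock.writeLock()  // exclusive: holds adders out during flush

    private var items = ConcurrentLinkedQueue<T>()

    fun add(item: T) {
        addLock.withLock { items.add(item) }
    }

    fun flush(): Collection<T> = flushLock.withLock {
        val flushed = items.toList()
        items = ConcurrentLinkedQueue()
        flushed
    }
}

// Four producer threads with interleaved flushes: nothing lost, nothing duplicated.
fun exercise(): List<Int> {
    val buffer = ConcurrentBuffer<Int>()
    val collected = mutableListOf<Int>()
    val producers = (0 until 4).map { p ->
        thread { for (i in 0 until 1000) buffer.add(p * 1000 + i) }
    }
    repeat(3) {
        collected.addAll(buffer.flush())
        Thread.sleep(5)
    }
    producers.forEach { it.join() }
    collected.addAll(buffer.flush())
    return collected
}

fun main() {
    val collected = exercise()
    check(collected.size == 4000 && collected.toSet().size == 4000)
    println("flushed ${collected.size} distinct messages")
}
```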
j
Why not a simple coroutines `Channel`? Is it because "flushing" will not be easy out of the box?
r
I don't have coroutines in the codebase at all yet (I'm slowly migrating a Groovy Dropwizard app to Kotlin, so I'm thread-per-request anyway) and I'm not familiar with `Channel`. It's not immediately clear to me how a `Channel` would replace that structure... the atomic flushing is the most important aspect of it. But perhaps I need to read and think more.
If there's a coroutine-based class that has atomic/thread-safe `add` and `flush` methods (or equivalent) then I'd consider bringing coroutines in, but if I still have to write the `flush` method and make it atomic myself, I don't see a vast amount of value over the Java concurrency structures. I'm sort of half hoping Loom saves me from really having to get into coroutines.
j
> then I don't see a vast amount of value over java concurrency structures

You asked for something non-blocking, which is not delivered by lock-based approaches like this.
r
I should have been more specific: non-blocking on the `add`, not the `flush`. I'm under the impression that `java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock` is non-blocking unless something is trying to use its `WriteLock` pair, and that `ConcurrentLinkedQueue` is non-blocking.
j
> the atomic flushing is the most important aspect of it

I see. What's the use case, by the way? For instance, why does the consumer flush from time to time instead of processing items as they come? I also didn't realize the flushing needed to be "atomic" (I'm not sure what you mean by that in this context). Do you want producers to suspend while the flush happens, with the flush only considering items that were already sent at the moment it started? Or is it less constrained than that?
> It's not immediately clear to me how a `Channel` would replace that structure

Channels are naturally thread-safe, multi-producer, and multi-consumer. An easy-ish approach with a `Channel`: have your producers send items to the channel, and when you want a flush to happen, send a sentinel value to the channel and start a consumer that pops values until it reaches the sentinel (dropping the sentinel itself). That way, producers can even keep sending to the channel during the flush.
Sending to a channel doesn't suspend if there is sufficient capacity, so with a buffered channel you would be good to go.
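[Editorial sketch of the sentinel idea, assuming the `kotlinx.coroutines` dependency is available; the use of `null` as the sentinel and all names here are my choices, not library API:]

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Drains one batch from a channel: everything sent before the sentinel.
// Producers can keep sending while this runs; later items simply stay
// queued in the channel, waiting for the next batch.
fun flushOnce(): List<String> = runBlocking {
    val channel = Channel<String?>(capacity = Channel.UNLIMITED)

    // Producers: with UNLIMITED capacity, send never suspends.
    launch { channel.send("a") }
    launch { channel.send("b") }

    yield() // let the producers run (single-threaded dispatcher here)

    channel.send(null) // sentinel marks the end of this batch
    val batch = mutableListOf<String>()
    for (item in channel) {
        if (item == null) break // drop the sentinel, stop draining
        batch.add(item)
    }
    batch
}

fun main() = println(flushOnce())
```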
r
It's for gathering messages and periodically sending them as a batch to a third party. Goals:
• minimal overhead when adding messages (they aren't business-critical)
• no messages lost in normal operation
• no duplicate messages in normal operation
• OK with occasionally losing messages due to a process crash
By "atomic" I meant that the write lock guarantees that no incoming message gets lost in the gap between snapshotting the current buffer and clearing the buffer.
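[Editorial aside: that particular gap can also be closed without the write lock. If `flush()` drains the queue with `poll()` instead of snapshot-and-replace, each element is removed exactly once, so nothing is lost or duplicated and `add` stays lock-free. A sketch with my own naming; the trade-off is that items arriving mid-flush join the current batch rather than the next one:]

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.concurrent.thread

class DrainBuffer<T> {
    private val items = ConcurrentLinkedQueue<T>()

    fun add(item: T) {
        items.add(item) // non-blocking, no lock
    }

    fun flush(): List<T> {
        val batch = mutableListOf<T>()
        // poll() removes the head atomically, so every element lands in
        // exactly one batch: no snapshot-vs-clear gap to worry about.
        while (true) {
            batch.add(items.poll() ?: break)
        }
        return batch
    }
}

// Same exercise as before: four producers, interleaved flushes.
fun exerciseDrain(): List<Int> {
    val buffer = DrainBuffer<Int>()
    val producers = (0 until 4).map { p ->
        thread { for (i in 0 until 1000) buffer.add(p * 1000 + i) }
    }
    val collected = mutableListOf<Int>()
    repeat(3) {
        collected += buffer.flush()
        Thread.sleep(5)
    }
    producers.forEach { it.join() }
    collected += buffer.flush()
    return collected
}

fun main() {
    val collected = exerciseDrain()
    check(collected.size == 4000 && collected.toSet().size == 4000)
    println("ok: ${collected.size}")
}
```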
j
Then I guess what you want is a good way to batch elements. You might find the discussion about a time-based chunked operator and natural batching interesting: https://github.com/Kotlin/kotlinx.coroutines/issues/902#issue-392095069
That thread even includes suggested implementations of several batching strategies. In any case, the sentinel approach would work fine for marking the end of a batch. You could have one coroutine that consumes the channel continuously, aggregating items into a list, and another coroutine that periodically sends a "send-a-batch" event via the same channel. When the aggregator finds such an event, it sends out the current batch instead of adding the event to it. This never drops a message.
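[Editorial sketch of that aggregator-plus-ticker shape, again assuming `kotlinx.coroutines`; the event types, delays, and names are illustrative, not taken from the linked thread:]

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

sealed interface Event
data class Message(val text: String) : Event
object SendABatch : Event // the periodic "send-a-batch" marker

fun collectBatches(): List<List<String>> = runBlocking {
    val events = Channel<Event>(Channel.UNLIMITED)
    val batches = mutableListOf<List<String>>()

    // Aggregator: accumulates messages; on SendABatch, ships the current batch.
    val aggregator = launch {
        val current = mutableListOf<String>()
        for (event in events) {
            when (event) {
                is Message -> current.add(event.text)
                SendABatch -> {
                    batches.add(current.toList()) // real code would POST this batch
                    current.clear()
                }
            }
        }
    }

    // Ticker: periodically requests a batch via the same channel.
    val ticker = launch {
        repeat(2) {
            delay(50)
            events.send(SendABatch)
        }
    }

    events.send(Message("one"))
    events.send(Message("two"))
    delay(75) // the first tick fires in between
    events.send(Message("three"))

    ticker.join()
    events.close()
    aggregator.join()
    batches
}

fun main() = println(collectBatches())
```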
r
OK, I'll look into that, thanks