How can I drain a buffered channel of only the buf...
# coroutines
e
How can I drain a buffered channel of only the buffered items? Details: I have a use case where I want to change the order of incoming events so I created a
channelFlow
wrapper with one extra internal channel that is buffered. Out of order items are sent to the buffered channel until I reach the in-order items. At a certain point, I then want to emit all the buffered items and afterwards continue to emit the rest of the incoming events. I tried
Copy code
channelWithBufferedItems.consumeEach {
  channel.send(it)
}
but that doesn’t work because it iterates the buffered channel but afterwards suspends the consumeEach waiting for new items
1
Fixed by switching the code to iterate manually:
Copy code
buffered.consume {
  while (!isEmpty) channel.send(receive()) 
}
isEmpty
will return true in cases where
receive()
would’ve suspended so its good for terminating the loop & cancelling the channel after
z
I think you want
tryReceive
But then you have a race condition: It's possible to get a scenario where another thread enqueues a new item while you're in the process of draining the channel. If it does so continuously, it would look like your drain function never completes. Which might be fine, but the real problem is that the drain operation is not atomic, and it's impossible to reason about when the "drain" operation actually happens (i.e. it's non-linearizable). Also consider two separate threads invoking drain around the same time — which one "occurs" first? I think you can solve this by having drain start by sending a special sentinel to the channel which clearly defines "when" the drain operation happens. Then your drain operation stops draining when it sees that sentinel. This makes the drain linearizable/effectively atomic since the "official time" of the drain is defined by a single, atomic operation (sending the sentinel); any sends that occur after that will not be considered part of the drain, even if the drain is still running when the send occurs.
e
Oh I agree. My actual implementation has receiveCatching.getOrNull. I am not considering multi threading because my context is a single threaded IO operation but yeah what you said makes so much sense