Márton Matusek
04/08/2025, 9:31 AM
`chunked` flow extension and we created an implementation for ourselves.
Details:
We are in need of a `fun <T> Flow<T>.chunked(size: Int, timeout: Duration): Flow<List<T>>` function to chunk elements in flows based on a chunk size and a timeout. The timeout means that incomplete chunks are still emitted downstream once the timeout elapses.
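One way to meet this spec without a separate timer coroutine, and therefore without a mutex, is to let a single loop own the buffer and race the next upstream element against the chunk deadline with `select`. This is a sketch under stated assumptions, not the implementation from this thread: the name `chunkedSelect` is hypothetical, `onTimeout` is `@ExperimentalCoroutinesApi` in kotlinx.coroutines, and Kotlin 1.9+ is assumed for a stable `TimeSource.Monotonic`. Here the timeout is measured from the first element of each chunk (as in the original code), and a partial chunk is additionally flushed as soon as upstream completes.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.TimeSource

// Hypothetical name; onTimeout is experimental, hence the opt-in.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.chunkedSelect(chunkSize: Int, timeout: Duration): Flow<List<T>> {
    require(chunkSize > 0) { "chunkSize must be positive" }
    return channelFlow {
        // Upstream elements are handed over through a channel, so only the loop below touches the buffer.
        val upstream = Channel<T>()
        launch {
            try {
                this@chunkedSelect.collect { upstream.send(it) }
            } finally {
                upstream.close()
            }
        }
        val buffer = ArrayList<T>(chunkSize)
        var upstreamDone = false
        while (!upstreamDone) {
            // Wait (with no timer running) for the first element of a new chunk.
            val first = upstream.receiveCatching()
            if (first.isClosed) break
            buffer.add(first.getOrThrow())
            val deadline = TimeSource.Monotonic.markNow() + timeout
            while (buffer.size < chunkSize && !upstreamDone) {
                val remaining = -deadline.elapsedNow()
                if (!remaining.isPositive()) break
                // Exactly one clause wins: either an element/close arrives, or the deadline passes.
                val timedOut = select<Boolean> {
                    upstream.onReceiveCatching { result ->
                        if (result.isClosed) {
                            upstreamDone = true
                        } else {
                            buffer.add(result.getOrThrow())
                        }
                        false
                    }
                    onTimeout(remaining) { true }
                }
                if (timedOut) break
            }
            // Emit a copy so the reused buffer is never shared with downstream.
            send(buffer.toList())
            buffer.clear()
        }
    }
}
```

The `select` is deliberate: `withTimeoutOrNull { upstream.receive() }` looks simpler, but `receive` has a prompt-cancellation guarantee, so a timeout that races with an arriving element can drop that element, whereas `select` commits to exactly one clause. With a `SharedFlow` upstream, which never completes, the loop simply keeps running until the collector is cancelled.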
Our implementation so far looks like this:
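```kotlin
import kotlin.time.Duration
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

fun <T> Flow<T>.chunked(chunkSize: Int, timeout: Duration): Flow<List<T>> =
    channelFlow {
        var result: ArrayList<T>? = null
        var timer: Job? = null
        val mutex = Mutex()
        this@chunked.collect { event ->
            val batch = result ?: ArrayList<T>(chunkSize).also { result = it }
            batch.add(event)
            if (timer == null) {
                timer = launch {
                    delay(timeout)
                    mutex.withLock {
                        if (batch.isNotEmpty()) {
                            send(batch)
                            result = null
                        }
                    }
                }
            }
            mutex.withLock {
                if (batch.size >= chunkSize) {
                    send(batch)
                    result = null
                    timer?.cancelAndJoin()
                    timer = null
                }
            }
        }
    }
```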
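For comparison, a minimally patched sketch of the same mutex-based design. Two things in the version above look fragile: `timer` is never set back to `null` after the timer itself flushes, so later partial chunks get no timer (and a trailing partial chunk can be lost when upstream completes), and `batch.add(event)` runs outside the mutex, so with a multi-threaded dispatcher it can race with the timer sending that same list. This sketch (the name `chunkedLocked` is hypothetical) does all buffer bookkeeping under the mutex, and the timer flushes only if the batch it was started for is still the current one (`current === batch`), so a late timer wake-up is a no-op instead of depending on `batch.isNotEmpty()`.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.time.Duration

// Hypothetical name; same approach as above, with the shared state guarded by the mutex.
fun <T> Flow<T>.chunkedLocked(chunkSize: Int, timeout: Duration): Flow<List<T>> =
    channelFlow {
        val mutex = Mutex()
        var current: ArrayList<T>? = null // chunk being filled; only touched under mutex
        var timer: Job? = null            // pending flush for `current`; only touched under mutex
        this@chunkedLocked.collect { event ->
            mutex.withLock {
                val batch = current ?: ArrayList<T>(chunkSize).also { current = it }
                batch.add(event)
                if (batch.size >= chunkSize) {
                    // Detach the full chunk first, so a timer that wakes up later sees it is stale.
                    current = null
                    timer?.cancel()
                    timer = null
                    send(batch)
                } else if (timer == null) {
                    timer = launch {
                        delay(timeout)
                        mutex.withLock {
                            // Flush only if this exact batch is still the one being filled.
                            if (current === batch) {
                                current = null
                                timer = null
                                send(batch)
                            }
                        }
                    }
                }
            }
        }
    }
```

Like the original, a trailing partial chunk is flushed by the timer (`channelFlow` waits for its launched children), so completion of a finite upstream is delayed by up to `timeout`; with a `SharedFlow` upstream, which never completes, that does not matter.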
Can you please have a look and review potential problems in it? We would like to use it with SharedFlows as well.
Thank you in advance!
Don Mitchell
04/11/2025, 9:36 PM
`timer?.cancelAndJoin()` to the top of the `if` block to try to keep the timer from also trying to send, but that won't be perfect. That is, if the timer expires any time after `mutex.withLock` gets the lock but before `timer?.cancelAndJoin()`, then it will run its block as soon as you release the mutex lock.
I guess the hope is that `batch.isNotEmpty()` will be false, as the next `collect` iteration probably won't occur, but is that guaranteed? IDK
Márton Matusek
04/12/2025, 12:50 PM
Don Mitchell
04/14/2025, 12:11 PM
`isActive` so it doesn't cancel the parent?
Márton Matusek
04/14/2025, 1:47 PM