PHondogo
07/31/2024, 7:23 PMPHondogo
07/31/2024, 7:24 PMsuspend fun process(
input: ReceiveChannel<Int>,
batchSize: Int,
batchTimeoutMs: Long,
onBatch: (List<Int>) -> Unit
) {
var channelNotClosed = true
do {
val batch = buildList {
withTimeoutOrNull(batchTimeoutMs) {
repeat(batchSize) {
add(input.receiveCatching().getOrNull() ?: kotlin.run {
channelNotClosed = false
return@repeat
})
}
}
}
if (batch.isNotEmpty()) {
onBatch(batch)
}
} while (channelNotClosed)
}
ephemient
08/01/2024, 1:58 AMprocess(
input = produce {
repeat(20) {
send(it)
delay(50)
}
},
batchSize = 2,
batchTimeoutMs = 60,
) { println(it) }
produces some groups of 1 and some groups of 2 even though everything is consistently timedephemient
08/01/2024, 2:03 AMsuspend fun <E> ReceiveChannel<E>.consumeBatched(
maxSize: Int,
maxDuration: Duration,
onBatch: suspend (List<E>) -> Unit,
) = consume {
while (true) {
val batch = buildList {
receiveCatching()
.onSuccess { add(it) }
.onFailure { return@consume }
withTimeoutOrNull(maxDuration) {
while (size < maxSize) {
receiveCatching()
.onSuccess { add(it) }
.onFailure { return@withTimeoutOrNull }
}
}
}
onBatch(batch)
}
}
produce {
repeat(20) {
send(it)
delay(50)
}
}
.consumeBatched(
maxSize = 2,
maxDuration = 60.milliseconds,
) { println(it) }