I'm on the way of bringing one more person to Kotl...
# codereview
p
I'm on the way of bringing one more person to Kotlin. To show him advantages of Kotlin we decided that I'll rewrite logic from one Java function. The function is receiving stream of elements and producing batches of elements limited by size and timeout (what came first) provided as function parameters. Please, review code in Kotlin for such logic (in thread).
Copy code
suspend fun process(
        input: ReceiveChannel<Int>,
        batchSize: Int,
        batchTimeoutMs: Long,
        onBatch: (List<Int>) -> Unit
    ) {
        var channelNotClosed = true
        do {
            val batch = buildList {
                withTimeoutOrNull(batchTimeoutMs) {
                    repeat(batchSize) {
                        add(input.receiveCatching().getOrNull() ?: kotlin.run {
                            channelNotClosed = false
                            return@repeat
                        })
                    }
                }
            }
            if (batch.isNotEmpty()) {
                onBatch(batch)
            }
        } while (channelNotClosed)
    }
e
that feels odd. the duration of the batch depends on when in the period the first element is received, e.g.
Copy code
process(
    input = produce {
        repeat(20) {
            send(it)
            delay(50)
        }
    },
    batchSize = 2,
    batchTimeoutMs = 60,
) { println(it) }
produces some groups of 1 and some groups of 2 even though everything is consistently timed
whereas this consistently starts the timer on the first item of each batch, so they all end up the same
Copy code
suspend fun <E> ReceiveChannel<E>.consumeBatched(
    maxSize: Int,
    maxDuration: Duration,
    onBatch: suspend (List<E>) -> Unit,
) = consume {
    while (true) {
        val batch = buildList {
            receiveCatching()
                .onSuccess { add(it) }
                .onFailure { return@consume }
            withTimeoutOrNull(maxDuration) {
                while (size < maxSize) {
                    receiveCatching()
                        .onSuccess { add(it) }
                        .onFailure { return@withTimeoutOrNull }
                }
            }
        }
        onBatch(batch)
    }
}
Copy code
produce {
    repeat(20) {
        send(it)
        delay(50)
    }
}
    .consumeBatched(
        maxSize = 2,
        maxDuration = 60.milliseconds,
    ) { println(it) }
🙏 1