# coroutines
m
I have a function to receive a batch of items from a `Channel`, returning a list of items after `timeoutMs` milliseconds (the list might be empty). Here is the code:
```kotlin
public suspend fun <E> receiveBatch(
    channel: ReceiveChannel<E>,
    timeoutMs: Long,
): List<E> {
    val batch = mutableListOf<E>()
    withTimeoutOrNull(timeoutMs) {
        for (item in channel) {
            batch.add(item)
        }
    }
    return batch
}
```
In simple testing everything is fine, but under load an item is sometimes not added to the list and is lost. What I think happens is that `withTimeoutOrNull` times out between receiving an item from the channel and adding it to the list. Does anyone have a solution for this issue, or a better design for what I am trying to do?
e
something like this perhaps?
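```kotlin
// Assumes `timeout` is a kotlin.time.Duration and `batch` is a MutableList<E>.
val start = TimeSource.Monotonic.markNow()
whileSelect {
    // Shrink the timeout on every pass so the overall deadline stays fixed.
    onTimeout(timeout - start.elapsedNow()) { false }
    channel.onReceiveCatching { result ->
        // The cause is null when the channel was closed normally.
        result.onFailure { if (it != null) throw it }
            .onClosed { return@onReceiveCatching false }
            .onSuccess { batch += it }
        true
    }
}
```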
m
Thank you, @ephemient! This approach works well. Here is the current version of the function, which also constrains batch size:
```kotlin
@OptIn(ExperimentalCoroutinesApi::class)
public suspend fun <E> receiveBatch(
    channel: ReceiveChannel<E>,
    maxTimeMs: Long,
    maxSize: Int,
): List<E> {
    val items = mutableListOf<E>()
    whileSelect {
        onTimeout(maxTimeMs) { false }
        channel.onReceiveCatching { result ->
            result.onFailure { if (it != null) throw it }
                .onClosed { return@onReceiveCatching false }
                .onSuccess { items += it }
            items.size < maxSize
        }
    }
    return items
}
```
e
the way you've written it there, the timeout is reset on every receive from the channel
if that's what you want, you might as well write
```kotlin
while (true) {
    items += withTimeoutOrNull(maxTimeMs) {
        channel.receiveCatching().getOrElse { if (it != null) throw it else null }
    } ?: break
}
```
m
Thanks for the timeout correction.
e
actually nevermind about my second code there, it has a race issue just like your original one did. but the timeout concern remains
m
Yep. I tried it and found that as well. Here is the current code with corrected timeouts:
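```kotlin
import kotlin.time.ExperimentalTime
import kotlin.time.TimeSource
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

@OptIn(ExperimentalCoroutinesApi::class, ExperimentalTime::class)
public suspend fun <E> receiveBatch(
    channel: ReceiveChannel<E>,
    maxTimeMs: Long,
    maxSize: Int,
): List<E> {
    val items = mutableListOf<E>()
    val start = TimeSource.Monotonic.markNow()
    whileSelect {
        // Remaining time until the overall deadline; a value <= 0 selects immediately.
        onTimeout(maxTimeMs - start.elapsedNow().inWholeMilliseconds) { false }
        channel.onReceiveCatching { result ->
            // Rethrow the close cause if there is one; a normal close has a null cause.
            result.onFailure { if (it != null) throw it }
                .onClosed { return@onReceiveCatching false }
                .onSuccess { items += it }
            // Keep looping until the batch is full.
            items.size < maxSize
        }
    }
    return items
}
```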
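For anyone who wants to try this out, here is a small usage sketch. It repeats the function so that it compiles on its own, and it assumes a recent `kotlinx-coroutines-core` on the classpath. The `main` function, the channel contents, and the timeout values are made up for illustration and are not from the thread.

```kotlin
import kotlin.time.ExperimentalTime
import kotlin.time.TimeSource
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.*

// Same logic as the function above, repeated so the sketch is self-contained.
@OptIn(ExperimentalCoroutinesApi::class, ExperimentalTime::class)
suspend fun <E> receiveBatch(
    channel: ReceiveChannel<E>,
    maxTimeMs: Long,
    maxSize: Int,
): List<E> {
    val items = mutableListOf<E>()
    val start = TimeSource.Monotonic.markNow()
    whileSelect {
        onTimeout(maxTimeMs - start.elapsedNow().inWholeMilliseconds) { false }
        channel.onReceiveCatching { result ->
            result.onFailure { if (it != null) throw it }
                .onClosed { return@onReceiveCatching false }
                .onSuccess { items += it }
            items.size < maxSize
        }
    }
    return items
}

fun main() = runBlocking {
    // Hypothetical input: ten buffered items, then the channel is closed.
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(10) { channel.trySend(it) }
    channel.close()

    println(receiveBatch(channel, maxTimeMs = 100, maxSize = 4)) // [0, 1, 2, 3]
    println(receiveBatch(channel, maxTimeMs = 100, maxSize = 4)) // [4, 5, 6, 7]
    println(receiveBatch(channel, maxTimeMs = 100, maxSize = 4)) // [8, 9] (channel closed, returns early)
    println(receiveBatch(channel, maxTimeMs = 100, maxSize = 4)) // []

    // An open but idle channel: the call returns an empty list once the deadline passes.
    val idle = Channel<Int>()
    println(receiveBatch(idle, maxTimeMs = 50, maxSize = 4)) // []
}
```

Items that are already buffered are handed out in order and capped at `maxSize`; a closed channel ends the batch early, and an idle one yields an empty list after roughly `maxTimeMs`.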