Michael Strasser
09/26/2021, 11:48 AMChannel
, returning a list of items after timeoutMs
milliseconds (the list might be empty). Here is the code:
public suspend fun <E> receiveBatch(
channel: ReceiveChannel<E>,
timeoutMs: Long,
): List<E> {
val batch = mutableListOf<E>()
withTimeoutOrNull(timeoutMs) {
for (item in channel) {
batch.add(item)
}
}
return batch
}
In simple testing everything is fine, but under load sometimes an item is not added to a list and is lost. What I think happens is that withTimeoutOrNull
times out between receiving an item from the channel and adding it to the list.
Does anyone have a solution for this issue, or a better design for what I am trying to do?ephemient
09/26/2021, 12:41 PMval start = TimeSource.Monotonic.markNow()
whileSelect {
onTimeout(timeout - start.elapsedNow()) { false }
channel.onReceiveCatching { result ->
result.onFailure { throw it }
.onClosed { return@onReceiveCatching false }
.onSuccess { batch += it }
true
}
}
Michael Strasser
09/27/2021, 1:08 AM@OptIn(ExperimentalCoroutinesApi::class)
public suspend fun <E> receiveBatch(
channel: ReceiveChannel<E>,
maxTimeMs: Long,
maxSize: Int,
): List<E> {
val items = mutableListOf<E>()
whileSelect {
onTimeout(maxTimeMs) { false }
channel.onReceiveCatching { result ->
result.onFailure { if (it != null) throw it }
.onClosed { return@onReceiveCatching false }
.onSuccess { items += it }
items.size < maxSize
}
}
return items
}
ephemient
09/27/2021, 1:11 AMephemient
09/27/2021, 1:16 AMwhile (true) {
items += withTimeoutOrNull(maxTimeMs) {
channel.receiveCatching().getOrElse { if (it != null) throw it else null }
} ?: break
}
Michael Strasser
09/27/2021, 1:42 AMephemient
09/27/2021, 2:03 AMMichael Strasser
09/27/2021, 2:17 AM@OptIn(ExperimentalCoroutinesApi::class, ExperimentalTime::class)
public suspend fun <E> receiveBatch(
channel: ReceiveChannel<E>,
maxTimeMs: Long,
maxSize: Int,
): List<E> {
val items = mutableListOf<E>()
val start = TimeSource.Monotonic.markNow()
whileSelect {
onTimeout(maxTimeMs - start.elapsedNow().inWholeMilliseconds) { false }
channel.onReceiveCatching { result ->
result.onFailure { if (it != null) throw it }
.onClosed { return@onReceiveCatching false }
.onSuccess { items += it }
items.size < maxSize
}
}
return items
}