dave08
12/04/2023, 2:13 PM
chunked operator for Flow, I've seen a few while googling, but it seems that some are said to be hacky, others more simplistic... I wonder why it hasn't actually been implemented in the library until now?
dave08
12/04/2023, 2:14 PMJacob
12/04/2023, 4:11 PMdave08
12/04/2023, 4:15 PMdave08
12/04/2023, 4:16 PMJacob
12/04/2023, 4:27 PMdave08
12/04/2023, 4:56 PMJilles van Gurp
12/05/2023, 12:45 PM
private object Done
/** Sentinel emitted by the batching timer in `chunked` once `delayMillis` has elapsed; it triggers a flush of the pending chunk. */
private object TimerExpired
/**
 * Groups upstream elements into lists, emitting a list as soon as either
 * [chunkSize] elements have been collected or [delayMillis] milliseconds have
 * passed since the first element of the current chunk arrived.
 *
 * Any elements still pending when the upstream completes are emitted as a final,
 * possibly shorter, chunk. Empty chunks are never emitted.
 *
 * @param chunkSize maximum number of elements per emitted list; must be positive.
 * @param delayMillis maximum time, in milliseconds, that a non-empty chunk is held
 *   back before being emitted; must be positive.
 * @return a flow of non-empty lists containing the upstream elements in order.
 * @throws IllegalArgumentException if [chunkSize] or [delayMillis] is not positive.
 */
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.chunked(
    chunkSize: Int = 64,
    delayMillis: Long = 250,
): Flow<List<T>> {
    require(chunkSize > 0) { "'chunkSize' must be positive: $chunkSize" }
    require(delayMillis > 0) { "'delayMillis' must be positive: $delayMillis" }
    // Widened so that the sentinels Done / TimerExpired can travel in the same stream as the elements.
    val upstream: Flow<Any?> = this
    return flow<List<T>> {
        // Timer control signal: true = (re)start the timer, false = cancel it, null = shut the timer branch down.
        // replay = 1 keeps the most recent signal for a subscriber that attaches late; without replay a signal
        // emitted before the timer branch has subscribed is dropped, and a dropped `null` would leave the
        // merge below waiting forever.
        val timerEnabled = MutableSharedFlow<Boolean?>(replay = 1)
        var queue = mutableListOf<T>()

        // Emits the pending chunk (if any) downstream and starts a fresh one.
        suspend fun flush() {
            if (queue.isNotEmpty()) {
                emit(queue)
                queue = mutableListOf()
            }
        }

        merge(
            // Append a completion marker so the collector below can flush the tail.
            upstream.onCompletion { emit(Done) },
            timerEnabled
                .takeWhile { it != null }
                .flatMapLatest { enabled ->
                    // flatMapLatest cancels the previous timer whenever a new signal arrives.
                    if (enabled == true) {
                        flow {
                            delay(delayMillis)
                            emit(TimerExpired)
                        }
                    } else {
                        emptyFlow()
                    }
                },
        ).collect { element ->
            when (element) {
                Done -> {
                    flush()
                    // Cancel a still-running timer first; otherwise the timer branch only finishes
                    // after the in-flight delay, postponing completion by up to delayMillis.
                    timerEnabled.emit(false)
                    timerEnabled.emit(null)
                }
                TimerExpired -> flush()
                else -> {
                    // Everything that is not a sentinel originates from the upstream Flow<T>.
                    @Suppress("UNCHECKED_CAST")
                    val item = element as T
                    queue.add(item)
                    if (queue.size >= chunkSize) {
                        flush()
                        timerEnabled.emit(false)
                    } else if (queue.size == 1) {
                        // First element of a new chunk: start the deadline for this chunk.
                        timerEnabled.emit(true)
                    }
                }
            }
        }
    }
}
dave08
12/05/2023, 2:00 PMJilles van Gurp
12/05/2023, 5:32 PM