octylFractal
11/10/2020, 5:13 AMgildor
11/10/2020, 5:29 AMoctylFractal
11/10/2020, 5:40 AMDominaezzz
11/10/2020, 10:29 AM@ExperimentalCoroutinesApi
@ExperimentalTime
fun <T> Flow<T>.bufferChunks(count: Int, maxDuration: Duration): Flow<List<T>> {
return flow {
coroutineScope {
val channel = produceIn(this)
while (!channel.isClosedForReceive) {
val chunk = mutableListOf<T>()
withTimeout(maxDuration) {
for (item in channel) {
chunk.add(item)
if (chunk.size >= count) break
}
}
emit(chunk)
}
}
}
}
nagle
I think.octylFractal
11/10/2020, 10:02 PMCircusmagnus
11/13/2020, 7:27 AM