You could even write something more general, like this:
/**
 * Groups the elements of this flow into lists of [size] consecutive elements, starting a new
 * window every [step] elements. Window boundaries follow the same rules as the standard
 * library's `Iterable.windowed`: windows start at upstream indices 0, [step], 2 * [step], ...
 *
 * @param size number of elements in each full window; must be positive.
 * @param step distance between the starts of two consecutive windows; must be positive.
 * @param partialWindows when `true`, windows that could not be filled up to [size] before the
 *   upstream completed are emitted as well (including the only window of an upstream that is
 *   shorter than [size]).
 * @return a flow that emits each window as an independent snapshot list.
 * @throws IllegalArgumentException if [size] or [step] is not positive.
 */
fun <T> Flow<T>.windowed(
    size: Int,
    step: Int = 1,
    partialWindows: Boolean = false
): Flow<List<T>> {
    require(size > 0 && step > 0) { "size ($size) and step ($step) must both be greater than zero" }
    return flow {
        // Elements of the window currently being filled; its head is always a window start.
        val window = ArrayDeque<T>(size)
        // Upstream elements still to discard before the next window starts (non-zero only if step > size).
        var toSkip = 0
        collect { value ->
            if (toSkip > 0) {
                toSkip--
            } else {
                window.add(value)
                if (window.size == size) {
                    emit(window.toList())
                    // Slide forward by `step`: drop what is buffered, skip the rest as it arrives.
                    val dropped = minOf(step, size)
                    repeat(dropped) { window.removeFirst() }
                    toSkip = step - dropped
                }
            }
        }
        if (partialWindows) {
            // Whatever is left starts at a valid window position but never reached `size`.
            while (window.isNotEmpty()) {
                emit(window.toList())
                repeat(minOf(step, window.size)) { window.removeFirst() }
            }
        }
    }
}
// Usage example: prepending 0 with onStart and taking windows of size 2 produces the same
// pairs as the standard library's windowed(2) applied to the range 0..6.
flowOf(1, 2, 3, 4, 5, 6)
    .onStart { emit(0) }
    .windowed(2)
    .toList() == (0..6).windowed(2)