Is there a way to transform a Flow with a stateful collector? I am migrating the kmath streaming API to Flow and I need to implement chunking and windowing operations (take n subsequent elements and join them into one).
marstran
04/26/2019, 10:59 AM
I'm not too familiar with the API yet, but maybe something like this will work?
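import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun <T> Flow<T>.chunked(chunkSize: Int): Flow<List<T>> = flow {
    require(chunkSize > 0) { "chunkSize must be positive" }
    // Buffer that accumulates elements until a full chunk is available
    val mutList = mutableListOf<T>()
    collect { elem ->
        mutList.add(elem)
        if (mutList.size == chunkSize) {
            // Emit a copy so that clearing the buffer does not affect the emitted chunk
            emit(mutList.toList())
            mutList.clear()
        }
    }
    // Emit the final chunk if there are elements left
    if (mutList.isNotEmpty()) {
        emit(mutList.toList())
    }
}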
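For the windowing half of the question, the same pattern might work with a sliding buffer. This is only a rough sketch: windowed is a hypothetical helper name, not something from kotlinx.coroutines, and it assumes a step of one element and that incomplete windows are simply dropped.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): emits every run of
// `size` consecutive elements, advancing by one element at a time.
// Assumption: flows shorter than `size` emit nothing.
fun <T> Flow<T>.windowed(size: Int): Flow<List<T>> = flow {
    require(size > 0) { "size must be positive" }
    val window = ArrayDeque<T>(size)
    collect { elem ->
        window.addLast(elem)
        if (window.size == size) {
            // Emit a copy, then drop the oldest element to slide the window
            emit(window.toList())
            window.removeFirst()
        }
    }
}

fun main() = runBlocking {
    val windows = flowOf(1, 2, 3, 4).windowed(3).toList()
    println(windows) // prints [[1, 2, 3], [2, 3, 4]]
}
```

As with chunked, the state lives inside the flow { } builder, so each collection of the resulting flow gets its own buffer.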
altavir
04/26/2019, 11:00 AM
Yes, I am currently writing something quite similar, but I was wondering if there are any out-of-the-box solutions.