is there a flow equivalent to <http://reactivex.io...
# coroutines
g
Buffer with timespan? I don’t think so
o
I just wrote https://gist.github.com/octylFractal/e95df0727354b9d6b76f2ede3cae8b89, but it's probably got some bugs 😛
not exactly the same behavior, but it's what I was looking for
d
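import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@ExperimentalCoroutinesApi
@ExperimentalTime
fun <T> Flow<T>.bufferChunks(count: Int, maxDuration: Duration): Flow<List<T>> {
	return flow {
		coroutineScope {
			// Collect the upstream flow into a channel owned by this scope.
			val channel = produceIn(this)
			while (!channel.isClosedForReceive) {
				val chunk = mutableListOf<T>()
				// withTimeoutOrNull returns null on timeout instead of throwing,
				// so a timeout only ends the current chunk and does not fail the flow.
				withTimeoutOrNull(maxDuration) {
					for (item in channel) {
						chunk.add(item)
						if (chunk.size >= count) break
					}
				}
				// Note: the chunk is empty if nothing arrived within maxDuration.
				emit(chunk)
			}
		}
	}
}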
I got bored and re-implemented it. 🙂
A decent name for this operator would be nagle, I think.
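For anyone who wants to try the idea out, here is a minimal, self-contained sketch of a size-or-time chunking operator plus a tiny usage example. It is not the gist's code and not anything from the library: the name `chunkedWithTimeout` is hypothetical, and unlike Rx's buffer with timespan it skips empty chunks, which is an assumption on my part. It assumes a recent kotlinx.coroutines (for `receiveCatching` and the `Duration` overload of `withTimeoutOrNull`).

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical operator name; emits a chunk when it reaches `count` items
// or when `maxDuration` has elapsed since the chunk was started.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.chunkedWithTimeout(count: Int, maxDuration: Duration): Flow<List<T>> = flow {
    require(count > 0) { "count must be positive" }
    coroutineScope {
        // Collect upstream concurrently into a channel owned by this scope.
        val channel = produceIn(this)
        var closed = false
        while (!closed) {
            val chunk = mutableListOf<T>()
            // Returns null on timeout instead of throwing, so the flow keeps going.
            withTimeoutOrNull(maxDuration) {
                while (chunk.size < count) {
                    val result = channel.receiveCatching()
                    if (result.isClosed) {
                        // Rethrow an upstream failure; otherwise upstream simply completed.
                        result.exceptionOrNull()?.let { throw it }
                        closed = true
                        break
                    }
                    chunk.add(result.getOrThrow())
                }
            }
            // Assumption: idle windows produce no emission at all.
            if (chunk.isNotEmpty()) emit(chunk)
        }
    }
}

fun main() = runBlocking {
    // Five items arrive immediately, so chunks are cut by size, not by time.
    val chunks = (1..5).asFlow()
        .chunkedWithTimeout(count = 2, maxDuration = 1000.milliseconds)
        .toList()
    println(chunks)
}
```

The `emit` call sits outside `withTimeoutOrNull` on purpose, so a slow collector cannot be cancelled by the chunk timer.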
o
ah, that is much simpler.
c
FYI, I am working on a PR to add a chunk operator to Flow with time, size, and natural batching limits. Comments are welcome: https://github.com/Kotlin/kotlinx.coroutines/issues/1302