Slackbot
12/04/2023, 4:38 AM
Wout Werkman
12/04/2023, 7:56 AM
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
/**
 * Demo entry point: builds a flow that emits the result of [doWork] in an endless loop,
 * batches its values with [chunkedEvery] every 50 milliseconds, and prints each batch.
 *
 * The source flow never completes, so this function does not return on its own.
 */
suspend fun main() {
    val endlessWork = flow {
        while (true) {
            emit(doWork())
        }
    }
    val batches = endlessWork.chunkedEvery(50.milliseconds)
    batches.collect { batch -> println(batch) }
}
/**
 * Groups the elements of this flow into time-based batches.
 *
 * While the upstream flow is being collected, a background ticker wakes up every [interval]
 * and emits a snapshot of everything received since the previous tick, in arrival order.
 * A tick during which nothing arrived still emits an empty list. When the upstream flow
 * completes, elements that are still buffered are emitted as one final, non-empty batch
 * instead of being discarded.
 *
 * @param interval time the ticker waits between two consecutive batch emissions.
 * @return a flow of batches; the resulting flow completes after the upstream flow completes.
 */
private fun <T> Flow<T>.chunkedEvery(interval: Duration): Flow<List<T>> = channelFlow {
    val buffer = mutableListOf<T>()

    // Takes everything buffered so far and resets the buffer, all under the buffer's lock.
    fun drain(): List<T> = synchronized(buffer) {
        buffer.toList().also { buffer.clear() }
    }

    val ticker = launch {
        while (true) {
            delay(interval)
            // The snapshot is taken under the lock; the suspending send happens outside of it.
            send(drain())
        }
    }

    // The ticker runs in its own coroutine, so appends are guarded by the same lock as drain().
    collect { item ->
        synchronized(buffer) { buffer.add(item) }
    }

    // Upstream is finished: stop the ticker and wait until it has really terminated, so the
    // final batch below cannot overtake a batch the ticker was still sending.
    ticker.cancel()
    ticker.join()

    // Flush whatever arrived after the last tick; without this those elements would be lost.
    // NOTE(review): a batch whose send() was suspended at the moment of cancellation may still
    // be undelivered - confirm whether that edge case matters for callers.
    val remainder = drain()
    if (remainder.isNotEmpty()) {
        send(remainder)
    }
}
/**
 * Simulates a unit of work: picks a random integer from 1 to 10 (inclusive), suspends for
 * that many milliseconds, and then returns the picked number.
 */
suspend fun doWork(): Int {
    val result = (1..10).random()
    delay(result.milliseconds)
    return result
}
Alessandro Tagliapietra
12/05/2023, 1:27 AM
Alessandro Tagliapietra
12/05/2023, 1:30 AM