defhlt
07/12/2017, 11:11 PMthrottle
feature with coroutines
suspend fun <T> ReceiveChannel<T>.consumeThrottled(throttling: Long, action: (T) -> Unit) {
var timestamp = 0L
for (data in this) {
if (timestamp + throttling < System.currentTimeMillis()) {
timestamp = System.currentTimeMillis()
action(data)
}
}
}
is there a proper way?
1 reply
Your function won’t send the last element until the next one arrive (if ever). I think something like the following can be used (not tested):
fun <T> ReceiveChannel<T>.debounce() = produce<T>(CommonPool) {
val timeout = { launch(context) { delay(300) } }
val source = this@debounce
val NONE = Object()
var last: Any? = NONE
while (!source.isClosedForReceive) {
select<Unit> {
if (last != NONE) { // no need to timeout since we have nothing to send
timeout().onJoin {
if (last != NONE) {
send(last as T)
last = NONE
}
}
}
source.onReceiveOrNull {
if (it != null) {
last = it
}
}
}
}
if (last != NONE) {
send(last as T)
}
}