igorvd
04/25/2018, 2:40 PMthrottle operator using coroutines?Vsevolod Tolstopyatov [JB]
04/25/2018, 4:17 PMfun <T> Publisher<T>.throttle(intervalMs: Long) = publish<T> {
val delayChannel = DelayChannel(intervalMs)
openSubscription().use { channel ->
var shouldSend = true
whileSelect {
delayChannel.onReceive {
shouldSend = true
true
}
channel.onReceiveOrNull {
when {
it == null -> false
shouldSend -> {
shouldSend = false
send(it)
true
}
else -> true
}
}
}
}
}igorvd
04/25/2018, 4:20 PMbj0
04/25/2018, 5:25 PMdelayChannel, you can just use an onTimeout in the select clausVsevolod Tolstopyatov [JB]
04/25/2018, 6:09 PMonTimeout has different semantics, it’s rescheduled (delayed) on every onReceiveOrNull selection, while DelayChannel#onReceive is notbj0
04/25/2018, 6:10 PMthrottle works in rx?Vsevolod Tolstopyatov [JB]
04/25/2018, 6:12 PMrangeWithDelay(1, 10, 100ms).throttle(200ms).consumeEach{println(it)}
1 3 5 7 9 will be printed.
If I’ll replace DelayChannel with onTimeout, only 1 will be printedbj0
04/25/2018, 6:31 PMbj0
04/25/2018, 6:31 PMbj0
04/25/2018, 6:31 PMVsevolod Tolstopyatov [JB]
04/25/2018, 6:44 PMthrottleFirst.
In case of throttle from rx1, onTimeout should be used