igorvd
04/25/2018, 2:40 PM
[...] throttle operator using coroutines?
Vsevolod Tolstopyatov [JB]
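04/25/2018, 4:17 PM
fun <T> Publisher<T>.throttle(intervalMs: Long) = publish<T> {
    val delayChannel = DelayChannel(intervalMs)
    openSubscription().use { channel ->
        var shouldSend = true
        whileSelect {
            // Each tick re-arms sending, independently of upstream activity.
            delayChannel.onReceive {
                shouldSend = true
                true
            }
            channel.onReceiveOrNull {
                when {
                    // null means the upstream is closed: stop the loop.
                    it == null -> false
                    // First element since the last tick: forward it.
                    shouldSend -> {
                        shouldSend = false
                        send(it)
                        true
                    }
                    // Otherwise drop the element and keep looping.
                    else -> true
                }
            }
        }
    }
}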
igorvd
04/25/2018, 4:20 PM
bj0
04/25/2018, 5:25 PM
[...] delayChannel, you can just use an onTimeout in the select clause
Vsevolod Tolstopyatov [JB]
04/25/2018, 6:09 PM
onTimeout has different semantics: it’s rescheduled (delayed) on every onReceiveOrNull selection, while DelayChannel#onReceive is not
bj0
04/25/2018, 6:10 PM
[...] throttle works in rx?
Vsevolod Tolstopyatov [JB]
04/25/2018, 6:12 PM
rangeWithDelay(1, 10, 100ms).throttle(200ms).consumeEach{println(it)}
1 3 5 7 9 will be printed.
If I replace DelayChannel with onTimeout, only 1 will be printed
bj0
04/25/2018, 6:31 PM
Vsevolod Tolstopyatov [JB]
04/25/2018, 6:44 PM
[...] throttleFirst.
In case of throttle from rx1, onTimeout should be used