is there any way to achieve something similiar wit...
# coroutines
i
is there any way to achieve something similiar with the
throttle
operator using coroutines?
v
It’s easy to implement with incoming delay channel (https://github.com/Kotlin/kotlinx.coroutines/issues/327)
Copy code
fun <T> Publisher<T>.throttle(intervalMs: Long) = publish<T> {
    val delayChannel = DelayChannel(intervalMs)
    openSubscription().use { channel ->
        var shouldSend = true
        whileSelect {

            delayChannel.onReceive {
                shouldSend = true
                true
            }

            channel.onReceiveOrNull {
                when {
                    it == null -> false
                    shouldSend -> {
                        shouldSend = false
                        send(it)
                        true
                    }
                    else -> true
                }
            }
        }
    }
}
i
OK I'll try it. Thank you!
b
You don't really need a
delayChannel
, you can just use an
onTimeout
in the select claus
v
@bj0 It’s not true,
onTimeout
has different semantics, it’s rescheduled (delayed) on every
onReceiveOrNull
selection, while
DelayChannel#onReceive
is not
b
isn't that how
throttle
works in rx?
v
No. In my example for something like
rangeWithDelay(1, 10, 100ms).throttle(200ms).consumeEach{println(it)}
1 3 5 7 9
will be printed. If I’ll replace
DelayChannel
with
onTimeout
, only
1
will be printed
throttle resets its window when it receives a new value
or it did last time i used rx
v
we were talking about different things 🙂 http://reactivex.io/documentation/operators/sample.html I’ve implemented
throttleFirst
. In case of
throttle
from rx1,
onTimeout
should be used