https://kotlinlang.org logo
Title
i

igorvd

04/25/2018, 2:40 PM
is there any way to achieve something similiar with the
throttle
operator using coroutines?
v

Vsevolod Tolstopyatov [JB]

04/25/2018, 4:17 PM
It’s easy to implement with incoming delay channel (https://github.com/Kotlin/kotlinx.coroutines/issues/327)
fun <T> Publisher<T>.throttle(intervalMs: Long) = publish<T> {
    val delayChannel = DelayChannel(intervalMs)
    openSubscription().use { channel ->
        var shouldSend = true
        whileSelect {

            delayChannel.onReceive {
                shouldSend = true
                true
            }

            channel.onReceiveOrNull {
                when {
                    it == null -> false
                    shouldSend -> {
                        shouldSend = false
                        send(it)
                        true
                    }
                    else -> true
                }
            }
        }
    }
}
i

igorvd

04/25/2018, 4:20 PM
OK I'll try it. Thank you!
b

bj0

04/25/2018, 5:25 PM
You don't really need a
delayChannel
, you can just use an
onTimeout
in the select claus
v

Vsevolod Tolstopyatov [JB]

04/25/2018, 6:09 PM
@bj0 It’s not true,
onTimeout
has different semantics, it’s rescheduled (delayed) on every
onReceiveOrNull
selection, while
DelayChannel#onReceive
is not
b

bj0

04/25/2018, 6:10 PM
isn't that how
throttle
works in rx?
v

Vsevolod Tolstopyatov [JB]

04/25/2018, 6:12 PM
No. In my example for something like
rangeWithDelay(1, 10, 100ms).throttle(200ms).consumeEach{println(it)}
1 3 5 7 9
will be printed. If I’ll replace
DelayChannel
with
onTimeout
, only
1
will be printed
throttle resets its window when it receives a new value
or it did last time i used rx
v

Vsevolod Tolstopyatov [JB]

04/25/2018, 6:44 PM
we were talking about different things 🙂 http://reactivex.io/documentation/operators/sample.html I’ve implemented
throttleFirst
. In case of
throttle
from rx1,
onTimeout
should be used