Anyone has a gist or any suggestions on how to imp...
# coroutines
h
Anyone has a gist or any suggestions on how to implement a
sample
operator from RX on a
Channel
? I need to discard items from the channel if they come at the higher rate than I specify.
s
@hmole I have an Rx like
throttleFirst
operator using Coroutines (Channels), if you're interested. I need to get to the office first, though, to find the code ... :)
@hmole This one use Coroutines and to be able to subscribe to a throttler:
Copy code
class ThrottleFirst<T>(private val context: CoroutineContext) {
    private val channel = Channel<T>()

    private val subscriptionRef = AtomicReference<ThrottledSubscription<T>>()

    fun subscribe(observer: (T) -> Unit): Subscription {
        val s = ThrottledSubscription(context, channel, observer)
        return if (subscriptionRef.compareAndSet(null, s)) {
            s
        } else {
            subscriptionRef.get()
        }
    }

    fun onNext(value: T) {
        subscriptionRef.get()?.launch { channel.offer(value) }
    }
}

private class ThrottledSubscription<T>(
    private val context: CoroutineContext,
    private val channel: ReceiveChannel<T>,
    private var observer: ((T) -> Unit)?
) : Subscription, CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = context + parent!!

    private var parent: Job? = null

    override fun cancel() {
        parent?.cancel()
        parent = null
        observer = null
    }

    override fun request(n: Long) {
        if (n > 0) {
            parent = SupervisorJob()
            launch {
                while (isActive) {
                    observer?.invoke(channel.receive())
                    delay(UiThrottler.INPUT_THROTTLE_DURATION)
                }
            }
        }
    }
}
h
Ho does
throttleFirst
differs from
sample
?
s
That is very subtle. Try googling Rx and debounce vs throttleFirst and throttleLast