hmole
03/13/2019, 7:01 AM
`sample` operator from RX on a Channel? I need to discard items from the channel if they come at a higher rate than I specify.
streetsofboston
03/13/2019, 11:36 AM
`throttleFirst`
operator using Coroutines (Channels), if you're interested. I need to get to the office first, though, to find the code ... :)
/**
 * Entry point of a "throttle first" pipeline built on a channel.
 *
 * Values pushed through [onNext] are offered to an internal channel created with the default
 * `Channel<T>()` factory. The single [ThrottledSubscription] created by [subscribe] receives
 * from that channel; a value that cannot be handed over immediately is dropped.
 *
 * @param context coroutine context handed to the subscription that runs the consumer loop.
 */
class ThrottleFirst<T>(private val context: CoroutineContext) {
    private val channel = Channel<T>()
    private val subscriptionRef = AtomicReference<ThrottledSubscription<T>>()

    /**
     * Registers [observer] and returns the subscription that controls delivery.
     *
     * Only the first call installs a subscription. Later calls return that same subscription and
     * their [observer] is ignored, because the reference is only ever set from `null` and is never
     * reset inside this class.
     */
    fun subscribe(observer: (T) -> Unit): Subscription {
        val candidate = ThrottledSubscription(context, channel, observer)
        return if (subscriptionRef.compareAndSet(null, candidate)) {
            candidate
        } else {
            subscriptionRef.get()
        }
    }

    /**
     * Emits [value] to the subscriber, or drops it.
     *
     * Nothing is emitted while there is no subscription. Otherwise the value is offered to the
     * channel directly: `offer` does not suspend, so no coroutine is needed. Launching one on the
     * subscription scope would fail whenever that scope has no active job (before `request` or
     * after `cancel`).
     */
    fun onNext(value: T) {
        if (subscriptionRef.get() != null) {
            // The result is deliberately ignored: a rejected offer is the "throttled" case.
            channel.offer(value)
        }
    }
}
/**
 * Subscription that forwards at most one channel element per throttle window to [observer].
 *
 * The consumer loop is started by the first effective [request] and stopped by [cancel]. After
 * each delivered element the loop waits `UiThrottler.INPUT_THROTTLE_DURATION` before receiving
 * again.
 *
 * @param context base coroutine context the consumer loop runs in.
 * @param channel source of elements.
 * @param observer callback invoked with each delivered element; cleared on [cancel].
 */
private class ThrottledSubscription<T>(
    private val context: CoroutineContext,
    private val channel: ReceiveChannel<T>,
    private var observer: ((T) -> Unit)?
) : Subscription, CoroutineScope {

    /** Job owning the consumer loop; `null` until [request] starts it and again after [cancel]. */
    private var parent: Job? = null

    /** Already-cancelled job used while no loop is running, so stray launches are no-ops. */
    private val inactive: Job = SupervisorJob().apply { cancel() }

    /**
     * Scope context: the base [context] plus the loop's job, or plus a cancelled job when the loop
     * is not running. This replaces `parent!!`, which threw whenever the scope was used before
     * [request] or after [cancel].
     */
    override val coroutineContext: CoroutineContext
        get() = context + (parent ?: inactive)

    /** Stops the consumer loop (if any) and releases the observer. */
    override fun cancel() {
        parent?.cancel()
        parent = null
        observer = null
    }

    /**
     * Starts the consumer loop on the first call with a positive [n].
     *
     * The call is ignored when [n] is not positive, when the loop is already running (a second
     * loop would leak the previous job and double the delivery rate), or when there is no observer
     * left to deliver to (for example after [cancel]).
     */
    override fun request(n: Long) {
        if (n <= 0 || parent != null || observer == null) return

        parent = SupervisorJob()
        launch {
            while (isActive) {
                // Re-read the observer each time so a concurrent cancel() stops delivery.
                observer?.invoke(channel.receive())
                // Nothing receives during the delay, which is what throttles the stream.
                delay(UiThrottler.INPUT_THROTTLE_DURATION)
            }
        }
    }
}
hmole
03/13/2019, 2:56 PM
`throttleFirst` differs from `sample`?
streetsofboston
03/13/2019, 3:15 PM