hi guys i'm trying to implement rx-like `throttle`...
# coroutines
d
Hi guys, I'm trying to implement an Rx-like `throttle` feature with coroutines:
Copy code
/**
 * Iterates over this channel until it is closed, passing an element to [action]
 * only when more than [throttling] milliseconds have elapsed since the previously
 * delivered element. The first received element is always delivered.
 *
 * Elements that arrive inside the throttling window are dropped, not delayed.
 *
 * @param throttling minimum gap, in milliseconds, between two delivered elements.
 * @param action callback invoked with every element that passes the throttle.
 */
suspend fun <T> ReceiveChannel<T>.consumeThrottled(throttling: Long, action: (T) -> Unit) {
    // toNanos saturates instead of overflowing for very large windows.
    val windowNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(throttling)
    var lastDeliveryNanos = 0L
    var delivered = false
    for (data in this) {
        // nanoTime is monotonic; the wall clock (currentTimeMillis) can jump backwards
        // on clock adjustments and would then block deliveries for an arbitrary time.
        val now = System.nanoTime()
        // Compare via subtraction so the check stays correct if nanoTime wraps around.
        if (!delivered || now - lastDeliveryNanos > windowNanos) {
            lastDeliveryNanos = now
            delivered = true
            action(data)
        }
    }
}
Is there a proper way?
1 reply
Your function won't send the last element until the next one arrives (if ever). I think something like the following can be used (not tested):
Copy code
/**
 * Returns a channel that re-emits an element of this channel only after
 * [timeoutMillis] milliseconds have passed without a newer element arriving.
 * Each newly received element replaces the pending one and restarts the timer.
 *
 * When this channel is closed, a still-pending element is emitted before the
 * returned channel completes.
 *
 * `null` elements are ignored, because `onReceiveOrNull` also reports a closed
 * channel as `null` and the two cases cannot be told apart here.
 *
 * @param timeoutMillis quiet period, in milliseconds, required before the pending
 *   element is emitted. Defaults to 300.
 */
@Suppress("UNCHECKED_CAST")
fun <T> ReceiveChannel<T>.debounce(timeoutMillis: Long = 300) = produce<T>(CommonPool) {
    val source = this@debounce
    // Unique marker for "nothing pending"; compared by identity so that an
    // element's own equals() implementation can never match it.
    val noValue = Any()
    var pending: Any? = noValue

    while (!source.isClosedForReceive) {
        // A timer is only needed while there is something waiting to be sent.
        val timer = if (pending !== noValue) launch(context) { delay(timeoutMillis) } else null
        try {
            select<Unit> {
                if (timer != null) {
                    timer.onJoin {
                        // Quiet period elapsed: emit the pending element.
                        send(pending as T)
                        pending = noValue
                    }
                }
                source.onReceiveOrNull {
                    if (it != null) {
                        pending = it
                    }
                }
            }
        } finally {
            // Without this, every element received before the timeout would leave
            // its timer coroutine running until the delay expires.
            timer?.cancel()
        }
    }
    // Source is closed: flush whatever is still pending.
    if (pending !== noValue) {
        send(pending as T)
    }
}