Can I set the min interval for the downstream to c...
# coroutines
s
Can I set the min interval for the downstream to consume? (using flow or channel) for example:
It like throttle but do not skip any value
k
Something like this would work, the code can definitely be improved
Copy code
fun <T> Flow<T>.throttle(minBetweenMillis: Long): Flow<T> {
    require(minBetweenMillis > 0)
    var lastEmitted = System.currentTimeMillis() - minBetweenMillis
    return this.map {
        val currentTime = System.currentTimeMillis()
        val diff = currentTime - lastEmitted
        if (diff < minBetweenMillis) {
            delay(minBetweenMillis - diff)
            lastEmitted += minBetweenMillis
        } else {
            lastEmitted = currentTime
        }
        it
    }
}
e
I'm not sure if an existing operator exists, but you might be able to cheaply
transform
the flow into another. The
transform
operation should: 1.
emit
a value only if the delay job (see next point) is not active 2.
launch
a coroutine with a
delay(n)
(
n
is the delay you want)
A naive implementation with flaws:
Copy code
var delayJob: Job? = null

fun Flow<*>.throttleDownstream(millis: Long): Flow<*> = transform { value ->
	if (delayJob?.isActive != true) {
		emit(value)
		delayJob = coroutineScope { launch { delay(millis) } }
	}
}
An issue with it is that it keeps a single external mutable delay job
a
I think you can use something like this:
Copy code
val timer = flow {
    while (true) {
        emit(Unit)
        delay(3000)
    }
}
youFlow.zip(timer) { v, _ -> v }.collect()
e
Here a working example of my (modified) operator: https://pl.kotl.in/eAHqQ9hNC Still not saying that it's great 😉 But it seems to work
Ah, wait, I completely misunderstood your question. Now I've looked at the image and its values and my operator does not solve your issue 😉
The timer approach written by Albert works
s
How about ticker with zip ?
@Albert Chang Does the
emit(Unit)
would be suspend until zip done?
a
@Simon Lin It will be suspended until the next value from your flow is emitted.
🙆‍♂️ 1
g
An issue with it is that it keeps a single external mutable delay job
@Erik Just a side note, not about original question, that you can easily fix it. your operator is a new flow, it can include any kind internal state, so no need to make it global, it will be mutable just for this operator, for example your code can be rewritten like this:
Copy code
fun <T> Flow<T>.throttleDownstream(millis: Long): Flow<T> = flow {
    var delayJob: Job? = null
    collect { value ->
        // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
        if (delayJob?.isActive != true) {
            emit(value)
            delayJob = coroutineScope { launch { delay(millis) } }
        }
    }
}
So now it doesn’t include any global state
e
Thanks Andrey. I already fixed it in my first link to pl.kotl.in
u
Copy code
This dispatcher shares threads with a Default dispatcher, so using withContext(<http://Dispatchers.IO|Dispatchers.IO>) { ... } does not lead to an actual switching to another thread — typically execution continues in the same thread. As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used) during operations over IO dispatcher.
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-i-o.html