Simon Lin
01/27/2021, 7:33 AMknthmn
01/27/2021, 10:05 AMfun <T> Flow<T>.throttle(minBetweenMillis: Long): Flow<T> {
require(minBetweenMillis > 0)
var lastEmitted = System.currentTimeMillis() - minBetweenMillis
return this.map {
val currentTime = System.currentTimeMillis()
val diff = currentTime - lastEmitted
if (diff < minBetweenMillis) {
delay(minBetweenMillis - diff)
lastEmitted += minBetweenMillis
} else {
lastEmitted = currentTime
}
it
}
}
Erik
01/27/2021, 10:07 AMtransform
the flow into another. The transform
operation should:
1. emit
a value only if the delay job (see next point) is not active
2. launch
a coroutine with a delay(n)
(n
is the delay you want)var delayJob: Job? = null
fun Flow<*>.throttleDownstream(millis: Long): Flow<*> = transform { value ->
if (delayJob?.isActive != true) {
emit(value)
delayJob = coroutineScope { launch { delay(millis) } }
}
}
Albert Chang
01/27/2021, 10:28 AMval timer = flow {
while (true) {
emit(Unit)
delay(3000)
}
}
youFlow.zip(timer) { v, _ -> v }.collect()
Erik
01/27/2021, 10:45 AMSimon Lin
01/28/2021, 1:23 AMemit(Unit)
would be suspend until zip done?Albert Chang
01/28/2021, 1:35 AMgildor
01/28/2021, 3:07 AMAn issue with it is that it keeps a single external mutable delay job@Erik Just a side note, not about original question, that you can easily fix it. your operator is a new flow, it can include any kind internal state, so no need to make it global, it will be mutable just for this operator, for example your code can be rewritten like this:
fun <T> Flow<T>.throttleDownstream(millis: Long): Flow<T> = flow {
var delayJob: Job? = null
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
if (delayJob?.isActive != true) {
emit(value)
delayJob = coroutineScope { launch { delay(millis) } }
}
}
}
So now it doesn’t include any global stateErik
01/28/2021, 9:50 AMuli
01/28/2021, 2:47 PMThis dispatcher shares threads with a Default dispatcher, so using withContext(<http://Dispatchers.IO|Dispatchers.IO>) { ... } does not lead to an actual switching to another thread — typically execution continues in the same thread. As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used) during operations over IO dispatcher.
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-i-o.html