Kieran Wallbanks
09/17/2024, 4:05 PMross_a
09/17/2024, 4:18 PMval channel = Channel<String>()
GlobalScope.launch {
var bufferedItems = mutableListOf<String>()
whileSelect {
if (bufferedItems.isNotEmpty()) onTimeout(1000) {
// TODO process items
bufferedItems = mutableListOf()
true
}
channel.onReceiveCatching { result ->
result
.onSuccess { item -> bufferedItems += item }
.onFailure {
if (bufferedItems.isNotEmpty()) {
// TODO process items
}
}
.isSuccess
}
}
}
Kieran Wallbanks
09/17/2024, 4:31 PMross_a
09/17/2024, 4:32 PMross_a
09/17/2024, 4:35 PM.onSuccess { item ->
bufferedItems += item
if (bufferedItems.size > 5) {
// TODO process items in bufferedItems
bufferedItems = mutableListOf()
}
}
Kieran Wallbanks
09/17/2024, 4:38 PMross_a
09/17/2024, 4:38 PMross_a
09/17/2024, 4:38 PMKieran Wallbanks
09/17/2024, 4:41 PMross_a
09/17/2024, 4:43 PMross_a
09/17/2024, 4:46 PM/**
* Buffers all elements emitted during a debounced window and emits them as a [List] once the debounce timer expires.
*/
fun <T : Any?> Flow<T>.bufferingDebounce(
timeoutMillis: Long
): Flow<List<T>> = channelFlow {
val itemChannel = produceIn(this)
var bufferedItems = mutableListOf<T>()
whileSelect {
if (bufferedItems.isNotEmpty()) onTimeout(timeoutMillis) {
send(bufferedItems)
bufferedItems = mutableListOf()
true
}
itemChannel.onReceiveCatching { result ->
result
.onSuccess { item -> bufferedItems += item }
.onFailure { if (bufferedItems.isNotEmpty()) send(bufferedItems) }
.isSuccess
}
}
}
Kieran Wallbanks
09/17/2024, 4:47 PMross_a
09/17/2024, 4:47 PMdebop
09/18/2024, 2:49 AM/**
* debounced window λ΄μ λ°μν λͺ¨λ μμλ₯Ό λ²νΌλ§νκ³ , λλ°μ΄μ€ νμ΄λ¨Έκ° λ§λ£λλ©΄ [List]λ‘ λ°νν©λλ€.
*
*
* val source = flow {
* emit(1)
* delay(110)
* emit(2)
* delay(90)
* emit(3)
* delay(110)
* emit(4)
* delay(90)
* }
* val buffered = source.bufferingDebounce(200.milliseconds) // [1, 2], [3, 4]
* *
* @param timeout λλ°μ΄μ€ νμμμ
* @return λλ°μ΄μ€λ [List] μμλ₯Ό λ°ννλ [Flow]
*/
fun <T: Any?> Flow<T>.bufferingDebounce(timeout: Duration): Flow<List<T>> = channelFlow {
val itemChannel = produceIn(this)
var bufferedItems = mutableListOf<T>()
var deboundedTimeout = timeout
whileSelect {
var prevTimeMs = System.currentTimeMillis()
if (bufferedItems.isNotEmpty()) {
onTimeout(deboundedTimeout) {
send(bufferedItems)
bufferedItems = mutableListOf()
deboundedTimeout = timeout
true
}
}
itemChannel.onReceiveCatching { result ->
val receiveTimeMs = System.currentTimeMillis()
deboundedTimeout -= (receiveTimeMs - prevTimeMs).milliseconds
prevTimeMs = receiveTimeMs
result
.onSuccess { item -> bufferedItems.add(item) }
.onFailure { if (bufferedItems.isNotEmpty()) send(bufferedItems) }
.isSuccess
}
}
}
debop
09/18/2024, 2:50 AM@Test
fun `debounced window λ΄μ λ°μν λͺ¨λ μμλ₯Ό λ²νΌλ§νκ³ , λλ°μ΄μ€ νμ΄λ¨Έκ° λ§λ£λλ©΄ Listλ‘ λ°νν©λλ€`() = runBlocking<Unit> {
val source = flow {
emit(1)
delay(110)
emit(2)
delay(90)
emit(3)
delay(110)
emit(4)
delay(90)
}
val buffered = source.bufferingDebounce(200.milliseconds) // [1, 2], [3, 4]
val itemLists = buffered.toList()
log.debug { "itemLists=$itemLists" }
itemLists shouldHaveSize 2 shouldBeEqualTo listOf(listOf(1, 2), listOf(3, 4))
}