# coroutines
k
I'm looking to create a "queue" of some sort that I can put objects into that get processed async. This sounds like a great use case for a Flow/Channel, but I can't seem to find a way to replicate this exact logic:
• The objects will be processed when the queue has reached N elements.
• The objects will be processed when the last process was X milliseconds ago.
• The same object should never be processed more than once.
Does anyone have any pointers on this?
r
Something I have kicking around
val channel = Channel<String>()

GlobalScope.launch {
    var bufferedItems = mutableListOf<String>()
    whileSelect {
        if (bufferedItems.isNotEmpty()) onTimeout(1000) {

            // TODO process items

            bufferedItems = mutableListOf()
            true
        }
        channel.onReceiveCatching { result ->
            result
                .onSuccess { item -> bufferedItems.add(item) } // add() avoids the +=/plus ambiguity on a var MutableList
                .onFailure {
                    if (bufferedItems.isNotEmpty()) {
                        // TODO process items 
                        
                    }
                }
                .isSuccess
        }
    }
}
k
I can't really see how this would help me solve my issue of processing the items when the queue reaches a certain size or when it's been a certain amount of time since the last process, unless I'm missing something?
r
the onTimeout performs it after a period of silence
and this minor tweak does it if the number of items is > 5
.onSuccess { item ->
    bufferedItems.add(item)
    if (bufferedItems.size > 5) {
        // TODO process items in bufferedItems
        bufferedItems = mutableListOf()
    }
}
k
Ah I see! So the code in onTimeout will just run the block every X milliseconds and the code in the onSuccess will process as well at that time. I'm also assuming this is all concurrent enough to not hit any issues if the onTimeout and onSuccess blocks call at the same time?
r
Sorry I thought it was the last event (debounce) rather than last process
whileSelect will choose only one of the paths to run at a time yeah
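If the timer should count from the last process rather than from the last item, as the original question asked, one option is to let the loop work out the remaining time itself on every iteration. The following is only a rough sketch under a few assumptions: `consumeInBatches`, `maxSize`, `maxIntervalMillis` and `process` are made-up names, `System.nanoTime()` assumes the JVM, and `whileSelect`/`onTimeout` are experimental kotlinx.coroutines APIs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper: drains [channel] in batches. A batch is flushed when it
// holds maxSize items, when maxIntervalMillis has passed since the last flush,
// or when the channel is closed.
suspend fun <T> consumeInBatches(
    channel: ReceiveChannel<T>,
    maxSize: Int,
    maxIntervalMillis: Long,
    process: suspend (List<T>) -> Unit,
) {
    val buffered = mutableListOf<T>()
    // The first interval is measured from the moment the consumer starts.
    var lastProcessNanos = System.nanoTime()

    suspend fun flush() {
        if (buffered.isNotEmpty()) {
            val batch = buffered.toList()
            buffered.clear() // items leave the buffer before processing, so none is handled twice
            process(batch)
        }
        lastProcessNanos = System.nanoTime()
    }

    whileSelect {
        if (buffered.isNotEmpty()) {
            val elapsedMillis = (System.nanoTime() - lastProcessNanos) / 1_000_000
            // A non-positive timeout is selected immediately.
            onTimeout((maxIntervalMillis - elapsedMillis).coerceAtLeast(0L)) {
                flush()
                true
            }
        }
        channel.onReceiveCatching { result ->
            result
                .onSuccess { item ->
                    buffered.add(item)
                    if (buffered.size >= maxSize) flush()
                }
                .onFailure { flush() } // channel closed: final cleanup
                .isSuccess             // false ends the loop
        }
    }
}

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    val batches = mutableListOf<List<Int>>()
    val consumer = launch {
        consumeInBatches(channel, maxSize = 2, maxIntervalMillis = 10_000) { batches.add(it) }
    }
    (1..5).forEach { channel.send(it) }
    channel.close()
    consumer.join()
    println(batches) // [[1, 2], [3, 4], [5]]
}
```

Because only one select branch runs at a time, `buffered` needs no extra synchronization as long as a single coroutine consumes the channel.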
k
That's very snazzy! And then I assume that when the channel is closed, the onFailure branch will trigger that does the final cleanup and then stops the while?
r
yes correct
You can do similar things to produce rather advanced custom Flow operators quite simply, for example that example was stolen from my operator:
/**
 * Buffers all elements emitted during a debounced window and emits them as a [List] once the debounce timer expires.
 */
fun <T : Any?> Flow<T>.bufferingDebounce(
    timeoutMillis: Long
): Flow<List<T>> = channelFlow {
    val itemChannel = produceIn(this)
    var bufferedItems = mutableListOf<T>()
    whileSelect {
        if (bufferedItems.isNotEmpty()) onTimeout(timeoutMillis) {
            send(bufferedItems)
            bufferedItems = mutableListOf()
            true
        }
        itemChannel.onReceiveCatching { result ->
            result
                .onSuccess { item -> bufferedItems.add(item) }
                .onFailure { if (bufferedItems.isNotEmpty()) send(bufferedItems) }
                .isSuccess
        }
    }
}
πŸ‘ 1
k
That's really cool - thank you for your help!
r
👍
d
In the code above, timeoutMillis is applied to each item (the timer restarts on every item), not to a debounced window spanning multiple items.
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/**
 * Buffers all elements emitted within a debounced window and emits them as a [List]
 * once the debounce timer expires. The window opens when its first element arrives.
 *
 *     val source = flow {
 *         emit(1)
 *         delay(110)
 *         emit(2)
 *         delay(150)
 *         emit(3)
 *         delay(110)
 *         emit(4)
 *         delay(90)
 *     }
 *     val buffered = source.bufferingDebounce(200.milliseconds) // [1, 2], [3, 4]
 *
 * @param timeout the debounce timeout
 * @return a [Flow] that emits the debounced elements as [List]s
 */
fun <T : Any?> Flow<T>.bufferingDebounce(timeout: Duration): Flow<List<T>> = channelFlow {
    val itemChannel = produceIn(this)
    var bufferedItems = mutableListOf<T>()
    // Time left in the current window; reset whenever a batch is emitted.
    var remainingTimeout = timeout

    whileSelect {
        // The builder block runs again on every iteration, so this marks the start of the current wait.
        val waitStartMs = System.currentTimeMillis()
        if (bufferedItems.isNotEmpty()) {
            onTimeout(remainingTimeout) {
                send(bufferedItems)
                bufferedItems = mutableListOf()
                remainingTimeout = timeout
                true
            }
        }
        itemChannel.onReceiveCatching { result ->
            // Only waiting done while a window is open counts against it; idle time
            // before the first item of a window must not shorten that window.
            if (bufferedItems.isNotEmpty()) {
                remainingTimeout -= (System.currentTimeMillis() - waitStartMs).milliseconds
            }
            result
                .onSuccess { item -> bufferedItems.add(item) }
                .onFailure { if (bufferedItems.isNotEmpty()) send(bufferedItems) }
                .isSuccess
        }
    }
}
Test code
@Test
fun `buffers all elements emitted within the debounced window and emits them as a List once the debounce timer expires`() = runBlocking<Unit> {
    // Item 3 arrives at about 260 ms, clearly after the first 200 ms window has closed,
    // so the result does not depend on a race at the window boundary.
    val source = flow {
        emit(1)
        delay(110)
        emit(2)
        delay(150)
        emit(3)
        delay(110)
        emit(4)
        delay(90)
    }
    val buffered = source.bufferingDebounce(200.milliseconds)  // [1, 2], [3, 4]
    val itemLists = buffered.toList()
    log.debug { "itemLists=$itemLists" }
    itemLists shouldHaveSize 2 shouldBeEqualTo listOf(listOf(1, 2), listOf(3, 4))
}