https://kotlinlang.org logo
#coroutines
Title
# coroutines
ł

Łukasz Tokarski

03/06/2024, 11:46 AM
Hello Team, did someone had a chance to implement Flow-based equivalent of RxJava's `window(timespan)`operator? https://reactivex.io/documentation/operators/window.html I have read the discussion on Github but would also ask here before I start reinventing the wheel (if that exists already) https://github.com/Kotlin/kotlinx.coroutines/issues/1302
1
r

ross_a

03/06/2024, 1:09 PM
Copy code
fun <T : Any?> Flow<T>.window(
    timeoutMillis: Long
): Flow<List<T>> = channelFlow {
    val itemChannel = produceIn(this)
    var bufferedItems = mutableListOf<T>()
    
    var windowJob: Job? = null
    
    whileSelect {
        windowJob?.onJoin?.invoke {
            send(bufferedItems)
            bufferedItems = mutableListOf()
            windowJob = null
            true
        }
        itemChannel.onReceiveCatching { result ->
            result
                .onSuccess { item -> 
                    bufferedItems += item 
                    
                    if (windowJob == null) {
                        windowJob = this@channelFlow.launch { delay(timeoutMillis) }
                    }
                }
                .onFailure { if (bufferedItems.isNotEmpty()) send(bufferedItems) }
                .isSuccess
        }
    }
}
Don't have tests but I do have this
might not be exactly what you're after, windowing starts on an element outside the last window
ł

Łukasz Tokarski

03/06/2024, 1:14 PM
Thank you, @ross_a. It's interesting that we don't cancel the
windowJob
anywhere. I guess that the job lifecycle is bound to the collection of the
channelFlow
but not sure before further analysis of the code. I will give it a try later today by writing some Unit Tests around it.
r

ross_a

03/06/2024, 1:15 PM
you're right that's probably a bug, it should be cancelled if it's not null when it exits the whileSelect
ł

Łukasz Tokarski

03/06/2024, 1:16 PM
All in all, I will keep you posted about the results. Thx!
Hi @ross_a, the mechanism gives nice output when unit tested. I just have to work on that job cancellation because it's not cancelled. Thanks for sharing that code!
It appeared to be an easy thing:
Copy code
invokeOnClose {
        windowJob?.cancel(CancellationException("Cancelled because the collector is no longer attached"))
    }
Whole code:
Copy code
fun <T> Flow<T>.window(
    timeoutMillis: Long
): Flow<List<T>> = channelFlow {
    val itemChannel = produceIn(this)
    var bufferedItems = mutableListOf<T>()

    var windowJob: Job? = null

    invokeOnClose {
        windowJob?.cancel()
    }

    whileSelect {
        windowJob?.onJoin?.invoke {
            send(bufferedItems)
            bufferedItems = mutableListOf()
            windowJob = null
            true
        }
        itemChannel.onReceiveCatching { result ->
            result
                .onSuccess { item ->
                    bufferedItems += item

                    if (windowJob == null) {
                        windowJob = this@channelFlow.launch { delay(timeoutMillis) }
                    }
                }
                .onFailure { if (bufferedItems.isNotEmpty()) send(bufferedItems) }
                .isSuccess
        }
    }
}
r

ross_a

03/08/2024, 9:19 AM
👍