Łukasz Tokarski
03/06/2024, 11:46 AMross_a
03/06/2024, 1:09 PMfun <T : Any?> Flow<T>.window(
timeoutMillis: Long
): Flow<List<T>> = channelFlow {
val itemChannel = produceIn(this)
var bufferedItems = mutableListOf<T>()
var windowJob: Job? = null
whileSelect {
windowJob?.onJoin?.invoke {
send(bufferedItems)
bufferedItems = mutableListOf()
windowJob = null
true
}
itemChannel.onReceiveCatching { result ->
result
.onSuccess { item ->
bufferedItems += item
if (windowJob == null) {
windowJob = this@channelFlow.launch { delay(timeoutMillis) }
}
}
.onFailure { if (bufferedItems.isNotEmpty()) send(bufferedItems) }
.isSuccess
}
}
}
Don't have tests but I do have thisross_a
03/06/2024, 1:10 PMŁukasz Tokarski
03/06/2024, 1:14 PMwindowJob
anywhere. I guess that the job lifecycle is bound to the collection of the channelFlow
but not sure before further analysis of the code.
I will give it a try later today by writing some Unit Tests around it.ross_a
03/06/2024, 1:15 PMŁukasz Tokarski
03/06/2024, 1:16 PMŁukasz Tokarski
03/06/2024, 9:01 PMŁukasz Tokarski
03/06/2024, 9:04 PMinvokeOnClose {
windowJob?.cancel(CancellationException("Cancelled because the collector is no longer attached"))
}
Łukasz Tokarski
03/06/2024, 9:05 PMfun <T> Flow<T>.window(
timeoutMillis: Long
): Flow<List<T>> = channelFlow {
val itemChannel = produceIn(this)
var bufferedItems = mutableListOf<T>()
var windowJob: Job? = null
invokeOnClose {
windowJob?.cancel()
}
whileSelect {
windowJob?.onJoin?.invoke {
send(bufferedItems)
bufferedItems = mutableListOf()
windowJob = null
true
}
itemChannel.onReceiveCatching { result ->
result
.onSuccess { item ->
bufferedItems += item
if (windowJob == null) {
windowJob = this@channelFlow.launch { delay(timeoutMillis) }
}
}
.onFailure { if (bufferedItems.isNotEmpty()) send(bufferedItems) }
.isSuccess
}
}
}
ross_a
03/08/2024, 9:19 AM