s3rius
08/06/2023, 9:23 AM
Flow<T>.chunked(count: Int, timeout: Duration? = null): Flow<List<T>>. It basically acts like List's chunked operation: it collects emissions until count is reached, then emits them as a List<T>. It includes an optional timeout, after which any remaining items are emitted anyway.
Background: I want to write a logger that persists log output to a file. But since there can be dozens of log emissions per second, I want to write them in batches. The timeout ensures that logs are written after a few seconds at the most.
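For reference, the count-only half of such an operator can be sketched with just the flow builder. This is a minimal sketch, not the code from the pastebin: the name chunkedBySize is made up for illustration, and the timeout is deliberately left out, since flushing while the upstream is idle needs some form of concurrency between the collector and a timer.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical name, chosen so it does not clash with other `chunked` operators.
fun <T> Flow<T>.chunkedBySize(count: Int): Flow<List<T>> {
    require(count > 0) { "count must be positive, was $count" }
    return flow {
        var buffer = ArrayList<T>(count)
        collect { value ->
            buffer.add(value)
            if (buffer.size == count) {
                emit(buffer)
                // Start a fresh list so an already emitted chunk is never mutated.
                buffer = ArrayList(count)
            }
        }
        // Upstream completed: flush the partial chunk, as List.chunked does.
        if (buffer.isNotEmpty()) emit(buffer)
    }
}

fun main(): Unit = runBlocking {
    println(flowOf(1, 2, 3, 4, 5).chunkedBySize(2).toList()) // [[1, 2], [3, 4], [5]]
    println(listOf(1, 2, 3, 4, 5).chunked(2))                // [[1, 2], [3, 4], [5]]
}
```

The last, partial chunk is emitted only when the upstream completes, which is exactly the gap the timeout parameter is meant to close for a long-running log flow.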
I would love to have some feedback on:
1. Could this be (easily) built with existing flow functions?
2. Is my implementation sound? Does it cover all cases and work under all circumstances? Did I miss something?
The code and a series of tests to validate functionality can be found in this pastebin: https://pastebin.com/1reJivVL
Chris Fillmore
08/09/2023, 12:05 PM
s3rius
08/15/2023, 8:44 AM
gabfssilva
08/16/2023, 6:19 AM
implementation("com.river-kt:core:1.0.0-alpha10")
Osman Saral
08/17/2023, 6:58 PM
a1, a1, a2, a1, a2, a3
so I needed to check whether an item is already in the chunk.
I used a very simple approach for this: a mutableListOf<T>() for the chunk, filtering out any item with
    .filter { item ->
        !items.any { it.id == item.id } // I didn't want to use contains here
    }
and transformWhile to check whether I had reached the end of the 'pages'.
My whole implementation is something like this:
@OptIn(FlowPreview::class)
inline fun <reified T : DbDocument> Flow<List<T>>.chunked(
    page: Int,
    pageSize: Int = 15,
    totalCount: Int
): Flow<List<T>> {
    val chunk = mutableListOf<T>()
    return flatMapConcat { items ->
        items.asFlow()
    }
        .filter { item ->
            !chunk.any { it.id == item.id }
        }
        .transformWhile { document ->
            chunk.add(document)
            val end = chunk.size == pageSize * page || chunk.size == totalCount
            if (end) {
                emit(chunk.toList())
                chunk.clear()
            }
            !end
        }
}
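One property of the code above is easy to miss: transformWhile stops collecting the upstream as soon as its lambda returns false, so the operator ends after the first full chunk. The sketch below reproduces just that behaviour with plain integers; the fixed chunk size of 2 and the function name are stand-ins for illustration, not part of the original code.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Hypothetical helper that mirrors the chunk/transformWhile pattern above.
fun chunksSeenByCollector(): List<List<Int>> = runBlocking {
    val chunk = mutableListOf<Int>()
    flowOf(1, 2, 3, 4, 5, 6)
        .transformWhile { item ->
            chunk.add(item)
            val end = chunk.size == 2 // stand-in for the pageSize/totalCount check
            if (end) {
                emit(chunk.toList())
                chunk.clear()
            }
            !end // returning false ends collection of the upstream flow
        }
        .toList()
}

fun main() {
    println(chunksSeenByCollector()) // [[1, 2]]
}
```

Items 3 to 6 are never collected, so a second chunk is never produced.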
Do you think I can improve my implementation with channels?
gabfssilva
08/18/2023, 5:00 PM
Osman Saral
08/21/2023, 11:37 AM
transformWhile, but it cancels the flow when the predicate is false. I don't want the flow to be cancelled, so I need to think of another way of turning a flow of items into a flow of lists of items.
gabfssilva
08/21/2023, 5:18 PM
Osman Saral
08/21/2023, 5:52 PM
gabfssilva
08/21/2023, 7:26 PM
Flow<T> instead of the Flow<List<T>> you wrote.
I wasn't quite sure why you need page and pageSize, so I removed them. They seem a bit redundant.
Now, once you have the Flow<List<DbDocument>>, you can do the following:
dbDocumentsFlow
    .flatMapConcat { items -> items.asFlow() } // Flow<DbDocument>
    .chunked(100) // Flow<List<DbDocument>>, but without duplicates within the chunk
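A chunking operator that drops duplicates within each chunk could look roughly like the sketch below. It is an illustration under assumptions, not the operator used in the snippet above: Doc is a stand-in for DbDocument, and chunkedDistinctBy is a made-up name that does not exist in kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the DbDocument type from the thread.
data class Doc(val id: String)

// Hypothetical operator: chunks by size and skips items whose key is already in the current chunk.
fun <T, K> Flow<T>.chunkedDistinctBy(size: Int, key: (T) -> K): Flow<List<T>> {
    require(size > 0) { "size must be positive, was $size" }
    return flow {
        // LinkedHashMap keeps insertion order and makes the duplicate check cheap.
        var chunk = LinkedHashMap<K, T>()
        collect { item ->
            val k = key(item)
            if (k !in chunk) chunk[k] = item
            if (chunk.size == size) {
                emit(chunk.values.toList())
                chunk = LinkedHashMap() // only the current chunk is remembered
            }
        }
        // Unlike transformWhile, collection runs to the end; flush what is left.
        if (chunk.isNotEmpty()) emit(chunk.values.toList())
    }
}

fun main(): Unit = runBlocking {
    val ids = flowOf(Doc("a1"), Doc("a1"), Doc("a2"), Doc("a1"), Doc("a2"), Doc("a3"))
        .chunkedDistinctBy(3) { it.id }
        .toList()
        .map { chunk -> chunk.map { it.id } }
    println(ids) // [[a1, a2, a3]]
}
```

With a chunk size of 2, the same input yields [[a1, a2], [a1, a2], [a3]], because the set of seen keys is reset with every chunk.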
There may be duplicated items between two different chunks, but that's impossible to avoid unless you keep more information in memory across multiple chunks, which may lead to memory leaks, so I'd avoid that.
Osman Saral
08/22/2023, 9:42 AM