# flow
s
Hi there! I was in need of a specific flow operation which I couldn't easily replicate with existing functions, so I wrote my own: `Flow<T>.chunked(count: Int, timeout: Duration? = null): Flow<List<T>>`. It basically acts like List's `chunked` operation: it collects emissions until `count` is reached, then emits them as a `List<T>`. It includes an optional `timeout` after which the remaining items are emitted anyway.

Background: I want to write a logger which persists log output to a file. But since there can be dozens of log emissions per second, I want to write them in batches. The `timeout` is used to ensure that logs are guaranteed to be written after a few seconds at the most.

I would love to have some feedback on:
1. Could this be (easily) built with existing flow functions?
2. Is my implementation proper? Does it cover all cases and work under all circumstances? Did I miss something?

The code and a series of tests to validate functionality can be found in this pastebin: https://pastebin.com/1reJivVL
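[Editor's note on question 1: a count-only version, without the timeout, can be built from the `flow` builder alone. A minimal sketch follows; `chunkedSketch` is a hypothetical name, and this is not the pastebin code. The timeout variant is harder because it needs a timer racing the collection, e.g. via `select` or `channelFlow`.]

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Sketch: buffers items and emits them as a List every `count`
// items; any remainder is emitted when the upstream completes.
// The timeout variant would additionally need a timer (e.g. via
// select or channelFlow) and is deliberately omitted here.
fun <T> Flow<T>.chunkedSketch(count: Int): Flow<List<T>> = flow {
    require(count > 0) { "count must be positive" }
    val buffer = mutableListOf<T>()
    collect { item ->
        buffer += item
        if (buffer.size == count) {
            emit(buffer.toList())
            buffer.clear()
        }
    }
    if (buffer.isNotEmpty()) emit(buffer.toList())
}
```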
c
s
Thanks for the pointer!
g
@s3rius, I've commented on the thread that Chris mentioned: you can also take a look at River's implementation; the link is on that thread. The implementation is a bit different because I was aiming to split flows based on count and time, which enabled the very same solution you proposed, as well as the ability to have "chunks as flows", not only in-memory lists. If you want to, you can use River directly, so you don't have to code it yourself. It's currently in alpha stage, but I plan to release the first beta in the next couple of weeks, followed by the first final release a little after:

```kotlin
implementation("com.river-kt:core:1.0.0-alpha10")
```
👀 1
o
Hi, I needed a very similar thing, but I had to filter out the items that were already in the chunk. My flows look something like this:

flow 1: a1
flow 2: a1, a2
flow 3: a1, a2, a3
flow N: a1, a2 ... aN

I couldn't just accumulate every item, because the chunk would become `a1, a1, a2, a1, a2, a3`, so I needed to check whether an item was already in the chunk. I used a very simple approach: a `mutableListOf<T>()` for the chunk, filtering out duplicates with

```kotlin
.filter { item ->
    !chunk.any { it.id == item.id } // I didn't want to use contains here
}
```

and `transformWhile` to check whether I've reached the end of the 'pages'. My whole implementation is something like this:

```kotlin
@OptIn(FlowPreview::class)
inline fun <reified T : DbDocument> Flow<List<T>>.chunked(
    page: Int,
    pageSize: Int = 15,
    totalCount: Int
): Flow<List<T>> {
    val chunk = mutableListOf<T>()

    return flatMapConcat { items -> items.asFlow() }
        .filter { item -> !chunk.any { it.id == item.id } }
        .transformWhile { document ->
            chunk.add(document)
            val end = chunk.size == pageSize * page || chunk.size == totalCount
            if (end) {
                emit(chunk.toList())
                chunk.clear()
            }
            !end // transformWhile cancels the upstream once this is false
        }
}
```
Do you think I can improve my implementation with channels?
g
@Osman Saral, you can use a set instead of a list to ensure uniqueness; I would say that’s pretty much it. I don’t think you have to use channels in this case.
🙏 1
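[Editor's note: a set-based filter along the lines suggested above might look like the sketch below. `distinctByKey` is a hypothetical helper name, not something from kotlinx.coroutines or from this thread.]

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Sketch: drops any item whose key has already been seen.
// HashSet.add returns false for duplicates, so membership is
// O(1) instead of scanning a list with any { }. Note the set
// keeps every key for the lifetime of the collection.
fun <T, K> Flow<T>.distinctByKey(selector: (T) -> K): Flow<T> = flow {
    val seen = HashSet<K>()
    collect { item ->
        if (seen.add(selector(item))) emit(item)
    }
}
```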
o
I'm using `transformWhile`, but it cancels the flow when the predicate is false. I don't want the flow to be cancelled, so I need to think of another way of turning a flow of items into a flow of lists of items.
g
I think I found the issue. As soon as I get back home I’ll take a look for you, @Osman Saral.
o
I actually solved the problem: the key is always returning `true` in `transformWhile`. I'll send the whole implementation later.
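[Editor's note: once the predicate always returns true, `transformWhile` reduces to plain `transform`, which never cancels the upstream. A minimal sketch of that shape; the name `chunkedNoCancel` is hypothetical and this is not the implementation posted later in the thread.]

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.transform

// Sketch: transform never cancels the upstream, so collection
// continues after each full chunk is emitted. Like the original,
// a trailing partial chunk is dropped; the shared mutable list
// also means the returned Flow should only be collected once.
fun <T> Flow<T>.chunkedNoCancel(size: Int): Flow<List<T>> {
    val chunk = mutableListOf<T>()
    return transform { item ->
        chunk += item
        if (chunk.size == size) {
            emit(chunk.toList())
            chunk.clear()
        }
    }
}
```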
g
I wrote this: https://pl.kotl.in/CD8SMDVAY. The main difference is that it's an extension function on a `Flow<T>` instead of the `Flow<List<T>>` you wrote. I wasn't quite sure why you need `page` and `pageSize`, so I removed them; they seemed a bit redundant. Now, once you have the `Flow<List<DbDocument>>`, you can do the following:

```kotlin
dbDocumentsFlow
    .flatMapConcat { items -> items.asFlow() } // Flow<DbDocument>
    .chunked(100) // Flow<List<DbDocument>>, but without duplicates within the chunk
```

There may be duplicate items between two different chunks, but that's impossible to avoid unless you keep more information in memory across multiple chunks, which may lead to memory leaks, so I'd avoid that.
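[Editor's note: in case the pl.kotl.in link rots, here is a sketch of what a chunk-with-in-chunk-dedup extension could look like under the constraints described above. This is a guess at the shape, not necessarily the linked code; `chunkedDistinctBy` is a hypothetical name.]

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Sketch: buffers up to `size` items keyed by `key`, dropping
// duplicates within the current chunk only. The buffer is cleared
// per chunk, so memory stays bounded, at the cost of possible
// duplicates across chunk boundaries.
fun <T, K> Flow<T>.chunkedDistinctBy(size: Int, key: (T) -> K): Flow<List<T>> = flow {
    val buffer = LinkedHashMap<K, T>()
    collect { item ->
        val k = key(item)
        if (k !in buffer) buffer[k] = item
        if (buffer.size == size) {
            emit(buffer.values.toList())
            buffer.clear()
        }
    }
    if (buffer.isNotEmpty()) emit(buffer.values.toList())
}
```

Note how the second chunk below still contains a `2` even though the first chunk did too, illustrating the cross-chunk duplication mentioned above.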
o
Thank you very much, @gabfssilva, it's very helpful!