#coroutines

dave08

12/04/2023, 2:13 PM
Does anybody have a trustworthy `chunked` operator for `Flow`? I've seen a few while googling, but some are said to be hacky and others overly simplistic... I wonder why it hasn't actually been implemented in the library until now.
Is there any problem with, say, this implementation? https://gist.github.com/AWinterman/8516d4869f491176ebb270dafbb23199

Jacob

12/04/2023, 4:11 PM
The reason it hasn't been implemented is that the expected behavior is not obvious. I.e., if the chunk size is 10 and 100 ms pass between the availability of the first 5 elements and the remaining 15, how many chunks should be emitted, and at what sizes? I haven't looked at the gist, but if you google enough you can find one possible implementation from Roman Elizarov himself. I believe it's on the roadmap, but they were still discussing what the API should look like.
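To make that ambiguity concrete, here is a minimal sketch in plain Kotlin (no coroutines) that simulates the scenario above under two possible policies. The `Arrival` type, both function names, and the 50 ms timeout are hypothetical choices for illustration only; neither policy is claimed to be what the library would pick.

```kotlin
// Hypothetical model: an element plus the time (in ms) at which it became available.
data class Arrival(val value: Int, val atMillis: Long)

// Policy A: ignore time entirely and cut a chunk every `size` elements.
fun chunkSizesBySizeOnly(arrivals: List<Arrival>, size: Int): List<Int> =
    arrivals.chunked(size).map { it.size }

// Policy B: also flush an open chunk once `timeoutMillis` have passed
// since the first element of that chunk arrived.
fun chunkSizesWithTimeout(arrivals: List<Arrival>, size: Int, timeoutMillis: Long): List<Int> {
    val sizes = mutableListOf<Int>()
    var count = 0
    var firstAt = 0L
    for (arrival in arrivals) {
        // The timer for the open chunk would have fired before this arrival.
        if (count > 0 && arrival.atMillis - firstAt >= timeoutMillis) {
            sizes += count
            count = 0
        }
        if (count == 0) firstAt = arrival.atMillis
        count++
        if (count == size) {
            sizes += count
            count = 0
        }
    }
    if (count > 0) sizes += count // flush the partial last chunk
    return sizes
}

fun main() {
    // 5 elements at t = 0 ms, the remaining 15 at t = 100 ms.
    val arrivals = (1..5).map { Arrival(it, 0) } + (6..20).map { Arrival(it, 100) }
    println(chunkSizesBySizeOnly(arrivals, 10))      // [10, 10]
    println(chunkSizesWithTimeout(arrivals, 10, 50)) // [5, 10, 5]
}
```

Same input, same chunk size, two defensible answers, which is roughly why the API shape is hard to settle.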

dave08

12/04/2023, 4:15 PM
Could you maybe take a little peek at that snippet? I'm using it for now and just wondering if there are any obvious problems with it. I don't really need anything too complicated; I just need to receive chunks of data from an async DB client and send them in bulk to an API. None should be missed, but I'm not really worried if each chunk takes a little more or a little less time.
I'd suppose that anything Roman posted is probably pretty old and outdated by now...
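For a use case like this one, where timing does not matter and nothing may be dropped, a size-only operator may be enough. The following is a minimal sketch under that assumption, not the gist's code and not a library API; the name `chunkedBySize` is hypothetical and chosen to avoid clashing with any existing operator.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical size-only chunking: no timer, so a chunk is emitted only
// when it is full or when the upstream completes.
fun <T> Flow<T>.chunkedBySize(size: Int): Flow<List<T>> {
    require(size > 0) { "'size' must be positive: $size" }
    return flow {
        var buffer = ArrayList<T>(size)
        collect { element ->
            buffer.add(element)
            if (buffer.size == size) {
                emit(buffer)
                // Start a fresh list so the emitted chunk is never mutated afterwards.
                buffer = ArrayList(size)
            }
        }
        // Upstream completed normally: flush the partial last chunk so nothing is missed.
        if (buffer.isNotEmpty()) emit(buffer)
    }
}

fun main() = runBlocking {
    val chunks = flowOf(1, 2, 3, 4, 5, 6, 7).chunkedBySize(3).toList()
    println(chunks) // [[1, 2, 3], [4, 5, 6], [7]]
}
```

The trade-off is that a slow upstream holds a partially filled chunk back until it fills up or the flow ends, which is only acceptable when latency per chunk is not a concern.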

Jacob

12/04/2023, 4:27 PM
https://stackoverflow.com/a/51048783 Yeah, it's old, but it might be helpful. You don't want me peeking at the gist; I don't actually know this stuff as well as I pretend to.

dave08

12/04/2023, 4:56 PM
Thanks anyways, I'll try to see the differences between the two...

Jilles van Gurp

12/05/2023, 12:45 PM
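import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.takeWhile
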
private object Done
private object TimerExpired

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.chunked(
    chunkSize: Int = 64,
    delayMillis: Long = 250,
): Flow<List<T>> {
    require(chunkSize > 0) { "'chunkSize' must be positive: $chunkSize" }
    require(delayMillis > 0) { "'delayMillis' must be positive: $delayMillis" }

    val upstream: Flow<Any?> = this

    return flow<List<T>> {
        val timerEnabled = MutableSharedFlow<Boolean?>()
        var queue = mutableListOf<T>()

        merge(
            upstream.onCompletion { emit(Done) },
            timerEnabled
                .takeWhile { it != null }
                .flatMapLatest { enabled ->
                    if (enabled!!)
                        flow {
                            delay(delayMillis)
                            emit(TimerExpired)
                        }
                    else
                        emptyFlow()
                }
        )
            .collect { element ->
                when (element) {
                    Done -> {
                        if (queue.isNotEmpty()) {
                            emit(queue)
                            queue = mutableListOf()
                        }

                        timerEnabled.emit(null)
                    }
                    TimerExpired -> {
                        if (queue.isNotEmpty()) {
                            emit(queue)
                            queue = mutableListOf()
                        }
                    }
                    else -> {
                        // Not a sentinel, so the element came from upstream and is a T (unchecked cast).
                        queue.add(element as T)

                        if (queue.size >= chunkSize) {
                            emit(queue)
                            queue = mutableListOf()
                            timerEnabled.emit(false)
                        } else if (queue.size == 1)
                            timerEnabled.emit(true)
                    }
                }
            }
    }
}

dave08

12/05/2023, 2:00 PM
Nice! That looks more up to date... did you test it? Are you using this in production?

Jilles van Gurp

12/05/2023, 5:32 PM
We are using this internally. We had the same issue a few years ago, and this is what we ended up with.