Hello Everyone, There is an ongoing <issue> about ...
# coroutines
m
Hello Everyone, There is an ongoing issue about a time and size based
chunked
flow extension and we created an implementation for ourselves. Details: We are in need of a
fun <T> Flow<T>.chunked(size: Int, timeout: Duration): Flow<List<T>>
function to chunk elements in flows based on a chunk size and a timeout. The timeout means that uncomplete chunks are emitted downstream anyway after a timeout. Our implementation so far looks like this:
Copy code
fun <T> Flow<T>.chunked(chunkSize: Int, timeout: Duration): Flow<List<T>> =
    channelFlow {
        var result: ArrayList<T>? = null
        var timer: Job? = null
        val mutex = Mutex()

        this@chunked.collect { event ->
            val batch = result ?: ArrayList<T>(chunkSize).also { result = it }
            batch.add(event)
            if (timer == null) {
                timer = launch {
                    delay(timeout)
                    mutex.withLock {
                        if (batch.isNotEmpty()) {
                            send(batch)
                            result = null
                        }
                    }
                }
            }

            mutex.withLock {
                if (batch.size >= chunkSize) {
                    send(batch)
                    result = null
                    timer?.cancelAndJoin()
                    timer = null
                }
            }
        }
    }
Can you please have a look and review potential problems in it? We would like to use it with SharedFlows as well. Thank you in advance!
d
I'd move
timer?.cancelAndJoin()
to the top of the
if
block to try to keep the timer from also trying to send, but that won't be perfect. That is, if the timer expires anytime after
mutex.withLock
gets the lock but before
timer?.cancelAndJoin()
, then it will run it's block as soon as you release the mutex lock. I guess the hope is that
batch.isNotEmpty()
will be false as the next
collect
iteration probably won't occur, but is that guaranteed? IDK
🤷 1
m
Thank you for your thoughts, @Don Mitchell! Maybe adding ensureActive() to the start of the mutex block of the timer solves it, doesnt it? If the timer is cancelled, the block should not run if ensureActive checks for cancellation. What do you think?
d
perhaps
isActive
so it doesn't cancel the parent?
m
That is also an option, thanks!