# coroutines
n
i made a chunked flow operator and it seems to work fine in testing, but for some reason only when deployed to Google Cloud Run it falls over, and code executed through it takes 100+ times longer to complete. afaik the Cloud Run VMs are limited to 2 cores, so i am guessing it is running into a parallelism limit on the default dispatcher, i just have no idea how my code triggers that.. because calling it directly (without the chunker operator and buffering) works fine https://gist.github.com/NikkyAI/025436616e81ff0a1711a6144fba0066 any help or suggestions on how to reproduce this locally or what i might have overlooked are appreciated
m
Maybe it’s the CopyOnWriteArrayList? It uses regular thread locking, not coroutine-based locking.
n
which data structure would you recommend? also.. is there a more coroutine-friendly solution to the locks?
m
Ideally you find a solution without any locking, where all modifications happen in the same coroutine context without any concurrency.
n
that seems hard because i want a timer or timeout to trigger emitting the page
m
You could internally convert a Flow&lt;T&gt; into a chunking Flow&lt;Any?&gt; that collects all upstream elements in a list. In a separate coroutine you run your timer that periodically throws a Flush object into the Flow&lt;Any?&gt;. Once that chunking Flow encounters the Flush object you simply send the list and create a new empty list. No locking needed.
n
and as far as i can tell the issue is not blocking within the chunked operator.. but probably some mistake i made causing one more thread to block, which causes thread switching later in the pipeline (it does not help that the things later on use thread pools internally, but nothing i can do about that api)
m
So A, B, C, …, D -> A, B, C, Flush, D, Flush -> [A, B, C], [D]
n
i will try the Flush object idea
m
That would also spare you from switching dispatchers, which isn’t free.
n
can you throw a new element into a flow, or would i have to send it to the channel?
m
You could combine the upstream Flow with a SharedFlow maybe.
Not sure if there’s a good operator
flatMapConcat
If the timer rate can be fixed then you can also make use of https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.concurrent/fixed-rate-timer.html
No, that’s the wrong one
Hm, I recall there was a Flow that periodically emits, but I cannot find it.
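As far as I know there is no built-in periodically-emitting Flow in kotlinx.coroutines (the `ticker()` channel exists but is marked obsolete), so a hand-rolled sketch would look like this (the name `tickerFlow` is made up):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// minimal periodic flow; runs until its collector is cancelled
fun tickerFlow(periodMillis: Long): Flow<Unit> = flow {
    while (true) {
        delay(periodMillis)
        emit(Unit)
    }
}
```

Because cancellation of the collector cancels the `delay`, no explicit shutdown logic is needed.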
n
well that would certainly help a lot
if i inject Flush values into the flow, how do i control the restart of the timer, so that after a flush i always have a constant delay to collect values?
m
How exactly is it supposed to delay? Delay after the first element in each chunk was received? Or keep a constant delay between all chunks no matter when upstream emits anything or if it doesn’t emit anything at all? What about empty chunks then?
n
emit if the chunk is full or if delayMillis passed, reset the timer after emit. empty chunks are not emitted
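That behavior (flush when the chunk is full or when delayMillis elapses, reset the timer after each flush, skip empty chunks) could be sketched without locks by draining the upstream through a channel, so that a single coroutine owns the buffer. This is an assumed implementation, not the gist's actual code; the operator name and parameters are made up:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

fun <T> Flow<T>.chunked(maxSize: Int, delayMillis: Long): Flow<List<T>> = flow {
    coroutineScope {
        // drain upstream into a channel so one coroutine owns the buffer
        val channel = Channel<T>(Channel.BUFFERED)
        launch {
            try {
                this@chunked.collect { channel.send(it) }
            } finally {
                channel.close()
            }
        }

        val buffer = mutableListOf<T>()
        var deadline = System.currentTimeMillis() + delayMillis

        suspend fun flush() {
            if (buffer.isNotEmpty()) {       // empty chunks are not emitted
                emit(buffer.toList())
                buffer.clear()
            }
            deadline = System.currentTimeMillis() + delayMillis // reset timer
        }

        var done = false
        while (!done) {
            val remaining = deadline - System.currentTimeMillis()
            // wait for the next element, but at most until the deadline
            val result: ChannelResult<T>? =
                if (remaining <= 0) null
                else withTimeoutOrNull(remaining) { channel.receiveCatching() }
            when {
                result == null -> flush()                    // timer expired
                result.isClosed -> { flush(); done = true }  // upstream finished
                else -> {
                    buffer += result.getOrThrow()
                    if (buffer.size >= maxSize) flush()      // chunk is full
                }
            }
        }
    }
}
```

The deadline is recomputed only inside `flush()`, so a size-triggered flush also restarts the timer, and an expired timer with an empty buffer just loops and resets.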
m
So the initial delay starts with the first upstream item and subsequent delays start with a flush? And what if a delay is over and there was nothing to flush?
n
empty chunk, then nothing is emitted and the timer would just continue, it loops, might as well reset it then too
this code is triggered from within the startTimer() launch block currently
// grab the buffered chunk first; send() can suspend,
// so it cannot be called inside the critical section
val toSend = if (toSend.isNotEmpty()) {
    toSend
} else {
    null
}

if (toSend != null) {
    channel.send(toSend)
}
the reason this is split up is because i cannot call suspend functions inside a critical section like withLock apparently
m
It always starts the timer after receiving the first item in a chunk though.
hmm and it doesn’t reset the timer either 🤔
n
why mapLatest on the timerFlow instead of e.g. map?
m
So that the previous delay is canceled when a new value comes in to timerFlow
I guess it needs some cleanup and testing, but you get the idea.
There’s a potential race condition when the queue is flushed due to size but the timer triggers before it’s canceled.
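A tiny sketch of why `mapLatest` (rather than `map`) restarts the timeout: each new upstream value cancels the still-running previous transform, and with it the pending `delay`. The names `flushAfterQuiet` and `timeoutMillis` are invented for this illustration:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// each new upstream value cancels the previous lambda, restarting the delay;
// with plain map the delays would simply queue up one after another
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flushAfterQuiet(timeoutMillis: Long): Flow<String> =
    mapLatest {
        delay(timeoutMillis)
        "Flush" // only reached if no newer value arrived during the delay
    }
```

This is essentially debounce-style behavior, which is why a value arriving mid-delay suppresses the earlier flush.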
n
cancelling in this use case pretty much only happens on shutdown of the application, and one could make sure that it adds a delay before that, maybe in onCompletion?
m
No I mean before the timer is canceled
e.g. when the queue has reached size limit and is flushed
n
something fails in my tests when running with that new code.. so it does not act completely as i expect
m
What fails?
n
seems like it emits fewer values than expected sometimes, probably because previous timeouts / delays are not cancelled
m
It wouldn’t change the amount of emissions. It would probably just emit a chunk early when it contains only one element.
Maybe it’s dispatcher-related 🤔
n
that’s something i wanted to avoid, but if i can’t.. well.. if it fixes the other issues it’s fine...
m
The dispatcher shouldn’t influence it though. hmm.
n
you could start a timer after a page was emitted, then it would not matter when the first value came in
m
That’s what it’s doing?
Ah no
it starts when first element is in
That’s weird though. It would unnecessarily repeat the timer over and over again until new data comes in.
n
fair point, this wastes fewer cycles when there is no data coming in, at the expense of always waiting for a full delay if only a bit of data comes in.. works for me
c
There are already PRs with chunked implementations. Look up chunked / windowed with size limit: https://github.com/Kotlin/kotlinx.coroutines/pull/1558 and universal chunked with both time and size limit: https://github.com/Kotlin/kotlinx.coroutines/pull/2378 Those PRs are still not merged, because there is confusion about how an API for chunking should look. You are welcome to share your use case and solutions here: https://github.com/Kotlin/kotlinx.coroutines/issues/1302