#coroutines

Nikky

12/15/2020, 9:57 AM
I made a chunked flow operator and it seems to work fine in testing, but only when deployed to Google Cloud Run it falls over, and code executed through it takes 100+ times longer to complete. AFAIK the Cloud Run VMs are limited to 2 cores, so I am guessing it is running into a parallelism limit on the default dispatcher; I just have no idea how my code triggers that, because calling it directly (without the chunker operator and buffering) works fine. https://gist.github.com/NikkyAI/025436616e81ff0a1711a6144fba0066 Any help or suggestions on how to reproduce this locally, or on what I might have overlooked, are appreciated.

Marc Knaup

12/15/2020, 10:00 AM
Maybe it's the CopyOnWriteArrayList? It uses regular thread locking, not coroutine-based locking.

Nikky

12/15/2020, 10:11 AM
Which data structure would you recommend? Also, is there a more coroutine-friendly solution to the locks?

Marc Knaup

12/15/2020, 10:12 AM
Ideally you find a solution without any locking, where all modifications happen in the same coroutine context without any concurrency.

Nikky

12/15/2020, 10:15 AM
That seems hard, because I want a timer or timeout to trigger emitting the page.

Marc Knaup

12/15/2020, 10:18 AM
You could internally convert a Flow<T> into a chunking Flow<Any?> that collects all upstream elements in a list. In a separate coroutine you run your timer, which periodically throws a Flush object into the Flow<Any?>. Once the chunking Flow encounters the Flush object, you simply send the list and create a new empty list. No locking needed.
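A minimal sketch of that idea, not the code from the gist: merge the upstream flow with a timer flow that injects a sentinel, and flush the buffer on the sentinel or when the size limit is reached. The names `Flush`, `chunkedSketch`, and both parameters are assumptions made up for illustration; all buffer mutation happens in the single coroutine collecting the merged flow, so no locking is needed.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical sentinel object injected into the merged flow.
private object Flush

fun <T> Flow<T>.chunkedSketch(maxSize: Int, periodMillis: Long): Flow<List<T>> = flow {
    // Timer flow that periodically throws Flush into the merged stream.
    val ticker = flow<Any?> {
        while (true) {
            delay(periodMillis)
            emit(Flush)
        }
    }
    val buffer = mutableListOf<T>()
    merge(this@chunkedSketch.map { it as Any? }, ticker).collect { value ->
        if (value === Flush) {
            // Timer fired: emit whatever has accumulated; skip empty chunks.
            if (buffer.isNotEmpty()) {
                emit(buffer.toList())
                buffer.clear()
            }
        } else {
            @Suppress("UNCHECKED_CAST")
            buffer += value as T
            if (buffer.size >= maxSize) {
                emit(buffer.toList())
                buffer.clear()
            }
        }
    }
}

fun main() = runBlocking {
    // Size-based flushes produce [1, 2] and [3, 4]; the timer flushes [5].
    val chunks = flowOf(1, 2, 3, 4, 5)
        .chunkedSketch(maxSize = 2, periodMillis = 200)
        .take(3)
        .toList()
    println(chunks)
}
```

Note the ticker never completes, so the merged flow runs until the collector cancels it, and this sketch has the size/timer race mentioned later in the thread: a size-based flush does not reset the timer.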

Nikky

12/15/2020, 10:19 AM
And as far as I can tell, the issue is not blocking within the chunked operator, but probably some mistake I made causing one more thread to block, which causes thread switching later in the pipeline (it does not help that things later in the pipeline use thread pools internally, but there is nothing I can do about that API).

Marc Knaup

12/15/2020, 10:19 AM
So A, B, C, …, D -> A, B, C, Flush, D, Flush -> [A, B, C], [D]

Nikky

12/15/2020, 10:19 AM
I will try the Flush object idea.

Marc Knaup

12/15/2020, 10:20 AM
That would also spare you from switching dispatchers, which isn't free.

Nikky

12/15/2020, 10:20 AM
Can you throw a new element into a flow, or would I have to send it to the channel?

Marc Knaup

12/15/2020, 10:21 AM
You could maybe combine the upstream Flow with a SharedFlow.
Not sure if there's a good operator; flatMapConcat maybe.
If the timer rate can be fixed, then you can also make use of https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.concurrent/fixed-rate-timer.html
No, that's the wrong one.
Hm, I recall there was a Flow that periodically emits, but I cannot find it.
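There is no stable periodically-emitting Flow built into kotlinx.coroutines (the ticker channel API is marked obsolete), but one is short to write. The name `tickerFlow` here is made up, not an official API:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Minimal periodic Flow: emits Unit every periodMillis until cancelled.
fun tickerFlow(periodMillis: Long): Flow<Unit> = flow {
    while (true) {
        delay(periodMillis)
        emit(Unit)
    }
}

fun main() = runBlocking {
    val ticks = tickerFlow(20).take(3).toList()
    println(ticks.size) // prints 3
}
```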

Nikky

12/15/2020, 10:34 AM
Well, that would certainly help a lot.
If I inject Flush values into the flow, how do I control the restart of the timer, so that after a flush I always have a constant delay to collect values?

Marc Knaup

12/15/2020, 10:38 AM
How exactly is it supposed to delay? Delay after the first element in each chunk was received? Or keep a constant delay between all chunks, no matter when upstream emits anything, or if it doesn't emit anything at all? What about empty chunks then?

Nikky

12/15/2020, 10:40 AM
Emit if the chunk is full or if delayMillis passed; reset the timer after emit; empty chunks are not emitted.

Marc Knaup

12/15/2020, 10:42 AM
So the initial delay starts with the first upstream item and subsequent delays start with a flush? And what if a delay is over and there was nothing to flush?

Nikky

12/15/2020, 10:42 AM
Empty chunk; then nothing is emitted and the timer would just continue. It loops, so might as well reset it then too.
This code is triggered from within the startTimer() launch block currently:

```kotlin
val toSend = if (toSend.isNotEmpty()) {
    toSend
} else {
    null
}

if (toSend != null) {
    channel.send(toSend)
}
```

The reason this is split up is that apparently I cannot call suspend functions inside a critical section like withLock.
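That split pattern can be made explicit: take a snapshot of the buffer while holding the lock, then do the suspending `channel.send` outside the critical section. A sketch using kotlinx's `Mutex`; the names `flushBuffer`, `buffer`, and `channel` are stand-ins for the fields in the discussion, not the actual code. Even where a lock does permit suspension, holding it across `send` would keep it locked while suspended, so sending outside is the safer pattern.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Snapshot-and-clear under the lock, suspend on send outside of it.
suspend fun <T> flushBuffer(
    mutex: Mutex,
    buffer: MutableList<T>,
    channel: Channel<List<T>>
) {
    val toSend: List<T>? = mutex.withLock {
        if (buffer.isNotEmpty()) {
            val snapshot = buffer.toList()
            buffer.clear()
            snapshot
        } else {
            null
        }
    }
    if (toSend != null) {
        channel.send(toSend) // suspending call happens outside the lock
    }
}

fun main() = runBlocking {
    val mutex = Mutex()
    val buffer = mutableListOf(1, 2, 3)
    val channel = Channel<List<Int>>(Channel.UNLIMITED)
    flushBuffer(mutex, buffer, channel)
    println(channel.receive()) // prints [1, 2, 3]
    println(buffer.isEmpty())  // prints true
}
```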

Marc Knaup

12/15/2020, 11:06 AM
It always starts the timer after receiving the first item in a chunk though.
Hmm, and it doesn't reset the timer either 🤔

Nikky

12/15/2020, 11:22 AM
Why mapLatest on the timer flow instead of e.g. map?

Marc Knaup

12/15/2020, 11:23 AM
So that the previous delay is canceled when a new value comes in to timerFlow.
I guess it needs some cleanup and testing, but you get the idea.
There's a potential race condition when the queue is flushed due to its size limit but the timer triggers before it's canceled.
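A small illustration of the mapLatest behavior being discussed (the values and delays are made up for the example): each new upstream value cancels the pending transform from the previous one, which is exactly the timer reset; a plain map would let every delay run to completion.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val result = flowOf("a", "b", "c")
        .onEach { delay(10) }           // upstream values arrive every 10 ms
        .mapLatest { delay(200); it }   // 200 ms "timer", restarted per value
        .toList()
    // "a" and "b" get canceled by their successors; only "c" survives
    println(result) // prints [c]
}
```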

Nikky

12/15/2020, 11:28 AM
Cancelling in this use case pretty much only happens on shutdown of the application, and one could make sure that it adds a delay before that, maybe in onCompletion?

Marc Knaup

12/15/2020, 11:29 AM
No, I mean before the timer is canceled
e.g. when the queue has reached its size limit and is flushed

Nikky

12/15/2020, 11:30 AM
Something fails in my tests when running with that new code, so it does not act completely as I expect.

Marc Knaup

12/15/2020, 11:32 AM
What fails?

Nikky

12/15/2020, 11:34 AM
Seems like it sometimes emits fewer values than expected, probably because previous timeouts / delays are not cancelled.

Marc Knaup

12/15/2020, 11:35 AM
It wouldn't change the number of emissions. It would probably just emit a chunk early when it contains only one element.
Maybe it's dispatcher-related 🤔

Nikky

12/15/2020, 11:36 AM
That's something I wanted to avoid, but if I can't... well, if it fixes the other issues it's fine.

Marc Knaup

12/15/2020, 11:37 AM
The dispatcher shouldn't influence it though. Hmm.

Nikky

12/15/2020, 11:41 AM
You could start a timer after a page was emitted; then it would not matter when the first value came in.

Marc Knaup

12/15/2020, 11:42 AM
That's what it's doing?
Ah no, it starts when the first element comes in.
That's weird though. It would unnecessarily repeat the timer over and over again until new data comes in.

Nikky

12/15/2020, 11:54 AM
Fair point. This wastes fewer cycles when there is no data coming in, at the expense of always waiting for a full delay if only a bit of data comes in. Works for me.

Circusmagnus

12/16/2020, 12:34 PM
There are already PRs with chunked implementations. Look up chunked / windowed with a size limit: https://github.com/Kotlin/kotlinx.coroutines/pull/1558 and a universal chunked with both time and size limits: https://github.com/Kotlin/kotlinx.coroutines/pull/2378 Those PRs are still not merged, because there is confusion about how an API for chunking should look. You are welcome to share your use case and solutions here: https://github.com/Kotlin/kotlinx.coroutines/issues/1302