Is it expected that `delay` is significantly diffe...
# coroutines
r
Is it expected that `delay` is significantly different to using a `fixedRateTimer` when used for a few millis?
For example
val context = newSingleThreadContext("tick")
val timeMillis = tickConfig.interval.toMillis()
return channelFlow<Long> {
    var count = 0L

    while (true) {
        val element = count++
        trySend(element)
        kotlinx.coroutines.delay(timeMillis)
    }
}.flowOn(context)
gets me 16-17/second and
return callbackFlow {
    var count = 0L
    val fixedRateTimer = fixedRateTimer(period = tickConfig.interval.toMillis()) {
        trySendBlocking(count++)
    }

    awaitClose {
        fixedRateTimer.cancel()
    }
}
gets me to the expected 20/second
I've attempted a buffer to no effect, checked it was not conflating
k
You might want to try making a ScheduledThreadPoolExecutor into a dispatcher to see if that affects the outcome at all
r
I believe that's what `newSingleThreadContext` is doing behind the scenes
k
Ah yeah I think you’re right.
r
Another variation, same result:
val timeMillis = tickConfig.interval.toMillis()
return GlobalScope.produce(newSingleThreadContext("tick"), capacity = UNLIMITED) {
    var count = 0L

    while (true) {
        val element = count++
        trySend(element)
        kotlinx.coroutines.delay(timeMillis)
    }
}.consumeAsFlow()
Still 16-17 ticks/sec instead of 20. Going to fall back to not using `delay` for ticks.
d
Let's say `count++` and the subsequent `trySend(element)` take 8-10 milliseconds. `delay(50)` then waits for an extra 50 milliseconds. In total, each iteration takes 58-60 milliseconds, corresponding to 16-17 iterations per second. I don't know what `fixedRateTimer` is (it's not part of our library), but from its name, I'd assume that it ensures that the average rate (that is, the number of iterations per unit of time) stays constant. This can be implemented with `delay` manually like this:
val iterationTime = measureTime {
  val element = count++
  trySend(element)
}
val toWait = iterationLength - iterationTime
delay(toWait)
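One way to turn the snippet above into a complete ticker is to wait until absolute deadlines rather than subtracting a measured iteration time. This is only a minimal sketch: `fixedRateTicks` is a hypothetical helper, not part of kotlinx.coroutines, and it assumes Kotlin 1.9+ for `TimeSource.Monotonic`. It cannot make a single wake-up more precise than `delay` itself; it only stops late wake-ups from accumulating into a lower average rate.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeSource
import kotlin.time.measureTime
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (an assumption for illustration, not a library API).
fun fixedRateTicks(period: Duration): Flow<Long> = flow {
    val start = TimeSource.Monotonic.markNow()
    var count = 0L
    while (true) {
        emit(count++)
        // The next deadline is measured from the start of the flow, not from "now",
        // so a late wake-up shortens the following wait instead of shifting every later tick.
        val nextDeadline = start + period * count.toDouble()
        // elapsedNow() is negative while the deadline is still in the future.
        val remaining = -nextDeadline.elapsedNow()
        delay(remaining) // returns immediately when remaining is zero or negative
    }
}

fun main() = runBlocking {
    // 21 ticks span 20 periods of 50 ms.
    val elapsed = measureTime { fixedRateTicks(50.milliseconds).take(21).toList() }
    println("21 ticks took $elapsed")
}
```

If the consumer or the scheduler falls behind by more than one period, this sketch emits the overdue ticks back to back until it catches up, which may or may not be what a given use case wants.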
r
Yeah, I did experiment with that, but `trySend` shouldn't take 8-10 milliseconds, right? It's emitting into an unlimited buffer in that last case?
k
I think that’s kinda spot on. From what @Dmitry Khalanskiy [JB] is saying, the `fixedRateTimer` will ensure that the time between the start of a previous task and the start of the subsequent task remains constant. Your code with `delay` doesn’t actually do that, so you’re building in some extra time to each interval period.
To achieve what `fixedRateTimer` is doing, I think you’ll need to introduce some concurrency.
I have a library which facilitates job schedules that’s built on top of `delay`. I had to introduce a mode that either cancels the previous job or runs them concurrently because of this problem.
I’d like to be explicit, one moment.
val context = newSingleThreadContext("tick")
val timeMillis = tickConfig.interval.toMillis()
return channelFlow<Long> {
    var count = 0L

    while (true) {

        // Executes in, for example, 10ms ///
        val element = count++
        trySend(element)
        ////////////////////////////////////

        // Delays for 50ms
        kotlinx.coroutines.delay(timeMillis)
        ////////////////////////////////////
    }
}.flowOn(context)
`fixedRateTimer` counts the time it takes to execute your task as part of the period it’s scheduled with, in this case the 50ms, instead of adding it on top.
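The difference between the two schemes can be modelled with plain arithmetic. The sketch below is an idealised model only: `startTimesMs` is a hypothetical function, the 10 ms of work is the example figure from the messages above rather than a measurement, and scheduler jitter is ignored.

```kotlin
// Hypothetical model: start time (in ms) of each iteration, ignoring scheduler jitter.
fun startTimesMs(periodMs: Long, workMs: Long, iterations: Int, fixedRate: Boolean): List<Long> {
    val starts = ArrayList<Long>(iterations)
    var start = 0L
    repeat(iterations) {
        starts += start
        start = if (fixedRate) {
            // Fixed rate: the next start is anchored to the previous start,
            // so the work is absorbed into the period (as long as it fits).
            start + maxOf(periodMs, workMs)
        } else {
            // Work followed by delay: the full period is added on top of the work.
            start + workMs + periodMs
        }
    }
    return starts
}

fun main() {
    println(startTimesMs(periodMs = 50, workMs = 10, iterations = 4, fixedRate = false)) // [0, 60, 120, 180]
    println(startTimesMs(periodMs = 50, workMs = 10, iterations = 4, fixedRate = true))  // [0, 50, 100, 150]
}
```

With these assumed numbers, the loop with `delay` starts an iteration every 60 ms (about 16.7 per second), while the fixed-rate schedule starts one every 50 ms (20 per second).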
r
measureTimeMillis gives me 0ms for the trySend call
d
What does it return for the full iteration?
r
Just adding some printlns..
val toMillis = tickConfig.interval.toMillis()
return flow {
    var count = 0L
    var next = System.currentTimeMillis()
    while (true) {
        val start = System.currentTimeMillis()

        println("Emit ${measureTimeMillis {
            emit(count++)
        }}")

        next = start + toMillis
        val toDelay = next - System.currentTimeMillis()
        println("To delay: $toDelay")
        println("Delay ${measureTimeMillis {
            delay(toDelay)
        }}")
    }
}.flowOn(newSingleThreadContext("tick"))
Emit 0
To delay: 50
Delay 62
Delay 62
Delay 62
Emit 0
To delay: 50
Delay 62
Emit 0
Emit 0
To delay: 50
Emit 0
To delay: 50
To delay: 50
Delay 62
Delay 62
Emit 0
To delay: 50
Sorry, there are a fair few flows running concurrently, so the output is interleaved
d
`delay` seems wildly inaccurate, but at the same time, surprisingly consistent. I'll need to take a look at this, but I'm knee-deep in `kotlinx-datetime` at the moment. Could you file an issue to `kotlinx.coroutines`, preferably with a self-contained reproducer?
r
No problem - will do - glad it wasn't just my machine
& thanks for humouring me!
k
Interestingly, this example seems to be mostly fine aside from JVM startup:
import kotlin.time.measureTime
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.newSingleThreadContext

fun main() = runBlocking(newSingleThreadContext("foo")) {
    repeat(20) {
        val elapsed = measureTime {
            delay(50)
        }
        println("Supposed to delay for 50ms, delayed for $elapsed.")
    }
}
It prints the following output:
Supposed to delay for 50ms, delayed for 60.950070ms.
Supposed to delay for 50ms, delayed for 50.307421ms.
Supposed to delay for 50ms, delayed for 50.305745ms.
Supposed to delay for 50ms, delayed for 50.352972ms.
Supposed to delay for 50ms, delayed for 50.336299ms.
Supposed to delay for 50ms, delayed for 50.306166ms.
Supposed to delay for 50ms, delayed for 50.322560ms.
Supposed to delay for 50ms, delayed for 50.307023ms.
Supposed to delay for 50ms, delayed for 50.352728ms.
Supposed to delay for 50ms, delayed for 50.386073ms.
Supposed to delay for 50ms, delayed for 50.342678ms.
Supposed to delay for 50ms, delayed for 50.378550ms.
Supposed to delay for 50ms, delayed for 50.311117ms.
Supposed to delay for 50ms, delayed for 50.314146ms.
Supposed to delay for 50ms, delayed for 50.571076ms.
Supposed to delay for 50ms, delayed for 50.340461ms.
Supposed to delay for 50ms, delayed for 50.326914ms.
Supposed to delay for 50ms, delayed for 50.338576ms.
Supposed to delay for 50ms, delayed for 50.314560ms.
Supposed to delay for 50ms, delayed for 50.352552ms.
r
I don't get the same 🙂
Supposed to delay for 50ms, delayed for 55.788200ms.
Supposed to delay for 50ms, delayed for 50.378400ms.
Supposed to delay for 50ms, delayed for 61.915500ms.
Supposed to delay for 50ms, delayed for 60.764900ms.
Supposed to delay for 50ms, delayed for 61.419900ms.
Supposed to delay for 50ms, delayed for 61.910500ms.
Supposed to delay for 50ms, delayed for 61.554600ms.
Supposed to delay for 50ms, delayed for 62.495300ms.
Supposed to delay for 50ms, delayed for 61.706800ms.
k
Okay, wild. Seems like it’s all over the map, too. Mind linking the bug when you file?
And just out of curiosity what happens to you when you remove the single threaded dispatcher?
import kotlin.time.measureTime
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    repeat(20) {
        val elapsed = measureTime {
            delay(50)
        }
        println("Supposed to delay for 50ms, delayed for $elapsed.")
    }
}
r
The same regardless of dispatcher
k
Which version of kotlinx-coroutines?
r
1.8.1 and 1.10.1
I'll be honest, I'm now wondering if it's some crazy P-core/E-core thing or some other environmental issue
last = System.currentTimeMillis()
var x = 0
val fixedRateTimer = executor.scheduleAtFixedRate(
    {
        val now = System.currentTimeMillis()
        println("Fixed2 ${now - last}")
        last = now

        if (x++ == 20) System.exit(0)
    },
    delay,
    delay,
    TimeUnit.MILLISECONDS
)
Even this is giving me
Fixed2 59
Fixed2 46
Fixed2 47
Fixed2 62
Fixed2 47
Fixed2 47
Fixed2 46
Fixed2 47
Fixed2 62
Fixed2 47
Fixed2 45
Fixed2 47
Fixed2 62
Fixed2 45
Fixed2 47
Fixed2 46
Fixed2 62
Fixed2 46
Fixed2 48
Fixed2 46
Fixed2 46
k
What’s `delay` in the above example?
r
50L
k
Pretty strange that it’s resuming before 50ms have elapsed
r
Yup, and now we're not in coroutine territory at all
k
What are your machine’s specs?
r
i7-12700K - 12 core, Win 11, temurin 17 and 21
k
And which JVM are you running on?
What OS? Kernel?
r
Yeah, so this is genuinely the first time I've seen this, but the issue is the same for `Thread.sleep()`, so it has nothing to do with `delay`. It's kind of crazy that `scheduleAtFixedRate` runs fast sometimes and slow other times, so it averages out. I'm also confused why I don't see this issue manifest as stuttering in other applications. Some sort of issue with nanoTime vs millis?
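A coroutine-free check along these lines could look like the sketch below. It is not the code that was actually run in the thread; `measureSleepsMs` is a hypothetical helper that times `Thread.sleep` with `System.nanoTime`, so the measurement itself does not depend on the millisecond clock.

```kotlin
// Hypothetical helper: how long Thread.sleep really takes, measured with the nanosecond clock.
fun measureSleepsMs(requestedMs: Long, samples: Int): List<Double> =
    List(samples) {
        val before = System.nanoTime()
        Thread.sleep(requestedMs)
        (System.nanoTime() - before) / 1_000_000.0
    }

fun main() {
    measureSleepsMs(requestedMs = 50, samples = 10).forEach { ms ->
        println("Requested 50 ms, slept for %.3f ms".format(ms))
    }
}
```

The default Windows timer tick is commonly cited as about 15.6 ms, and four such ticks come to about 62.5 ms, which would be consistent with the 62 ms readings above. That is an inference from the numbers, not something the thread confirms.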
k
This is likely a JVM/kernel scheduling thing based on the symptoms you’re experiencing.
Can you run this sample with Kotlin/Native to see if it repros the issue?
import kotlin.time.measureTime
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    repeat(20) {
        val elapsed = measureTime {
            delay(50)
        }
        println("Supposed to delay for 50ms, delayed for $elapsed.")
    }
}
r
I wrote the same thing in Zig and saw the same behaviour there too, so I think it's down to Windows scheduling and sleep granularity. Sorry for the wild goose chase. Never noticed it being so coarse-grained before.
a
What's the use case, btw? Windows isn't a real-time OS, so you're never guaranteed to get time slices at accurate intervals or of accurate duration. There can be interruptions from hardware, slice duration adjustments due to internal heuristics (e.g. thread priorities), other processes' "usage patterns" (e.g. a high frequency of user input, as in games or GUI apps) that make the OS give them "special treatment", etc. There are special real-time OSes that are used in specific domains, e.g. medical or aircraft hardware. Though you might be overengineering your task, and a slight inaccuracy might go unnoticed by the user.
r
Wasn't for a user, just for benchmarking. I was using this as a mock source to generate a tick for emitting price updates, but noticed that I wasn't getting as many messages through as I expected on one machine vs another, and tracked it down to the original emission of the updates. The majority of the codebase is callback or flow based, so this is really the only place we used `delay`.
a
Benchmarking is a hard thing with a lot of pitfalls. I would craft my own solution only as a last resort; in most cases it's better to google some best practices for the tech stack in your project.
r
I am very aware, I do not think there are many solutions in my domain but thank you
a
e.g. CPU can boost frequency at load so benchmarks should do some kind of "warm up"
r
yes I know I am aware of JMH etc.
This isn't microbenchmarking, this is a full end to end pre-prod benchmark of a trading system where there are many network boundaries, data sources and Flow conflation etc. - it is often difficult to determine the source of dropped messages, especially since Flow doesn't really expose much of internals to instrument and become aware of e.g. slow consumers.
Unfortunately, in this case it took tracking it all the way to the source just to determine that the `delay` needed replacing with a spin loop, because Windows behaves differently from the system's normal environment, Linux.
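The thread does not show the spin loop that replaced `delay`. A common shape for one is a hybrid wait that parks for most of the interval and busy-waits only for the tail, sketched below. `preciseWaitNanos` and its 20 ms threshold are assumptions for illustration, and `Thread.onSpinWait()` needs Java 9 or newer.

```kotlin
import java.util.concurrent.locks.LockSupport

// Hypothetical helper: blocks the calling thread, so it is not a drop-in replacement for delay().
fun preciseWaitNanos(waitNanos: Long, spinThresholdNanos: Long = 20_000_000L) {
    val deadline = System.nanoTime() + waitNanos

    // Coarse phase: park while more than the threshold remains. The threshold is an
    // assumed upper bound on how late the OS may wake the thread.
    while (true) {
        val remaining = deadline - System.nanoTime()
        if (remaining <= spinThresholdNanos) break
        LockSupport.parkNanos(remaining - spinThresholdNanos)
    }

    // Fine phase: busy-wait on the monotonic clock until the deadline has passed.
    while (deadline - System.nanoTime() > 0) {
        Thread.onSpinWait()
    }
}

fun main() {
    repeat(5) {
        val before = System.nanoTime()
        preciseWaitNanos(50_000_000L)
        val elapsedMs = (System.nanoTime() - before) / 1_000_000.0
        println("Requested 50 ms, waited %.3f ms".format(elapsedMs))
    }
}
```

The trade-off is that the spinning phase keeps a core busy for up to the threshold on every wait, which may be acceptable for a benchmark tick source but is rarely what production code wants.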
a
I believe you can tweak time slice amount/frequency in unix-like OS, unlike Windows
r
yeah ~20ms slice seems like an eon!
the fact that `scheduleAtFixedRate` seems to average it out (sometimes less than 50ms) threw me off even further
d
I wonder how `fixedRateTimer` ensures the correct timing if even native code fails to do that. Maybe we could utilize the same technique in our `delay` implementation.
r
It seemed to just have some below and some above - I'm not sure if that's possible with `delay`
Fixed2 47
Fixed2 46
Fixed2 47
Fixed2 62
Fixed2 47
Fixed2 45
However I surfaced https://github.com/Kotlin/kotlinx.coroutines/issues/3680 - I think it would be a useful addition (or option) if there is any new API added
d
Please don't share snippets of JDK sources, we're not allowed to look at them, as with the license we are using (Apache 2.0), we can not use GPL code.
It seemed to just have some below and some above
Oh, that's neat. So, they must be keeping track of the rate of the previous tasks and compensate for too slow (too fast) rates by adjusting the later ones.
a
Basically it's a looper with `System.currentTimeMillis()` checks
// java/util/Timer.java

private void mainLoop() {
Looks like `.wait(executionTime - currentTime)` is the key, though it still depends on the OS scheduler to wake the thread accurately 🤔