# ktor
u
Hi, I have an audio processing use case and I'd like your advice, because building it (high performance) on top of coroutines seems wrong (?). The basic idea: there is a producer (a websocket streaming audio from the client), a consumer, and a ring buffer between them. The usual (blocking) way of solving this. What do you think about spawning threads in a ktor server? Is there a way to make it at least play nice with shutdown? // Channels mostly want me to use immutable data, which is obviously a no-go for lots of binary data, so I'm not sure. Communicating via a mutable structure seems like a bad idea, right? Ideas?
s
I see no reason why you couldn't use a channel to push packets to? split the audio into packets and push them to the channel and then have a consumer reading from the channel and you can just spawn coroutines with
launch { }
instead of threads. and if spawning them on
Dispatchers.IO
, then each consumer & producer will very likely have its own dedicated OS thread. this is because
Dispatchers.IO
will either use 64 threads or the number of available processors, whichever is greater. (unless you override it with
-Dkotlinx.coroutines.io.parallelism=...
) you probably want some kind of pool of bytebufs or smth like what netty uses, and then return the bytebuf to the pool at the end (see:
PooledByteBufAllocator
, you can also use the "default bytebuf allocator" by using
ByteBufAllocator.DEFAULT
). but once the bytebuf gets passed to the channel, and until it gets freed again, treat it as fully immutable. when you're done with a bytebuf, you just call
.release()
on it to return it back to the pool. you can also have netty warn you about any unreleased bytebufs. look at
ResourceLeakDetector
for how to enable that. also, I've personally found the shutdown to be quite finicky in fact, and cause issues where sometimes it shuts down when it shouldn't or doesn't start up things properly when it should when in developer mode & using
application.monitor.subscribe(ApplicationStopping) {
    // ...
}
to shutdown an exposed database. my workaround has just been to not even subscribe to that hook when in developer mode and not use a shutdown hook at all lol
there are two different ways you can listen for shutdowns in ktor:
• subscribing to the monitor I posted above
• adding a shutdown hook (jvm only)
if you want to add a jvm shutdown hook, I have this small snippet of code that I generally copy around my different projects bc shutdown hooks are quite a common thing (consider it as dual-licensed under CC0 or the Unlicense. take your pick of whichever you prefer)
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean

private val shutdownHooks: MutableList<() -> Unit> = Collections.synchronizedList(mutableListOf())
private val hookRegistered = AtomicBoolean(false)

fun onJvmShutdown(block: () -> Unit) {
    shutdownHooks += block
    // Register a single JVM hook on first use; it runs every block added so far.
    if (hookRegistered.compareAndSet(false, true)) {
        Runtime.getRuntime().addShutdownHook(ShutdownHook("JVM-Shutdown-Thread"))
    }
}

private class ShutdownHook(name: String) : Thread(name) {
    override fun run() {
        // Copy under the list's lock: iterating a synchronizedList is not thread-safe on its own.
        val blocks = synchronized(shutdownHooks) { shutdownHooks.toList() }
        blocks.forEach { it() }
        // No removeShutdownHook here: it throws IllegalStateException once shutdown has begun.
    }
}
I just usually throw it in a file named
JVMUtil
or something.
u
well, my key thing is that the produced chunks are a different size from the ones the consumer needs; if I were to use a Channel, I'd need another mini buffer at the consumer and fill it there, so that's a ring buffer anyways, just not a standard one
-- but in general, won't I take an unnecessary perf hit using coroutines for this? -- if I know I'll always use just 2 threads (1 producer, 1 consumer)
people working hard on making the model which processes the audio as fast as possible and then blow those gains on coroutine communication..idk
s
-- but in general, won't I take an unnecessary perf hit using coroutines for this? -- if I know I'll always use just 2 threads (1 producer, 1 consumer)
I cannot say for sure, but my guess would be no, you wouldn't. characterising the performance of things is extremely difficult though, and the best thing I can recommend is https://tryitands.ee (the only real way to know if something like that will perform badly is to try both ways and benchmark them).
u
I know but, design wise, if I could rely just on channels then yes, I'd maybe make that trade-off, but here I cannot: I need to manually queue stuff anyways, so what's the upside ..
unless im missing something
s
can the channel not do the queuing for you? you can use a channel like a ring buffer by doing
val channel = Channel<ByteArray>(128 /* replace with appropriate buffer size */, BufferOverflow.DROP_OLDEST) // ByteArray element type is an assumption
the advantage is that you get to use apis that feel a bit nicer to use in kotlin. sure, the native jvm apis might have a slight advantage in performance (but that would need to be tested of course, I can't say that for sure and kotlin's apis could easily be faster), but the kotlin apis are easier to use and imo easier to iterate with. so I'm willing to have the chance that I'm missing out on a bit of performance if it means I can use smth that's a bit easier
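For reference, a minimal sketch of the channel approach described above, assuming kotlinx.coroutines is on the classpath. The function name `pipeBytes` and the `ByteArray` packet type are made up for illustration. It uses the default overflow policy, which suspends the producer when the buffer is full, whereas `BufferOverflow.DROP_OLDEST` discards the oldest buffered packet instead; which of the two is right for audio depends on whether dropping data is acceptable.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

/** Hypothetical helper: sends [packets] through a bounded channel and returns the bytes the consumer saw. */
fun pipeBytes(packets: List<ByteArray>, capacity: Int = 128): Long = runBlocking {
    // Default overflow policy is SUSPEND: a full buffer makes the producer wait instead of dropping audio.
    val channel = Channel<ByteArray>(capacity)
    var total = 0L

    val consumer = launch(Dispatchers.Default) {
        for (packet in channel) total += packet.size // loop ends once the channel is closed and drained
    }
    launch(Dispatchers.IO) {
        for (packet in packets) channel.send(packet)
        channel.close() // signals end of stream to the consumer
    }
    consumer.join()
    total
}
```

Calling `pipeBytes(List(5) { ByteArray(4_000) })` pushes five 4,000-byte packets through the channel; in a real server the producer side would be the websocket handler rather than a list.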
u
well inputs are 4kB long and model ingests 10kB how would you have the consumer side collect 2.5 buffers?
(I also dont want to mess with java & writing my own ring buffer and all that crap)
s
not sure, you could always do something nasty like a double-channel type thing, where producers send to the first channel, then you have a coroutine which consumes data from that channel & groups the 4kb chunks into 10kb chunks, then pushes it to the second channel (using smth mutable to accumulate the chunks, I guess?). then, any consumers would consume from the second channel.
it doesn't sound fun to write, but that would be my first instinct on how to write it.
probably good to throw in like some mutexes or locks as well or smth, idk
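The "group 4kb chunks into 10kb chunks" step can be sketched without any channel or thread machinery. This is only an illustration: the class name `Rechunker` and its `feed` method are hypothetical, and it assumes a single owner (one consumer thread or one coroutine), so it does no locking of its own.

```kotlin
/**
 * Hypothetical helper: regroups arbitrarily sized input chunks into fixed-size frames.
 * Not thread-safe; meant to be owned by a single consumer thread or coroutine.
 */
class Rechunker(private val frameSize: Int) {
    init { require(frameSize > 0) { "frameSize must be positive" } }

    private val pending = ByteArray(frameSize) // bytes waiting for a full frame
    private var filled = 0                     // number of valid bytes in `pending`

    /** Feeds one input chunk and returns every frame it completed (possibly none). */
    fun feed(chunk: ByteArray): List<ByteArray> {
        val frames = mutableListOf<ByteArray>()
        var offset = 0
        while (offset < chunk.size) {
            // Copy as much as fits into the current frame.
            val n = minOf(frameSize - filled, chunk.size - offset)
            chunk.copyInto(pending, destinationOffset = filled, startIndex = offset, endIndex = offset + n)
            filled += n
            offset += n
            if (filled == frameSize) {
                frames += pending.copyOf() // hand out a copy so `pending` can be reused
                filled = 0
            }
        }
        return frames
    }
}
```

With `Rechunker(10_000)` and 4,000-byte inputs, the third call to `feed` returns the first full frame and keeps the remaining 2,000 bytes for the next one. The `copyOf()` per frame trades an allocation for safety; a pooled buffer could replace it if allocation shows up in measurements.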
u
yea but then its not elegant, verbose & coroutine perf hit, so what am I gaining
also model inference is synchronous, because its basically C
s
the nicer kotlin api and, you can't say for sure if it's actually a performance hit until you go and measure it
u
im not sure its nicer since the 4 -> 10 transform will require manual buffering = ring buffer
so you're saying ktor shutdown mechanism isnt reliable enough for it to break my while loop if I were to use raw threads?
s
in my experience it's not that it never stopped it but rather sometimes after the code was hotswapped, the database did not start up again for like maybe 15-30 seconds could it have been some dumb error caused entirely by me? probably. but I just never bothered to debug it further and just disabled it in dev
u
application.monitor.subscribe(ApplicationStopping) {
    breakLoop = true
}
so simply just this?
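One caveat with a plain `breakLoop` variable is visibility: without `@Volatile` or an atomic, the worker thread is not guaranteed to see the write. A minimal sketch of the flag approach, assuming a raw thread; `LoopWorker`, `step`, and the thread name are hypothetical names, not anything from ktor.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

/** Hypothetical worker that calls [step] in a loop until asked to stop. */
class LoopWorker(private val step: () -> Unit) {
    private val running = AtomicBoolean(true)
    private val thread = Thread({ while (running.get()) step() }, "audio-consumer")

    fun start() = thread.start()

    /** Clears the flag and waits up to [timeoutMs] for the loop to exit. Returns true if it did. */
    fun stop(timeoutMs: Long = 1_000): Boolean {
        running.set(false)
        thread.join(timeoutMs)
        return !thread.isAlive
    }
}
```

The stopping hook would then call `worker.stop()` inside the `subscribe(ApplicationStopping) { ... }` block. The flag is only checked between calls to `step`, so this works when each step returns regularly; a step that blocks indefinitely would also need an interrupt.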
s
something like that? you could just stop the thread entirely tbh
u
uhm..you can stop a thread?
s
yes? you call
workerThread.interrupt() // workerThread is a placeholder for the Thread object you want to stop
and then, inside that thread, make sure to catch
InterruptedException
& check for
Thread.currentThread().isInterrupted()
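A minimal sketch of that interrupt pattern, assuming the consumer blocks on a `java.util.concurrent.BlockingQueue`; the function name `startConsumer` and the `handle` callback are made up for illustration.

```kotlin
import java.util.concurrent.BlockingQueue

/** Hypothetical consumer: drains [queue] until its thread is interrupted. */
fun startConsumer(queue: BlockingQueue<ByteArray>, handle: (ByteArray) -> Unit): Thread {
    val thread = Thread({
        try {
            while (!Thread.currentThread().isInterrupted) {
                handle(queue.take()) // take() blocks and throws InterruptedException on interrupt
            }
        } catch (e: InterruptedException) {
            // Interrupted while blocked: fall through and let the thread end.
        }
    }, "audio-consumer")
    thread.start()
    return thread
}
```

The caller keeps the returned `Thread`, calls `interrupt()` on it from the shutdown path, and then `join()`s it. Both checks are needed: the exception covers an interrupt that arrives while the thread is blocked in `take()`, and the `isInterrupted` test covers one that arrives while `handle` is running.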
u
well yea thats the condition, but sure
thanks