reactormonk
05/22/2024, 1:25 PM
Thread so far, but the Exception handling is atrocious
reactormonk
05/22/2024, 1:26 PM
Sam
05/22/2024, 1:26 PM
reactormonk
05/22/2024, 1:26 PM
reactormonk
05/22/2024, 1:26 PM
Sam
05/22/2024, 1:28 PM
reactormonk
05/22/2024, 1:28 PM
suspend fun createRPCServer(block: suspend com.rabbitmq.client.Channel.() -> RpcServer) {
    val channel = connection.createChannel()
    val rpcServer = block(channel)
    val thread = Thread {
        rpcServer.mainloop()
    }
    try {
        thread.start()
    } catch (e: Exception) {
        Timber.e(e)
    }
    connection.addShutdownListener {
        rpcServer.close()
        thread.interrupt()
    }
}
reactormonk
05/22/2024, 1:30 PM
uli
05/22/2024, 1:35 PM
reactormonk
05/22/2024, 1:36 PM
Sam
05/22/2024, 1:36 PM
newSingleThreadContext(), which will make a dispatcher with its own thread.
reactormonk
05/22/2024, 1:36 PM
Dispatchers.IO, I was initially reluctant due to thread hugging
uli
05/22/2024, 1:36 PM
Sam
05/22/2024, 1:37 PM
runInterruptible(Dispatchers.IO) {
    rpcServer.mainloop()
}
Sam
05/22/2024, 1:37 PM
reactormonk
05/22/2024, 1:38 PM
suspend fun createRPCServer(block: suspend com.rabbitmq.client.Channel.() -> RpcServer) {
    val channel = connection.createChannel()
    val rpcServer = block(channel)
    val serverJob = Job()
    runInterruptible(Dispatchers.IO + serverJob) {
        rpcServer.mainloop()
    }
    connection.addShutdownListener {
        rpcServer.close()
        serverJob.cancel()
    }
}
Something like this?
Sam
05/22/2024, 1:41 PM
Job like that is generally not a good idea. In this case it means the mainloop() won't react to cancellation any more.
reactormonk
05/22/2024, 1:44 PM
Sam
05/22/2024, 1:44 PM
coroutineScope {
    val serverJob = launch {
        runInterruptible(Dispatchers.IO) {
            rpcServer.mainloop()
        }
    }
    connection.addShutdownListener {
        rpcServer.close()
        serverJob.cancel()
    }
}
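The difference between the two versions above (a standalone Job() added to the context versus a child started with launch) can be checked with a small experiment. This is a minimal sketch, not code from the thread: wasInterrupted is a hypothetical helper, and Thread.sleep stands in for the blocking mainloop(). Under these assumptions, cancelling the caller should not interrupt the blocking code when a fresh Job() is passed, and should interrupt it when no Job is passed.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.*

// Hypothetical helper: runs a blocking sleep under Dispatchers.IO + extra and
// reports whether cancelling the calling coroutine interrupted the sleep.
suspend fun wasInterrupted(extra: CoroutineContext): Boolean = coroutineScope {
    val started = CompletableDeferred<Unit>()
    val interrupted = AtomicBoolean(false)
    val caller = launch {
        runInterruptible(Dispatchers.IO + extra) {
            started.complete(Unit) // signal that the blocking section has begun
            try {
                Thread.sleep(300) // stand-in for rpcServer.mainloop()
            } catch (e: InterruptedException) {
                interrupted.set(true)
                throw e
            }
        }
    }
    started.await()
    caller.cancelAndJoin()
    interrupted.get()
}

fun main() = runBlocking {
    // A fresh Job() replaces the caller's Job in the context, cutting the parent/child link.
    println("with Job():    interrupted=${wasInterrupted(Job())}")
    // Without it, the blocking section stays tied to the caller's Job.
    println("without Job(): interrupted=${wasInterrupted(EmptyCoroutineContext)}")
}
```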
reactormonk
05/22/2024, 1:45 PM
coroutineScope to acquire the context
reactormonk
05/22/2024, 1:45 PM
uli
05/22/2024, 2:05 PM
fun createRPCServer would return after starting the service. coroutineScope {} will suspend until the launched coroutine terminates. You might want to create a new scope to launch the service in.
Sam
05/22/2024, 2:06 PM
Thread { … } didn't actually include a way to wait for the mainloop() to finish.
Sam
05/22/2024, 2:06 PM
uli
05/22/2024, 2:08 PM
createRPCServer would also not need to be suspend.
So either provide your own scope or take a scope as receiver:
fun CoroutineScope.createRPCServer(block: suspend com.rabbitmq.client.Channel.() -> RpcServer): Job {
    val serverJob = launch {
        runInterruptible(Dispatchers.IO) {
            rpcServer.mainloop()
        }
    }
    connection.addShutdownListener {
        rpcServer.close()
        serverJob.cancel()
    }
    return serverJob
}
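The scope-as-receiver shape above can be tried out without RabbitMQ. The following is a minimal sketch, not code from the thread: FakeRpcServer and startServer are hypothetical stand-ins (the real RpcServer and connection are not used), and mainloop() is simulated with a sleep that blocks until interrupted. It shows the function returning immediately with a Job, and cancellation of that Job interrupting the blocking loop.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlinx.coroutines.*

// Hypothetical stand-in for RpcServer: mainloop() blocks until the thread is interrupted.
class FakeRpcServer {
    val started = CountDownLatch(1)
    @Volatile var closed = false

    fun mainloop() {
        started.countDown()
        Thread.sleep(Long.MAX_VALUE) // throws InterruptedException when interrupted
    }

    fun close() {
        closed = true
    }
}

// Not a suspend function: it launches into the receiver scope and returns right away.
fun CoroutineScope.startServer(server: FakeRpcServer): Job = launch {
    try {
        // Cancelling the job interrupts the thread that is blocked in mainloop().
        runInterruptible(Dispatchers.IO) { server.mainloop() }
    } finally {
        server.close()
    }
}

fun main() = runBlocking {
    val server = FakeRpcServer()
    val job = startServer(server)
    // Wait (off the main thread) until the blocking loop has actually started.
    withContext(Dispatchers.IO) { server.started.await() }
    job.cancelAndJoin()
    println("cancelled=${job.isCancelled}, closed=${server.closed}")
}
```

Because the caller owns the scope, it decides how long the server lives: cancelling the scope or the returned Job stops the loop.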
Zach Klippenstein (he/him) [MOD]
05/22/2024, 5:23 PM
uli
05/22/2024, 6:46 PM
newSingleThreadContext
uli
05/22/2024, 6:48 PM
Oliver.O
05/27/2024, 12:34 PM
The IO dispatcher [...] has a maximum number of threads it will allocate, so theoretically holding one or more of these forever could limit normal users of the dispatcher.
Using Dispatchers.IO.limitedParallelism(...) makes these problems disappear. For details and references, see KTOR-6462.
reactormonk
05/27/2024, 4:45 PM
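A minimal sketch of the Dispatchers.IO.limitedParallelism(...) suggestion, not code from the thread. The names rpcDispatcher and runOnRpcDispatcher are hypothetical, and the parallelism of 1 is an assumption for a single long-running loop. Per the kotlinx.coroutines documentation, a view created this way from Dispatchers.IO is not bound by the IO dispatcher's own default thread limit, so parking a thread in it should not starve other Dispatchers.IO users.

```kotlin
import kotlinx.coroutines.*

// Hypothetical dedicated view for the long-running RPC loop.
// The opt-in is only needed on kotlinx.coroutines versions where this API is still experimental.
@OptIn(ExperimentalCoroutinesApi::class)
val rpcDispatcher: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(1)

// Hypothetical helper: runs blocking code on the dedicated view, interruptibly.
suspend fun <T> runOnRpcDispatcher(block: () -> T): T =
    runInterruptible(rpcDispatcher, block)

fun main() = runBlocking {
    // In the thread's code this would wrap rpcServer.mainloop().
    val threadName = runOnRpcDispatcher { Thread.currentThread().name }
    println("ran on $threadName")
}
```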