How can I fire off a mainloop to a separate thread...
# coroutines
r
How can I fire off a mainloop to a separate thread? I've been using old-school Thread so far, but the Exception handling is atrocious
Ideally, I'd rewrite the underlying library to use proper coroutines, but alas
s
Can you clarify a bit? What do you mean by mainloop? Are we talking about Android?
r
Kinda, running the mainloop for a rabbitmq service
But yeah, the whole system runs on android
s
Can you share an example of what you tried with threads? It shouldn't be too hard to coroutinify 🤞
r
suspend fun createRPCServer(block: suspend com.rabbitmq.client.Channel.() -> RpcServer) {
            val channel = connection.createChannel()
            val rpcServer = block(channel)

            val thread = Thread {
                rpcServer.mainloop()
            }
            try {
                thread.start()
            } catch (e: Exception) {
                Timber.e(e)
            }
            connection.addShutdownListener {
                rpcServer.close()
                thread.interrupt()
            }
        }
The try/catch I added recently, after the Exception from the Thread ended up in Timbuktu
u
Did you try launching a coroutine on Dispatchers.IO for the rpcServer.mainloop()?
r
Is that Dispatcher made for hugging threads?
s
Dispatchers.IO could work well 👍. Or, if you want a dedicated thread, you can use newSingleThreadContext(), which will make a dispatcher with its own thread.
r
Ok, I have no issues shoving it over to Dispatchers.IO, I was initially reluctant due to thread hugging
u
It is made for things mostly waiting on IO. As rpcServer.mainloop() will not suspend, it also keeps running on the same thread for its whole lifetime.
s
👍 try this:
runInterruptible(Dispatchers.IO) {
  rpcServer.mainloop()
}
It'll be cancellable, and it'll propagate exceptions correctly
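A minimal sketch of that behaviour, assuming kotlinx.coroutines on the JVM. `blockingLoop` is a hypothetical stand-in for `rpcServer.mainloop()`, and both function names are made up for illustration:

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a blocking call such as rpcServer.mainloop():
// it blocks its thread until that thread is interrupted.
fun blockingLoop() {
    while (true) Thread.sleep(1_000)
}

// Returns true if cancelling the job stopped the blocking loop.
fun cancelStopsBlockingLoop(): Boolean = runBlocking {
    val job = launch {
        runInterruptible(Dispatchers.IO) { blockingLoop() }
    }
    delay(100)          // give the loop time to start
    job.cancelAndJoin() // interrupts the blocked thread, then waits for the job to finish
    job.isCancelled
}

// Returns the message of an exception thrown inside the blocking code.
fun failurePropagates(): String? = runBlocking {
    try {
        runInterruptible(Dispatchers.IO) { check(false) { "boom" } }
        null
    } catch (e: IllegalStateException) {
        e.message // the failure surfaces in the calling coroutine
    }
}
```

Cancellation only helps if the blocking call actually reacts to thread interruption, which is worth verifying for the real mainloop.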
r
suspend fun createRPCServer(block: suspend com.rabbitmq.client.Channel.() -> RpcServer) {
            val channel = connection.createChannel()
            val rpcServer = block(channel)

            val serverJob = Job()
            runInterruptible(Dispatchers.IO + serverJob) {
                rpcServer.mainloop()
            }
            connection.addShutdownListener {
                rpcServer.close()
                serverJob.cancel()
            }
        }
Something like this?
s
Passing a custom Job like that is generally not a good idea. In this case it means the mainloop() won't react to cancellation of the calling coroutine any more, because the new Job replaces the caller's Job as the parent.
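A small sketch of why a custom Job cuts the link to the parent, assuming kotlinx.coroutines. It uses `launch` instead of `runInterruptible` to stay short, and the function name is hypothetical:

```kotlin
import kotlinx.coroutines.*

// Returns (attached.isCancelled, detached.isCancelled) after the scope is cancelled.
fun cancelScopeWithCustomJob(): Pair<Boolean, Boolean> {
    val scope = CoroutineScope(Dispatchers.Default)

    // Child of the scope's Job: cancelled together with the scope.
    val attached = scope.launch { awaitCancellation() }
    // The fresh Job() replaces the scope's Job as parent, so the link is lost.
    val detached = scope.launch(Job()) { awaitCancellation() }

    scope.cancel()
    val result = attached.isCancelled to detached.isCancelled

    detached.cancel() // the detached coroutine has to be cleaned up by hand
    return result
}
```

Only the coroutine that kept the scope's Job as its parent sees the cancellation; the one started with its own `Job()` keeps running until it is cancelled explicitly.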
r
How can I create a job that's still connected to the parent context, aka doesn't override it?
s
coroutineScope {
  val serverJob = launch {
    runInterruptible(Dispatchers.IO) {
      rpcServer.mainloop()
    }
  }

  connection.addShutdownListener {
    rpcServer.close()
    serverJob.cancel()
  }
}
r
Ah, yeah, forgot about coroutineScope to acquire the context
Thanks for the help
👍 1
🐕 1
u
Well, the original fun createRPCServer would return after starting the service. coroutineScope {} will suspend until the launched coroutine terminates. You might want to create a new scope to launch the service in.
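The difference can be seen in a short sketch, assuming kotlinx.coroutines. The function names and the 50 ms delay are made up for illustration, with the delay standing in for the long-running service:

```kotlin
import kotlinx.coroutines.*

// coroutineScope { } does not return until every coroutine launched inside it has finished.
fun coroutineScopeOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        launch { delay(50); events += "child done" }
        events += "block end"
    }
    events += "coroutineScope returned"
    events
}

// Launching on a scope that outlives the call returns immediately with a Job handle.
fun launchOnScopeOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch { delay(50); events += "service done" } // "this" is the runBlocking scope
    events += "returned"
    job.join() // wait here only so the list is complete before it is returned
    events
}
```

With a mainloop that never finishes on its own, the first variant would keep the caller suspended for as long as the service runs.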
s
That's a good spot, the original code with the Thread { … } didn't actually include a way to wait for the mainloop() to finish.
I had not noticed that.
u
If you launch on a dedicated scope, createRPCServer would also not need to be suspend. So either provide your own scope or take a scope as receiver:
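fun CoroutineScope.createRPCServer(block: suspend com.rabbitmq.client.Channel.() -> RpcServer): Job {
  val serverJob = launch {
    // block is a suspend lambda, so it has to be invoked inside the coroutine
    val rpcServer = block(connection.createChannel())
    // Close the server when the connection shuts down
    connection.addShutdownListener { rpcServer.close() }
    runInterruptible(Dispatchers.IO) {
      rpcServer.mainloop()
    }
  }

  // Stop the coroutine as well once the connection is gone
  connection.addShutdownListener {
    serverJob.cancel()
  }

  return serverJob
}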
👍 1
z
IO dispatcher is probably fine for running this sort of thing on Android, but it’s probably not a good idea in general. The IO dispatcher is tuned for tasks that hold threads while doing IO operations for some time, but not for the entire life of the process. It has a maximum number of threads it will allocate, so theoretically holding one or more of these forever could limit normal users of the dispatcher. But on Android, I think you’re extremely unlikely to get close to that limit anyway, so it probably doesn’t matter.
u
Yes, if this is a concern, go with newSingleThreadContext. Be aware though that it has to be closed once it is no longer used, otherwise its thread stays alive.
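One way to handle that cleanup is sketched below, assuming kotlinx.coroutines on the JVM, where the dispatcher returned by `newSingleThreadContext` is closeable. The thread name "rpc-mainloop" and the function name are hypothetical, and the opt-in reflects that recent library versions mark this function as a delicate API:

```kotlin
import kotlinx.coroutines.*

// Runs a block on a dedicated thread and returns that thread's name.
@OptIn(DelicateCoroutinesApi::class)
fun runOnDedicatedThread(): String = runBlocking {
    // use { } closes the dispatcher, and with it the thread, when the block exits,
    // whether it completes normally or throws.
    newSingleThreadContext("rpc-mainloop").use { dispatcher ->
        withContext(dispatcher) { Thread.currentThread().name }
    }
}
```

For a long-lived service, calling `close()` from whatever shuts the service down would be the equivalent of the `use { }` block here.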
o
The IO dispatcher [...] has a maximum number of threads it will allocate, so theoretically holding one or more of these forever could limit normal users of the dispatcher.
Using Dispatchers.IO.limitedParallelism(...) makes these problems disappear. For details and references, see KTOR-6462.
👍🏻 1
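A small sketch of the limitedParallelism suggestion, assuming kotlinx.coroutines 1.6 or newer on the JVM. According to the Dispatchers.IO documentation, views created this way are not counted against the default IO thread limit. The function name is hypothetical, and `Thread.sleep` stands in for blocking work such as the mainloop:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// Starts four blocking tasks on a view limited to one at a time
// and returns the highest number of tasks seen running at once.
@OptIn(ExperimentalCoroutinesApi::class)
fun peakConcurrencyOnLimitedView(): Int = runBlocking {
    val view = Dispatchers.IO.limitedParallelism(1)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)

    coroutineScope {
        repeat(4) {
            launch(view) {
                val now = running.incrementAndGet()
                peak.updateAndGet { prev -> maxOf(prev, now) }
                Thread.sleep(20) // blocking work
                running.decrementAndGet()
            }
        }
    }
    peak.get()
}
```

Unlike `newSingleThreadContext`, such a view has no thread of its own to close; it borrows threads from the shared pool.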
r
Probably would be best to rewrite everything using coroutines and no more mainloop, but that'll take some time 😭