So I've got this issue for Jvm & Native regard...
# coroutines
m
So I've got this issue for Jvm & Native regarding threading and I'm not quite sure how best to resolve it with a concise API for my library consumers. I have a class using several different BG threads (process stdio streams, network events off a socket, etc). Events are then dispatched to all registered observers (non-suspending callbacks). All the code in my implementation is thread-safe, but it means that all registered observers must also be thread-safe implementations, too. What would you do in this situation?
My thoughts currently are to have 2 callback types for the observers, the default that will be invoked using
Dispatchers.Main
(preferring
Dispatchers.Main.immediate
if available), and another instance of it that will not do any context switching (direct dispatch):
Copy code
// Callback that events are dispatched to using
// Dispatchers.Main
fun interface OnEvent<in T: Any> {
    operator fun invoke(event: T)

    // Callback that events are dispatched to using
    // whatever thread that event originated on.
    // Implementation **MUST** be thread-safe.
    fun interface Unconfined<in T: Any>: OnEvent<T>
}
But what if
Dispatchers.Main
is not available (e.g. Someone using this with
ktor-server
, or in headless mode, or native linux/mingw)???
s
1. What do you mean by "thread-safe"? 2. It seems odd to be talking about coroutine dispatchers when you don't seem to be using coroutines. If you have kotlinx.coroutines available, why not use a
callbackFlow
and let the caller choose their dispatcher? That's what dispatchers are for, after all.
m
I'm using coroutines under the hood, and making asynchronous APIs available, but the public API has no references to
kotlinx.corutines
👍 1
s
I think people are probably used to these kinds of limitations with callbacks/observers. It's one of the problems that coroutines were created to solve. It wouldn't be unreasonable to put a note in your docs saying "callback implementations should be fast and follow xyz rules. Callers are recommended to use a callbackFlow or other reactive programming technique to run more complex code in response to events."
But maybe I'm misunderstanding the problem. Can you elaborate on what you meant by "all registered observers must be thread-safe?"
m
Yeah, so there are multiple different events that are produced in different thread contexts. When that event occurs (for example,
LOG.DEBUG
), it is dispatched to all observers registered for that event. Implementors of that
OnEvent
callback must have thread safety (e.g. using
synchrnoized
and/or
@Volatile
) when interacting with anything outside of the
OnEvent
lambda (such as global variables in a class or something)
s
I don't think that would be too unexpected. As a long-time JVM programmer, I automatically expect asynchronous callbacks to run in whatever thread they feel like. Some Android libraries (I'm thinking of Retrofit) do go to special effort to have their callbacks run on the main thread, but it's certainly not something I expect or take for granted. From my point of view, single-threaded event loops like the ones in JavaScript or Android are the exception, not the rule.
But what if
Dispatchers.Main
is not available (e.g. Someone using this with
ktor-server
) […]?
These folks should already know they're running in a multithreaded environment, and they'll also be used to the idea of adapting callbacks to suspending functions and flows. A coroutine dispatcher gives the caller the ability to adapt your callback into a suspension point and choose what thread their code will run on when the suspension point resumes. That means they have control of their thread choice, making it less of a worry for you.
m
For sure for sure. I'm working on a
2.0.0
version of a current library where, previously,
Dispatchers.Main
was utilized for all events. So sort of need to maintain that default functionality, sadly.
s
Okay, I follow! That's an interesting problem. I think I'd lean towards requiring callers to pass an extra parameter to control thread choice when registering an observer.
Copy code
fun registerObserver(observer: MyObserver, executor: CallbackExecutor)

fun interface CallbackExecutor {
  fun execute(callback: () -> Unit)
}
Then you can provide a couple of built-ins:
Copy code
object MainExecutor: CallbackExecutor {
  override fun execute(callback: () -> Unit) = Dispatchers.Main.dispatch(...)
}

object ImmediateExecutor: CallbackExecutor {
  override fun execute(callback: () -> Unit) = callback()
}
Technically the extra indirection doesn't enable anything the user wouldn't be able to do with a plain callback—but it does remove the ambiguity and assumptions. You could provide the old implementation as a (possibly-deprecated) overload, for compatibility:
Copy code
@Deprecated
fun registerObserver(observer: MyObserver) = registerObserver(observer, MainExecutor)
I like it as an approach because it makes everything explicit—the user knows they're making a choice, and can make an informed one based on their own threading needs and assumptions.
🙌 1
❤️ 1
m
👀 I love it...
🎉 1
🐕 1
Update: I implemented this and it works beautifully, really makes the API flexible for how someone will use this, and concise for consumers. Have it setup to declare a "default" executor on main class instantiation, and also able to declare individual per-observer executor if needed to override that default when callbacks get invoked for the event.
s
That's great, good to hear it's working!