what business logic requires it?
p
what business logic requires it?
d
I need somehow pass the Zipkin/Brave distributed logging context. It would be nice to pass Slf4J MDC too.  KotlinX supports such ThreadLocal management - https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-slf4j/src/MDCContext.kt
Our project is based on SpringBoot & Arrow. Current case is that message is received from Kafka by Spring Cloud than it is send to IO based logic and finally message goes to Kafka via Spring Cloud again.
s
If you use suspendCancellable() and use the library you mentioned and add MDCContext() job along with you main coroutine it should work effortlessly.
@simon.vergauwen helped me some days back with the same problem. I’ll try to find the thread
d
I tried to use KotlinX dispatchers & they work fine - like
IO.effect(<http://Dispatchers.IO|Dispatchers.IO> + MDCContext())
But I’m not sure if it is allowed/safe to use KotlinX dispatchers with Arrow’s IO.
s
I think it should be safe as long as you are using
arrow-fx-kotlinx-coroutines
. even if you aren’t, I don’t see any reason why it wouldn’t be. But take it with sack (not grain) of salt
p
so it’s an SLF4J property that stores some Thread-local state in some context
so the logger reports the traces and their corresponding threads
or is it to make stacktraces work across thread jumps?
s
so it’s an SLF4J property that stores some Thread-local state in some context, so the logger reports the traces and their corresponding threads
Yes. That’s what I have understood from the implementation.
p
@dnowak your proposal sounds reasonable. The kx.coroutines dispatchers are optimized for their own use, still they're reusable for us because at the bottom of it is just a ContinuationInterceptor that'll do the MDCContext and other thread-changing stuff
you'll need the integration module if you'd like cancelation to be compatible, as @Satyam Agarwal has suggested
d
Looks like I need the TracingContext only when IO is run on other thread then the calling one. We have only a few places like that. Otherwise everything works fine as the thread local is already initialized.
@pakoito What is the advised way of running IO in current thread?
p
jump-back-to-the-thread-that-created-io, or what in Rx was
Schedulers.immediate()
. I believe we don’t have it, you have to jump back to a specific scheduler.
make a scheduler off a Threadpool with a single thread, that’s your Main Thread now. And run everything synchronous there.
If you want to not jump to any new Scheduler, use
EmptyCoroutineContext
. It’ll still trampoline where needed when using suspended and unsafeRunAsync.
d
@pakoito Thank you for all the details 🙂.
s
@pakoito we should be able to easily add
immediate
, as well as
single
immediate
would just be exposing our
trampoline
scheduler afaik.
p
immediate should stay on the current thread or equivalent, not the one that the execution is at that point. That required installing something on a Thread IIRC
s
should stay on the current thread or equivalent, not the one that the execution is at that point
🤔
So the same as trampolining
IO
, right?
p
so
Copy code
// called from "Main"

IO(dispatchers().io()) { 1 } // IO
  .continueOn(dispatchers().immediate())
  .map { Thread.current().name } // should be "Main"
  .suspended()
if it fulfills that then yes
s
And the reason for
main
is that
immediate()
was called there?
Or because
suspended()
was called there?
p
immediate
s
Yes, we have that already. That’s function retrieves an immediate dispatcher attached to a
ThreadLocal
val.
p
perfect
so when people ask, what dispatcher/function should I point them to?