Hi guys, I have a custom dispatcher that looks rou...
# coroutines
n
Hi guys, I have a custom dispatcher that looks roughly like this
Copy code
class CustomDispatcher(private val parent: CoroutineDispatcher) : CoroutineDispatcher() {

    // needs to be true so that dispatch is called
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = true 

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // do some stuff, then
        if (parent.isDispatchNeeded(context)) parent.dispatch(context, block)
        else block.run()
    }
}
• Is this safe? Calling block.run() synchronously, asking for
isDispatchNeeded
during `dispatch`… • Should I do something about
dispatchYield
? I’m asking because some dispatchers (Unconfined…) seem to communicate with the rest of the coroutine machinery in special ways which I might be breaking.
s
🙅
This method must not immediately call
block
. Doing so may result in StackOverflowError when dispatch is invoked repeatedly, for example when
yield
is called in a loop. In order to execute a block in place, it is required to return false from
isDispatchNeeded
and delegate the dispatch implementation to Dispatchers.Unconfined.dispatch in such cases. To support this, the coroutines machinery ensures in-place execution and forms an event-loop to avoid unbound recursion.
That’s from the kdoc comment on the
dispatch
method
n
it is required to return false from
isDispatchNeeded
and delegate the dispatch implementation to Dispatchers.Unconfined.dispatch in such cases.
Thanks! But if I return false from
isDispatchNeeed
, I won’t even get the dispatch call so can’t delegate to unconfined. Maybe it means to return false as the preferred solution or, if not possible, do Unconfined.dispatch?
No - I get the error
Dispatchers.Unconfined.dispatch function can only be used by the yield function. If you wrap Unconfined dispatcher in your code, make sure you properly delegate isDispatchNeeded and dispatch calls
.
s
I think when you return
false
from
isDispatchNeeded
, the continuation automatically runs unconfined without you needing to do anything else. Maybe you just need to have your
isDispatchNeeded
function delegate to
parent.isDispatchNeeded
?
n
Yeah that’s correct, but I want to receive
dispatch()
calls to run some code there (removed in my snipped). I’m starting to think that it’s not possible
s
Do you just need to run some code before the coroutine gets resumed? Would it be easier to make an interceptor that wraps the continuation, rather than trying to add code to the dispatch invocation? Seems you’re right, by the time it gets to calling
dispatch
it might already be too late.
n
I have never used interceptors, but looks like it would not suffice... I need to run some code before
block
starts and after it ends, so I’m wrapping it in another
Runnable
s
Maybe something like this?
Copy code
class CustomInterceptor(private val delegate: CoroutineDispatcher): ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor.Key

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        return delegate.interceptContinuation(object: Continuation<T> {
            override val context: CoroutineContext = continuation.context

            override fun resumeWith(result: Result<T>) {
                // do stuff before
                continuation.resumeWith(result)
                // do stuff after
            }
        })
    }
}
The
Runnable
that gets passed to the dispatcher is actually the same
DispatchedContinuation
object that is returned by
dispatcher.interceptContinuation
blob think smart. So by intercepting it before passing it to the dispatcher, the before/after code added here ends up inside that
Runnable
.
n
I’m a bit confused 😄 in my understanding resumeWith will schedule work on the dispatcher, so there might be a thread jump there and the “after” might be too soon. I’m probably wrong though, got to try. Thanks for the hints!
I guess it depends on who wraps who, since dispatcher is also an interceptor instance
s
The continuation that you get here is going be the unintercepted one, which means that
resumeWith
runs the continuation directly. The behaviour that you described, where
resumeWith
schedules work on the dispatcher, is the behaviour of the continuation after it has been intercepted by the dispatcher.
If you did it the other way round, you’d get the thread jump that you described, and the “after” code could run too soon
Copy code
class BrokenCustomInterceptor(private val delegate: CoroutineDispatcher): ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor.Key

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        val intercepted = delegate.interceptContinuation(continuation)
        return object: Continuation<T> {
            override val context: CoroutineContext = intercepted.context

            override fun resumeWith(result: Result<T>) {
                // do stuff before scheduling the task
                intercepted.resumeWith(result)
                // do stuff after scheduling, but maybe before actually running it!
            }
        }
    }
}
🙅 ☝️ don’t do this 😄
n
Nice, that’s smart! Thanks, I understand things better now 🙏