Broken assumption: `yield()` on `Main.immediate` w...
# coroutines
s
Broken assumption: `yield()` on `Main.immediate` will behave like Unconfined and empty the current event loop, or continue undispatched if it's empty.
Discovered result: `yield()` for `Main.immediate` on Android always ends up getting a main thread message posted.
Brief investigation: HandlerContext has no `dispatchYield()` implementation, which means that it will default to just `dispatch` (here). Which means that `dispatcherWasUnconfined` will not get set and we will never `yieldUndispatched()` for `yield()` (here).
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    val context = uCont.context
    context.ensureActive()
    val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
    if (cont.dispatcher.safeIsDispatchNeeded(context)) {
        // this is a regular dispatcher -- do simple dispatchYield
        cont.dispatchYield(context, Unit)
    } else {
        // This is either an "immediate" dispatcher or the Unconfined dispatcher
        // This code detects the Unconfined dispatcher even if it was wrapped into another dispatcher
        val yieldContext = YieldContext()
        cont.dispatchYield(context + yieldContext, Unit)
        // Special case for the unconfined dispatcher that can yield only in existing unconfined loop
        if (yieldContext.dispatcherWasUnconfined) {
            // Means that the Unconfined dispatcher got the call, but did not do anything.
            // See also code of "Unconfined.dispatch" function.
            return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
        }
        // Otherwise, it was some other dispatcher that successfully dispatched the coroutine
    }
    COROUTINE_SUSPENDED
}
Initial question: Why is that? Is it a bug? Should not `yield()` have the same behavior for `Main.immediate` and any other dispatcher using an event-loop?
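To make the branch in question concrete, here is a minimal toy model of the decision that `yield()` makes. It is a sketch under stated assumptions, not the kotlinx.coroutines implementation: `ToyDispatcher`, `ToyUnconfined`, `ToyImmediateHandler`, `ToyYieldContext`, and `toyYield` are all hypothetical names, and the main thread's message queue is modeled as a plain list.

```kotlin
// Toy model of the yield() branch quoted above. All names are hypothetical;
// nothing here is the real kotlinx.coroutines API.
class ToyYieldContext {
    var dispatcherWasUnconfined = false
}

abstract class ToyDispatcher {
    abstract fun isDispatchNeeded(): Boolean
    abstract fun dispatch(yieldContext: ToyYieldContext?, block: () -> Unit)

    // Without an override, dispatchYield simply falls back to dispatch.
    open fun dispatchYield(yieldContext: ToyYieldContext?, block: () -> Unit) =
        dispatch(yieldContext, block)
}

// Stand-in for Unconfined: when called from yield, it only sets the flag.
object ToyUnconfined : ToyDispatcher() {
    override fun isDispatchNeeded() = false
    override fun dispatch(yieldContext: ToyYieldContext?, block: () -> Unit) {
        yieldContext?.dispatcherWasUnconfined = true
    }
}

// Stand-in for an immediate handler-based dispatcher that is already on its
// own thread: no dispatch is needed, but dispatch still posts a message.
class ToyImmediateHandler : ToyDispatcher() {
    val posted = mutableListOf<() -> Unit>()
    override fun isDispatchNeeded() = false
    override fun dispatch(yieldContext: ToyYieldContext?, block: () -> Unit) {
        posted.add(block)
    }
}

// Mirrors the shape of the branch: only a dispatcher that sets the flag
// gets the "yield inside the existing loop" treatment.
fun toyYield(dispatcher: ToyDispatcher, continuation: () -> Unit): String {
    if (dispatcher.isDispatchNeeded()) {
        dispatcher.dispatchYield(null, continuation)
        return "dispatched"
    }
    val yieldContext = ToyYieldContext()
    dispatcher.dispatchYield(yieldContext, continuation)
    return if (yieldContext.dispatcherWasUnconfined) "yieldUndispatched" else "dispatched"
}

fun main() {
    val handler = ToyImmediateHandler()
    println(toyYield(ToyUnconfined, {}))  // prints "yieldUndispatched"
    println(toyYield(handler, {}))        // prints "dispatched"
    println(handler.posted.size)          // prints 1
}
```

In this model both dispatchers report that no dispatch is needed, yet only the one that sets the flag avoids posting a message, which is the asymmetry the question is about.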
z
well this answers that question i had about the same behavior for compose-based workflows 😁
d
The documentation for `yield` doesn't describe the exact behavior, so this is just unspecified: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html This means that we can potentially change this behavior if we think it's more reasonable that way. I think that `yield` properly scheduling a task is the more useful behavior. Imagine that you observe liveness issues due to a long-running operation; then, you stick a `yield()` in the loop, and now the other tasks scheduled on the main thread also get a chance to run. If we go for consistency with `Unconfined` instead, then a `yield()` should do nothing at all if there is just one coroutine running on `Dispatchers.Main.immediate`. Maybe a hybrid solution could work, where a `yield` prioritizes the event loop tasks, but reschedules the task if there are none left. Then, the question becomes: why even use `yield` in an event loop? Is this an instance of the "wait for something to happen" pitfall mentioned in the documentation? This usage of `yield` is not something we want to encourage.
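As a rough illustration of the hybrid idea floated above, the following toy sketch keeps two queues: one for tasks waiting in the immediate event loop and one standing in for messages posted to the main thread. `ToyImmediateLoop`, `hybridYield`, and `runInner` are hypothetical names, and nothing here reflects how kotlinx.coroutines is actually implemented.

```kotlin
// Toy model only: both queues and all names are hypothetical.
class ToyImmediateLoop {
    // Tasks waiting inside the "immediate" event loop.
    val innerQueue = ArrayDeque<() -> Unit>()
    // Stand-in for messages posted to the main thread.
    val mainQueue = ArrayDeque<() -> Unit>()

    // Hybrid yield: if other inner-loop tasks are pending, queue the
    // continuation behind them; otherwise post it to the main queue.
    fun hybridYield(continuation: () -> Unit) {
        if (innerQueue.isNotEmpty()) {
            innerQueue.addLast(continuation)
        } else {
            mainQueue.addLast(continuation)
        }
    }

    // Runs inner-loop tasks in FIFO order until the queue is empty.
    fun runInner() {
        while (true) {
            val task = innerQueue.removeFirstOrNull() ?: break
            task()
        }
    }
}

fun main() {
    val loop = ToyImmediateLoop()
    val log = mutableListOf<String>()

    // One collector is waiting in the inner loop when the yield happens.
    loop.innerQueue.addLast { log.add("collector") }
    loop.hybridYield { log.add("loop resumed") }
    loop.runInner()
    println(log)                  // prints [collector, loop resumed]

    // Nothing is waiting now, so the next yield is posted to the main queue.
    loop.hybridYield { log.add("posted") }
    println(loop.mainQueue.size)  // prints 1
}
```

The first yield lets the waiting collector run before the loop resumes; the second finds the inner queue empty and falls back to a main-thread post.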
z
I wonder if yielding to the main thread would be ok for Stephen’s and my use case (we work together) if it guaranteed the resumption would happen before the next frame (e.g. like Compose’s dispatcher)? I’m not sure if that would be possible to do, or even desirable generally. I think the thing we’re trying to use it for here is pretty specific and weird, and maybe even smelly. Maybe we just need to write our own dispatcher (although that’s messy too, since it would need to implement `Delay`, which is internal).
d
It sounds like, semantically, you want to appropriate a thread inside a coroutine without allowing other coroutines to use it. If so, would `runBlocking` solve the problem? That's what it was made to do, after all, block the thread.
s
> Maybe a hybrid solution could work, where a `yield` prioritizes the event loop tasks, but reschedules the task if there are none left.
I think that is what I'm looking for ^^
> Then, the question becomes: why even use `yield` in an event loop?
This is not waiting for something to happen - this is allowing liveness within the immediate event loop first before allowing it within the Main thread's event loop
Our use case is that we need to introduce liveness within the 'immediate' event loop during a tight runtime loop
I do think that having it post on main thread if there are no immediate event loop tasks makes a tonne of sense
or actually, why?
it can do that at a later suspension point that is not a `yield()`
> It sounds like, semantically, you want to appropriate a thread inside a coroutine without allowing other coroutines to use it.
I mean `Main.immediate` allows you to do this already, you just can't ever `yield()`, so you can't provide liveness guarantees / modify ordering within that immediate event loop
taking a step back though, we could look at solving our use case in a different way. we have a tight loop of work on `immediate` but we need to give fairness/liveness to other continuations suspended on `immediate` - what do you think is the best way to do that?
d
> modify ordering
This is not a supported use case for `yield`. In `kotlinx.coroutines`, unless it's documented, we do not guarantee the exact number of suspensions, so the ordering may change arbitrarily across releases.
> liveness guarantees
The whole event loop is one big task, and as far as I understand the typical UI architectures, a screen refresh can't happen while that big task executes. I don't understand what kind of liveness you are talking about: liveness is typically relevant for multithreading.
> we need to give fairness/liveness to other continuations suspended
Could you provide an example of when that would be needed?
s
yes, sorry, we're probably using terms slightly differently. I will try to explain better
> a screen refresh can't happen while that big task executes
Correct on Android (this case), but we use that fact
> > we need to give fairness/liveness to other continuations suspended
> Could you provide an example of when that would be needed?
Example: 4 flow collectors are suspended on `Main.immediate` and each of their collectors sends to a channel. There is a separate loop consuming off that channel and it wants to empty it before continuing on. Without `yield()` it will remove one from the channel, then continue on processing it etc. etc. But i want to drain the channel in that loop before continuing on. To do so I need to let the other 3 flow collectors resume in order to send to the channel
I can't check if the channel has elements because they aren't there yet, their flow collectors are suspended
(it's a bit more complicated than what I described as actually we have several channels and a `select` that resumes from one of them)
d
> But i want to drain the channel in that loop before continuing on.
Why is it a problem if they only get to send their elements after you process the first element?
s
After processing, we go on to operation B() ("rendering" within this framework). I don't want operation B() to happen until all elements have been processed, but it all happens immediately so there is no way to let that happen without yielding (that I know of yet :D)
sorry, I need to introduce one more element
we are not suspending waiting for something on the channel
we are using a `select` with a final `onTimeout(0)` clause so we only process what is immediately available
so if I don't let those other flow collectors send their elements they will miss that processing and let operation B() (the expensive one I want to happen only once) go ahead
so, i guess, in a sense I am enforcing ordering which you note (and the documentation notes) is an unsupported use case
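For readers unfamiliar with the pattern, a `select` with a final `onTimeout(0)` clause can be sketched roughly as follows. This is an illustration, not the code from the project being discussed: `receiveImmediatelyAvailable` is a hypothetical helper, there is only one channel here rather than several, and the import of `onTimeout` assumes a kotlinx.coroutines version where it is an extension function in `kotlinx.coroutines.selects`.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

// Hypothetical helper: keeps receiving while an element is immediately
// available, and stops as soon as the zero timeout wins the select.
// Assumes the channel is not closed (onReceive throws on a closed channel).
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <T : Any> receiveImmediatelyAvailable(channel: Channel<T>): List<T> {
    val received = mutableListOf<T>()
    while (true) {
        val element = select<T?> {
            // Listed first, so it is preferred when an element is ready.
            channel.onReceive { it }
            // Selected right away when nothing is ready.
            onTimeout(0) { null }
        } ?: break
        received.add(element)
    }
    return received
}

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 4)
    channel.send(1)
    channel.send(2)
    println(receiveImmediatelyAvailable(channel))
}
```

Only elements that have already been sent are picked up. A sender that is still suspended when the loop runs has not put anything into the channel yet, so its element is missed, which is the situation described above.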
d
The same will happen even with the `yield` if the collectors suspend at some point. This is quite brittle.
s
I wonder how I can do that then?
> The same will happen even with the `yield` if the collectors suspend at some point. This is quite brittle.
What do you mean?
d
I mean that if a collector suspends, even if the coroutine reading from the channel calls `yield`, the collector may still not get a chance to put its element into the channel, because by suspending, it will miss its chance to run.
s
well, it won't suspend because I control it 😄
d
I've looked at the source code, but it's difficult for me to understand what's going on without diving deep.
s
(totally valid btw! It's not straightforward)
👍 1
It really comes back to it was a surprise to me that `Main.immediate` would not drain the inner event loop before dispatching a new task, as my mental model was that it would behave like the Unconfined event loop (given the docs say they share this implementation). TBH I tried looking through the source for that but it did not really look like they shared the event loop implementation?
Either I did not find it yet or we should update the docs to remove that comparison
d
They do share the event loop implementation, it's activated by returning `false` from `isDispatchNeeded`.
Is it always the case that all 4 collectors put something into the channel?
s
yes, but `false` is returned here for `isDispatchNeeded`
d
Yes, it is. What's the issue?
s
Oh sorry I am saying that `Main.immediate` returns false here for `isDispatchNeeded` so it should have the same behavior (or at least I thought) but it does not implement `dispatchYield()` like Unconfined does
> Is it always the case that all 4 collectors put something into the channel?
No, it's not always the case. that's why we `onTimeout(0)` in the select. if there is nothing that needs to be processed we continue on immediately
I get that `Main.immediate` needs to make a choice.
> Imagine that you observe liveness issues due to a long-running operation; then, you stick a `yield()` in the loop, and now the other tasks scheduled on the main thread also get a chance to run. If we go for consistency with `Unconfined` instead, then a `yield()` should do nothing at all if there is just one coroutine running on `Dispatchers.Main.immediate`.
Wrt this, if I was seeing a liveness issue for other main thread tasks from a long running operation on `Main.immediate` then I would either stop using `Main.immediate` or fix the long running operation
I want to take advantage of exactly the behaviour you describe here
namely:
> then a `yield()` should do nothing at all if there is just one coroutine running on `Dispatchers.Main.immediate`
can i check for continuations waiting for dispatch from the context?
d
> `yield()` should do nothing at all if there is just one coroutine running on `Dispatchers.Main.immediate`
That's the opposite of what is usually expected from a `yield`: it has to ensure fairness across unrelated tasks. In that case, yes, it looks like your use case warrants using `runBlocking` from a coroutine. What's usually cited as the issues of `runBlocking` is exactly the behavior that you seem to want.
> can i check for continuations waiting for dispatch from the context?
Not using the public API.
😢 1
s
> That's the opposite of what is usually expected from a `yield`: it has to ensure fairness across unrelated tasks.
I mean depends on your frame of reference right? For `Unconfined` it makes sense I would expect it to drain the unconfined event loop. If immediate dispatchers are like Unconfined then I thought the same
Can you explain how I would use `runBlocking` with pseudo code? I'm not understanding
if it's for the flow collectors I don't have access to them directly in the runtime loop where I'm making this decision
d
How do you ensure that they are in the same event loop?
s
i control that they are all collecting on the same dispatcher
d
It's not enough, though. Simple example:
withContext(Dispatchers.Main.immediate) {
  withContext(Dispatchers.IO) {
    GlobalScope.launch(Dispatchers.Main.immediate) {
      println("1")
    }
  }
  println("2")
}
The computation `1` will be scheduled on the `Main` dispatcher as if it was not `.immediate`. So, it won't enter the same event loop as `2`.
For several computations to enter the same event loop, they have to be created from inside an active event loop, exactly from the thread running the loop.
s
interesting - I will have to think on that and double check if the collection can switch context without the runtime knowing
d
In any case, with `runBlocking`, it would be something like this:
val channel = Channel<Int>(5)
runBlocking(Dispatchers.Main.immediate) {
  repeat(5) {
    if (Random.nextBoolean()) {
      launch {
        channel.send(it)
      }
    }
  }
}
val elements = channel.toList() // or `tryReceive` in `while (true)` to avoid closing the channel
// process the elements
If you can not structure your code in a way that computations are started in the same call frame, then it's likely that they also don't share the same event loop, which means there's not much that can be done.
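The `tryReceive` in `while (true)` variant mentioned in the comment of the snippet above could look roughly like this. It is a sketch under assumptions: `drainAvailable` is a hypothetical helper name, the default `runBlocking` dispatcher is used instead of `Dispatchers.Main.immediate` so that it runs off Android, and a fixed even/odd rule replaces `Random.nextBoolean()` to keep the result reproducible.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: takes whatever is buffered right now, without
// suspending and without closing the channel.
fun <T : Any> drainAvailable(channel: Channel<T>): List<T> {
    val drained = mutableListOf<T>()
    while (true) {
        // tryReceive never suspends; getOrNull() is null when the channel
        // is empty or closed.
        val element = channel.tryReceive().getOrNull() ?: break
        drained.add(element)
    }
    return drained
}

fun main() {
    val channel = Channel<Int>(capacity = 5)
    // runBlocking returns only after all coroutines launched inside it have
    // completed, so every send has happened before the drain below.
    runBlocking {
        repeat(5) { i ->
            if (i % 2 == 0) {
                launch { channel.send(i) }
            }
        }
    }
    println(drainAvailable(channel))  // prints [0, 2, 4]
}
```

Because the channel stays open, the same helper can be called again on a later pass to pick up anything sent in the meantime.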
s
got it. Ok, thank-you. I will reflect on this