# coroutines
p
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

val flow = MutableSharedFlow<Int>()
val job = CoroutineScope(Job()).launch {
  flow.collect { println(it) }
}

suspend fun main() {
  yield()
  Thread.sleep(1000)
  flow.emit(1)
  yield()
  Thread.sleep(1000)
  job.cancelAndJoin()
}
Why does the collector get cancelled before receiving the emission in the above example? To avoid race conditions, I added some yields and sleeps to make sure the collector subscribes before the emission, and to make sure the collector has time to collect after the emission. It still never receives the value. It only works with
delay(1)
after emission
I would expect that since collector is running on a different thread pool, above example would work
r
yield
only affects the current thread or thread pool, so I am guessing your main function and coroutinescope aren't using the same dispatcher. https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html
p
Ok, but yield should not even be necessary in above example in my view. If main thread sleeps for enough time after emitting the value, why can’t collector on a worker thread receive it?
From the debugger, by the time the collector receives “1”, the job is already cancelled. The only way I could force the collector to receive it is by using delay(1), for some reason I don’t understand
Are they sharing the same coroutine scheduler? Meaning the collector's resumption only gets scheduled after main is finished or main suspends?
a
It will work if you change
suspend fun main
to
fun main() = runBlocking { ... }
, because a suspending main function is implemented by the compiler and does not use the
kotlinx.coroutines
threads
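A minimal sketch of the `runBlocking` variant suggested above, assuming `kotlinx-coroutines-core` is on the classpath. The helper name `collectWithRunBlocking` and the shortened 200 ms sleeps are illustrative and not from the thread; the result is collected into a list instead of printed so it can be inspected.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: runs the original snippet under runBlocking
// and returns whatever the collector received.
fun collectWithRunBlocking(): List<Int> = runBlocking {
    val received = CopyOnWriteArrayList<Int>()
    val flow = MutableSharedFlow<Int>()
    // Separate scope without a dispatcher, as in the original:
    // launch falls back to Dispatchers.Default.
    val job = CoroutineScope(Job()).launch {
        flow.collect { received.add(it) }
    }
    Thread.sleep(200)   // blocks main only; the collector subscribes on a worker thread
    flow.emit(1)        // resumes on runBlocking's event loop, i.e. on main again
    Thread.sleep(200)   // blocks main only; the collector is free to process the value
    job.cancelAndJoin()
    received.toList()
}

fun main() {
    println(collectWithRunBlocking())
}
```

The sleeps still make this timing-dependent, so it is a demonstration of the behaviour described here rather than a pattern to rely on.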
r
Alright, in that case they probably do share a thread:
• `yield` isn't guaranteed to suspend, so I am guessing it isn't here
• `delay(1)` does suspend and allows for the collect to run
• `Thread.sleep` will actually block the thread, which would prevent the collect from running
Note:
launch
doesn't actually guarantee immediate start of the job. So that might also play a role here.
p
Yeah the thing is they don’t actually share the same thread 😂 I put some println for thread name on collector and main and confirmed it’s different threads
So something else is going on
Note:
launch
doesn't actually guarantee immediate start of the job. So that might also play a role here.
right, that’s why I also put an obnoxious Thread.sleep in main before emitting as well
r
What happens if you provide a
start
value of
UNDISPATCHED
to the
launch
call?
Or you can call
job.start()
to make sure it's actually started.
I guess the
Thread.sleep
will block the thread that is responsible for starting the job. Since it does work with
delay
.
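To illustrate the difference the `start` parameter makes, here is a small sketch that records the order of events around `launch`. The helper name `launchOrder` is hypothetical, and `runBlocking` is used only to get a single-threaded, predictable context.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records the order of events around launch for a given start mode.
fun launchOrder(start: CoroutineStart): List<String> = runBlocking {
    val events = mutableListOf<String>()   // only touched from runBlocking's single thread
    val job = launch(start = start) {
        events.add("body started")
        delay(10)                          // first suspension point
        events.add("body resumed")
    }
    events.add("launch returned")
    job.join()
    events
}

fun main() {
    println(launchOrder(CoroutineStart.DEFAULT))       // [launch returned, body started, body resumed]
    println(launchOrder(CoroutineStart.UNDISPATCHED))  // [body started, launch returned, body resumed]
}
```

With `UNDISPATCHED` the body runs in the caller's thread up to its first suspension point before `launch` returns, which is why a `collect` inside it is already subscribed by then.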
e
launch(Dispatchers.Unconfined) might work too
haven't tested though
p
Passing start Undispatched makes the collector start immediately on
main
(obviously), but doesn’t change the outcome: the collector still does not get the value (it gets cancelled before that happens)
So the job is starting in time, and
collect
is called before
emit
, but for some reason the collector resumption is only scheduled too late
That, I don’t understand
e
oh right you created that in its own scope
so it's just unrelated
yield in main doesn't have any effect on the default dispatcher that your collector is running in
it'll yield to something else launched in main, but there is nothing
p
Right, OK. I can remove the yields from the example, behavior does not change. Still don’t understand though
Seems like it’s the same issue as GitHub.com/Kotlin/kotlinx.coroutines/issues/3844 but the response in there did not convince me (race conditions)
a
Your main function is being run by the default blocking runner generated when the Kotlin compiler sees a
suspend fun main
yield()
is a part of
kotlinx.coroutines
, thus not having an effect on it
r
This indeed isn't a race condition. The issue is somewhere in the behavior difference of
Thread.sleep
vs
delay
. For some reason
Thread.sleep
blocks your collection, not exactly sure why.
Without any sort of delay it will obviously become a race condition, but that isn't the case in the current code snippet.
👍 1
p
So, changing from
suspend fun main
to
fun main() = runBlocking {}
makes it behave as I expect, even after removing all yields (keeping the sleeps). So now it works, but it’s one more thing I don’t understand 😂
e
in general I would choose
suspend fun main() = coroutineScope {
    ...
}
but either way
r
Alright, did some testing: https://pl.kotl.in/N9bItiT2K And learned the following:
• To make sure your collection has actually started you’ll need `start = CoroutineStart.UNDISPATCHED`. `job.start()` will force-start the job, but it doesn’t wait for it to suspend (a.k.a. reach the `collect` call).
• `fun main() = runBlocking` is bound to the main thread and blocks it when any call suspends. `suspend fun main` is initially bound to the main thread but can switch to other threads; it also doesn’t block threads.
• There is a race condition in the `job.cancelAndJoin()`: once `flow.emit(1)` resumes, the `collect` for that value hasn’t actually run yet, so you might cancel the job before the `println` is ever called.
e
suspend fun main
is implemented by the compiler, avoiding a hard dependency on
kotlinx.coroutines
if you're using
kotlinx.coroutines
, you want to get into
kotlinx.coroutines
scope, both
coroutineScope
and
runBlocking
accomplish that
p
suspend fun main() = coroutineScope {}
This has the same issue
there is a race condition in the
job.cancelAndJoin()
, once
flow.emit(1)
resumes the
collect
for that value hasn’t actually run yet, so you might cancel the job before the
println
is ever called
That's why I put a sleep of 1 sec after emitting on main thread
e
oh right, that doesn't change dispatchers by default
r
Yeah the issue is that after the call to
flow.emit
your
main
function is running on the same thread as the collect, so the
Thread.sleep
blocks the collection, which
delay
doesn’t.
p
Why is the main function running on the same thread as the collector after the call to emit?
That sounds weird
Are you saying emit switches threads from Main to Default ?
r
That's how coroutines work: they aren’t bound to a specific thread, so once a call suspends and resumes (in this case
emit
) the thread might have changed.
Correct. You can try running the sample in the playground a couple times. It regularly switches to the same thread that is used by the collect.
The reason why the solution with
runBlocking
works is because it will never switch threads
p
Oh boy you are right. Just put some println and main is suddenly on Default after the emit on the suspend main version
This is totally unexpected for me, but I guess it makes sense since there's no dispatcher in the context when using suspend main
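The thread switch can be observed with a sketch like the following. The helper name `threadsAroundEmit` is hypothetical; the collector scope uses `Dispatchers.Default` explicitly, which is also what a scope without a dispatcher falls back to.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the thread names seen just before and just after emit().
suspend fun threadsAroundEmit(): Pair<String, String> {
    val flow = MutableSharedFlow<Int>()
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    // UNDISPATCHED so the collector is already subscribed when launch returns.
    val job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { }
    }
    val before = Thread.currentThread().name
    flow.emit(1)                              // suspends until the collector takes the value
    val after = Thread.currentThread().name   // whichever thread resumed this coroutine
    job.cancel()
    return before to after
}

// suspend fun main has no dispatcher in its context, so nothing moves it back to main.
suspend fun main() {
    val (before, after) = threadsAroundEmit()
    println("before emit: $before")
    println("after emit:  $after")
}
```

Run from `suspend fun main`, the second line is expected to name a `Dispatchers.Default` worker, as observed in the thread. Called from inside `runBlocking` instead, both names should match, because the continuation is dispatched back to `runBlocking`'s event loop.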
r
Yeah I think the main takeaway here is to use
delay
instead of
Thread.sleep
as the latter can cause unexpected results due to it blocking a thread
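A side-by-side sketch of the two pauses, assuming the same setup as the original snippet. The helper name `receivedAfter` and the 100 ms durations are illustrative. It returns what the collector received instead of printing it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: emits one value, pauses using `pause`, cancels the collector,
// and returns what the collector managed to receive.
suspend fun receivedAfter(pause: suspend () -> Unit): List<Int> {
    val received = CopyOnWriteArrayList<Int>()
    val flow = MutableSharedFlow<Int>()
    // UNDISPATCHED so the collector is subscribed before emit is called.
    val job = CoroutineScope(Job()).launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { received.add(it) }
    }
    flow.emit(1)
    pause()               // either suspends (delay) or blocks the current thread (Thread.sleep)
    job.cancelAndJoin()
    return received.toList()
}

suspend fun main() {
    println("delay:        " + receivedAfter { delay(100) })
    println("Thread.sleep: " + receivedAfter { Thread.sleep(100) })
}
```

Under `suspend fun main`, the `delay` variant should report the value, while the `Thread.sleep` variant is expected to come back empty for the reason given in the thread: the emitter resumes on the collector's thread and blocks it until after the cancellation.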
p
I'd never use a sleep in real code, I was just trying to figure out internals in a playground. But this realization that the thread switches when emit returns from suspension makes everything make sense to me now
👍🏻 1
That's also a behavior that is probably rarely seen in real application code, especially on Android, as there's almost always a dispatcher in context
Meaning emit would resume in the original thread
r
Yeah it would resume in the original dispatcher, though that could still be a different thread (depending on the dispatcher).
👌 1
e
yeah Android code using
lifecycleScope
or similar is basically on
Dispatchers.Main
I kinda forgot that
suspend fun main() = coroutineScope {
doesn't add
Dispatchers.Main
, and it couldn't work like that anyway (there is no main dispatcher in CLI)
p
Yeah could be a different thread in same dispatcher. But certainly not the same thread as the collector thread if they are running on different dispatchers or even same dispatcher but multithreaded one. Thanks @ephemient @Rick Clephas @asdf asdf really appreciate it
f
I'd also stress that assuming that, after one coroutine emits a value, the other will immediately collect it is generally wrong, and code should not rely on it. In this case we could even still see it as a race condition between the main (emitter) coroutine and the collector, which is always won by the main one because it "cheats" by blocking the thread and preventing the collector from running. If you remove the Thread.sleep you still get the same result because you have the same race condition; adding the Thread.sleep simply didn't solve it (it did not actually make the race favorable to the collector).
e
I believe you can expect
flow {
    emit(...)
}.collect {
    it
}
to be synchronous
but otherwise, correct. it's not guaranteed
p
but for a buffer-less MutableSharedFlow that is exactly the premise: emit suspends until collectors receive the value. Now, what exactly “receiving the value” means is debatable in this context
The thread sleep after emit in the snippet is there to wait for the collector to “finish”, but its resumption (which may dispatch) should have happened as soon as emit resumes from suspension, I believe
e
put another way:
flow {
    println(1)
    emit(1.5)
    println(2)
}.collect {
    println(it)
}
should print `1 1.5 2`;
val flow = MutableSharedFlow<Any?>()
launch(start = CoroutineStart.UNDISPATCHED) {
    flow.take(1).collect { println(it) }
}
println(1)
flow.emit(1.5)
println(2)
may print
1 1.5 2
or
1 2 1.5
👍 1
p
^ depending on dispatcher, right? A direct dispatcher will always mean 1 1.5 2
e
emit
does not wait for the collecting coroutines to be dispatched
but sure, if it's always immediate then I think you can expect the first order
seems kind of fragile though
p
emit
does not wait for the collecting coroutines to be dispatched
That sounds odd. If the shared flow has no buffer, I think it must wait for the collect continuation to be resumed / dispatched
It’s Rendezvous
e
it just has to wait for them to be scheduled, right?
p
Isn’t that the same? Scheduled, dispatched
Resumed
Resuming a continuation (may?) dispatch it if there is a dispatcher and dispatch is needed, otherwise it resumes in same stack frame
e
OK I need to adjust my terminology. yes they are either run in place or dispatched (which is what I meant by scheduled). but being dispatched doesn't mean it's run immediately
p
Sure. It may get queued somewhere (which is what happens in the snippet since it’s collected in Dispatchers.Default)
But continuation got resumed as soon as emit finished (Continuation.resume)
r
The difference is in a cold vs hot flow. With a cold flow there is only a single coroutine job involved. So it’s basically calling a suspend function which will wait for it to resume. Hot flows on the other hand have 2 jobs involved, one emitting values and one collecting them. The emit will suspend until the value is being collected. However collected just means that the second job has started to process this value. It doesn’t mean the processing is completed.
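For the cold-flow half of this comparison, a minimal sketch of why the ordering is fixed: the builder block and the collector lambda run in the same coroutine, so `emit` is effectively a call into the collector. The helper name `coldFlowOrder` is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order of events in a cold flow.
fun coldFlowOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flow {
        events.add("before emit")
        emit(1.5)                       // runs the collector lambda before returning
        events.add("after emit")
    }.collect { value ->
        events.add("collected $value")
    }
    events
}

fun main() {
    println(coldFlowOrder())   // [before emit, collected 1.5, after emit]
}
```

With a hot flow there is no equivalent guarantee, because the collector is a second coroutine that merely gets resumed or dispatched when the value is handed over.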
p
^ that’s my understanding as well
e
right, in that example there's two coroutines that can be resumed after the emit: the one that ran emit, and the flow collector. if they're run undispatched then you have an ordering, otherwise you don't
👍 1
p
(But in my original snippet, after the continuation of the collector was dispatched, I slept on the same thread, then cancelled the job, and by the time the collector's code for the emission actually got to run, the job was already cancelled, so it threw a cancellation exception)
@ephemient if it’s the same single-thread dispatcher I think you might also have an ordering, because the mutable shared flow internals first resume one then the other, so if the dispatcher queue is e.g. FIFO you get your order, but that might depend on dispatcher implementation details, etc. Overall agreed, especially with multithreading it’s just asynchronous and unpredictable unless external synchronization is used
e
I don't think it's even guaranteed that
launch { println(1) }; delay(1); println(2)
will print
1 2
when multithreading
p
Yeah it is not guaranteed I believe
In single thread FIFO dispatcher it is guaranteed to print
2 1
, since launch must put the coroutine at the end of the queue right? Unless you
yield()
or something before
print(2)
to suspend and push the continuation to after the previously scheduled coroutine
But then again this is just a fun mental exercise since real code should never rely on those assumptions as it’s internal details
e
(did you miss the
delay(1)
?)
p
Uh yeah I missed the delay
Was reading it as
launch { println(1) }; println(2)
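The single-threaded case from this exchange can be sketched with `runBlocking`, whose event loop acts as a single-thread queue. The helper names are hypothetical, and as noted above this ordering is a property of that particular dispatcher, not something to rely on in general.

```kotlin
import kotlinx.coroutines.*

// launch { ... }; add(2) : the launched coroutine is queued behind the current one.
fun orderWithoutYield(): List<Int> = runBlocking {
    val order = mutableListOf<Int>()
    launch { order.add(1) }
    order.add(2)
    order                      // runBlocking waits for the child before returning
}

// Adding yield() re-queues the current coroutine behind the launched one.
fun orderWithYield(): List<Int> = runBlocking {
    val order = mutableListOf<Int>()
    launch { order.add(1) }
    yield()
    order.add(2)
    order
}

fun main() {
    println(orderWithoutYield())   // [2, 1]
    println(orderWithYield())      // [1, 2]
}
```

On a multithreaded dispatcher such as `Dispatchers.Default`, neither order should be assumed.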