Patrick Steiger
09/15/2023, 5:36 PM
val flow = MutableSharedFlow<Int>()
val job = CoroutineScope(Job()).launch {
    flow.collect { println(it) }
}
suspend fun main() {
    yield()
    Thread.sleep(1000)
    flow.emit(1)
    yield()
    Thread.sleep(1000)
    job.cancelAndJoin()
}
Why does the collector get cancelled before receiving the emission in the example above? To avoid race conditions, I put in some yields and sleeps to make sure the collector subscribes before the emission, and also to make sure the collector has time to collect after the emission. It still never happens.
It only happens with delay(1) after the emission.
Patrick Steiger
09/15/2023, 5:37 PM
Rick Clephas
09/15/2023, 5:39 PM
yield only affects the current thread or thread pool, so I am guessing your main function and CoroutineScope aren't using the same dispatcher. https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html
Patrick Steiger
09/15/2023, 5:41 PM
Patrick Steiger
09/15/2023, 5:42 PM
Patrick Steiger
09/15/2023, 5:43 PM
asdf asdf
09/15/2023, 5:48 PM
[...] suspend fun main to fun main() = runBlocking { ... }, because a suspending main function is implemented by the compiler and does not use the kotlinx.coroutines threads
Rick Clephas
09/15/2023, 5:58 PM
• yield isn't guaranteed to suspend, so I am guessing it isn't here
• delay(1) does suspend and allows for the collect to run
• Thread.sleep will actually block the thread, which would prevent the collect from running
Rick Clephas
09/15/2023, 5:59 PM
launch doesn't actually guarantee immediate start of the job. So that might also play a role here.
Patrick Steiger
09/15/2023, 6:00 PM
Patrick Steiger
09/15/2023, 6:00 PM
Patrick Steiger
09/15/2023, 6:00 PM
"launch doesn't actually guarantee immediate start of the job. So that might also play a role here."
Right, that's why I also put an obnoxious Thread.sleep in main before emitting as well.
Rick Clephas
09/15/2023, 6:02 PM
[...] start value of UNDISPATCHED to the launch call?
Rick Clephas
09/15/2023, 6:03 PM
[...] job.start() to make sure it's actually started.
Rick Clephas
09/15/2023, 6:05 PM
Thread.sleep will block the thread that is responsible for starting the job. Since it does work with delay.
ephemient
09/15/2023, 6:07 PM
ephemient
09/15/2023, 6:07 PM
Patrick Steiger
09/15/2023, 6:11 PM
[...] main (obviously), but that doesn't change the outcome: the collector still does not get the value (it gets cancelled before that happens)
Patrick Steiger
09/15/2023, 6:11 PM
collect is called before emit, but for some reason the collector's resumption is only scheduled too late
Patrick Steiger
09/15/2023, 6:11 PM
ephemient
09/15/2023, 6:13 PM
ephemient
09/15/2023, 6:13 PM
ephemient
09/15/2023, 6:14 PM
ephemient
09/15/2023, 6:15 PM
Patrick Steiger
09/15/2023, 6:15 PM
Patrick Steiger
09/15/2023, 6:16 PM
asdf asdf
09/15/2023, 6:17 PM
[...] suspend fun main [...] yield() is a part of kotlinx.coroutines, thus not having an effect on it
Rick Clephas
09/15/2023, 6:19 PM
[...] Thread.sleep vs delay. For some reason Thread.sleep blocks your collection, not exactly sure why.
Rick Clephas
09/15/2023, 6:21 PM
Patrick Steiger
09/15/2023, 6:22 PM
Changing suspend fun main to fun main() = runBlocking {} makes it behave as I expect, even after removing all the yields (keeping the sleeps).
So now it works, but it's one more thing I don't understand 😂
ephemient
09/15/2023, 6:59 PM
suspend fun main() = coroutineScope {
    ...
}
but either way
Rick Clephas
09/15/2023, 7:00 PM
• [...] start = CoroutineStart.UNDISPATCHED, job.start() will force start the job, but it doesn't wait for it to suspend (a.k.a. the collect call)
• fun main() = runBlocking is bound to the main thread and blocks it when any call suspends; suspend fun main is initially bound to the main thread, but can switch to other threads, and it also doesn't block threads
• there is a race condition in the job.cancelAndJoin(): once flow.emit(1) resumes, the collect for that value hasn't actually run yet, so you might cancel the job before the println is ever called
ephemient
09/15/2023, 7:00 PM
suspend fun main is implemented by the compiler, avoiding a hard dependency on kotlinx.coroutines
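For comparison, here is a minimal sketch of the original snippet entered through runBlocking instead of a compiler-generated suspend main, the change reported earlier in the thread to behave as expected. The function name collectWithRunBlocking, the received list, and the shortened 500 ms sleeps are illustrative assumptions, not part of the original code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: same shape as the original snippet, but main's code runs inside runBlocking.
fun collectWithRunBlocking(): List<Int> {
    val flow = MutableSharedFlow<Int>()
    val received = CopyOnWriteArrayList<Int>()
    // No dispatcher is given, so the collector runs on Dispatchers.Default.
    val job = CoroutineScope(Job()).launch {
        flow.collect { received.add(it) }
    }
    runBlocking {
        Thread.sleep(500)   // crude wait so the collector can subscribe first
        flow.emit(1)        // resumes on the runBlocking thread, not on the collector's thread
        Thread.sleep(500)   // blocks only the runBlocking thread
        job.cancelAndJoin()
    }
    return received.toList()
}

fun main() {
    println(collectWithRunBlocking())
}
```

On a typical JVM run I would expect this to print [1], since runBlocking resumes its coroutine on the calling thread, but the sleeps are still only a crude way to order the two coroutines.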
ephemient
09/15/2023, 7:01 PM
[...] kotlinx.coroutines, you want to get into kotlinx.coroutines scope; both coroutineScope and runBlocking accomplish that
Patrick Steiger
09/15/2023, 7:59 PM
suspend fun main() = coroutineScope {}
This has the same issue
Patrick Steiger
09/15/2023, 8:00 PM
"there is a race condition in the job.cancelAndJoin(): once flow.emit(1) resumes, the collect for that value hasn't actually run yet, so you might cancel the job before the println is ever called"
That's why I put a sleep of 1 sec after emitting on main thread
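One way to check which thread that sleep actually blocks is to log the thread name on both sides of emit. The sketch below assumes the same setup as the original snippet; threadsAroundEmit is a hypothetical helper and the 500 ms waits are arbitrary. Because a suspend main has no dispatcher, the code after emit may continue on whatever thread resumed it, which could be the collector's worker thread rather than main.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the thread names seen just before and just after emit.
suspend fun threadsAroundEmit(): Pair<String, String> {
    val flow = MutableSharedFlow<Int>()
    // No dispatcher is given, so the collector runs on Dispatchers.Default.
    val job = CoroutineScope(Job()).launch {
        flow.collect { println("collected $it on ${Thread.currentThread().name}") }
    }
    Thread.sleep(500)                         // crude wait so the collector can subscribe first
    val before = Thread.currentThread().name
    flow.emit(1)                              // suspends until the collector takes the value
    val after = Thread.currentThread().name   // whichever thread resumed this coroutine
    delay(500)                                // suspends instead of blocking that thread
    job.cancelAndJoin()
    return before to after
}

suspend fun main() {
    val (before, after) = threadsAroundEmit()
    println("before emit: $before")
    println("after emit:  $after")
}
```

If the two names differ, a Thread.sleep placed after emit would block the collector's thread, whereas delay suspends and hands the thread back.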
ephemient
09/15/2023, 8:01 PM
Rick Clephas
09/15/2023, 8:02 PM
[...] flow.emit your main function is running on the same thread as the collect, so the Thread.sleep blocks the collection, which delay doesn't.
Patrick Steiger
09/15/2023, 8:02 PM
Patrick Steiger
09/15/2023, 8:03 PM
Patrick Steiger
09/15/2023, 8:03 PM
Rick Clephas
09/15/2023, 8:03 PM
[...] emit) the thread might have changed.
Rick Clephas
09/15/2023, 8:04 PM
Rick Clephas
09/15/2023, 8:05 PM
[...] runBlocking works is because it will never switch threads
Patrick Steiger
09/15/2023, 8:05 PM
Patrick Steiger
09/15/2023, 8:06 PM
Rick Clephas
09/15/2023, 8:07 PM
[...] delay instead of Thread.sleep, as the latter can cause unexpected results due to it blocking a thread
Patrick Steiger
09/15/2023, 8:08 PM
Patrick Steiger
09/15/2023, 8:09 PM
Patrick Steiger
09/15/2023, 8:09 PM
Rick Clephas
09/15/2023, 8:10 PM
ephemient
09/15/2023, 8:10 PM
lifecycleScope or similar is basically on Dispatchers.Main
ephemient
09/15/2023, 8:11 PM
fun main() = coroutineScope { doesn't add Dispatchers.Main, and it couldn't work like that anyway (there is no main dispatcher in CLI)
Patrick Steiger
09/15/2023, 8:13 PM
franztesca
09/15/2023, 8:29 PM
ephemient
09/15/2023, 8:30 PM
[...]
flow {
    emit(...)
}.collect {
    it
}
to be synchronous
ephemient
09/15/2023, 8:31 PM
Patrick Steiger
09/15/2023, 8:32 PM
Patrick Steiger
09/15/2023, 8:38 PM
ephemient
09/15/2023, 8:38 PM
flow {
    println(1)
    emit(1.5)
    println(2)
}.collect {
    println(it)
}
should print `1 1.5 2`;
val flow = MutableSharedFlow<Any?>()
launch(start = CoroutineStart.UNDISPATCHED) {
    flow.take(1).collect { println(it) }
}
println(1)
flow.emit(1.5)
println(2)
may print `1 1.5 2` or `1 2 1.5`
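The contrast above can be made runnable along these lines. This is a sketch under assumptions: the function names, the string log, and running the shared-flow variant on Dispatchers.Default are illustrative additions, chosen so that the emitter and the collector can genuinely run in parallel.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArrayList

// Cold flow: the collector lambda runs inside emit, on the emitter's own call stack.
fun coldFlowOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        log += "1"
        emit(1.5)
        log += "2"
    }.collect { log += it.toString() }
    log
}

// Shared flow: the collector is a separate coroutine, so where "1.5" lands relative to "2" may vary.
fun sharedFlowOrder(): List<String> = runBlocking(Dispatchers.Default) {
    val log = CopyOnWriteArrayList<String>()
    val shared = MutableSharedFlow<Any?>()
    // UNDISPATCHED runs the collector up to its first suspension, so it is subscribed before emit.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        shared.take(1).collect { log += it.toString() }
    }
    log += "1"
    shared.emit(1.5)
    log += "2"
    collector.join()   // make sure the collector has logged before reading the result
    log.toList()
}

fun main() {
    println(coldFlowOrder())    // [1, 1.5, 2]
    println(sharedFlowOrder())  // order of the last two entries is not fixed
}
```

The cold-flow result is fixed because collection is just a nested call. For the shared flow, only the leading "1" is certain in this sketch.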
Patrick Steiger
09/15/2023, 8:39 PM
ephemient
09/15/2023, 8:40 PM
emit does not wait for the collecting coroutines to be dispatched
ephemient
09/15/2023, 8:40 PM
ephemient
09/15/2023, 8:40 PM
Patrick Steiger
09/15/2023, 8:41 PM
"emit does not wait for the collecting coroutines to be dispatched"
That sounds odd. If the shared flow has no buffer, I think it must wait for the collect continuation to be resumed / dispatched
Patrick Steiger
09/15/2023, 8:41 PM
ephemient
09/15/2023, 8:41 PM
Patrick Steiger
09/15/2023, 8:42 PM
Patrick Steiger
09/15/2023, 8:42 PM
Patrick Steiger
09/15/2023, 8:42 PM
ephemient
09/15/2023, 8:43 PM
Patrick Steiger
09/15/2023, 8:43 PM
Patrick Steiger
09/15/2023, 8:44 PM
Rick Clephas
09/15/2023, 8:44 PM
Patrick Steiger
09/15/2023, 8:45 PM
ephemient
09/15/2023, 8:47 PM
Patrick Steiger
09/15/2023, 8:47 PM
Patrick Steiger
09/15/2023, 8:49 PM
ephemient
09/15/2023, 8:51 PM
launch { println(1) }; delay(1); println(2) will print 1 2 when multithreading
Patrick Steiger
09/15/2023, 8:52 PM
Patrick Steiger
09/15/2023, 8:54 PM
[...] 2 1, since launch must put the coroutine at the end of the queue, right? Unless you yield() or something before print(2) to suspend and push the continuation to after the previously scheduled coroutine
Patrick Steiger
09/15/2023, 8:55 PM
ephemient
09/15/2023, 8:55 PM
[...] delay(1)?)
Patrick Steiger
09/15/2023, 8:55 PM
Patrick Steiger
09/15/2023, 8:56 PM
launch { println(1) }; println(2)
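The ordering question can be tried out on a single thread. A minimal sketch, assuming everything runs inside runBlocking's single-threaded event loop; the three function names are hypothetical and the log list stands in for println.

```kotlin
import kotlinx.coroutines.*

// Default start: launch only enqueues the child, which runs once the parent suspends or finishes.
fun defaultStartOrder(): List<Int> {
    val log = mutableListOf<Int>()
    runBlocking {
        launch { log += 1 }
        log += 2
    }
    return log
}

// yield() re-queues the parent behind the already scheduled child.
fun yieldFirstOrder(): List<Int> {
    val log = mutableListOf<Int>()
    runBlocking {
        launch { log += 1 }
        yield()
        log += 2
    }
    return log
}

// UNDISPATCHED runs the child immediately, up to its first suspension point.
fun undispatchedOrder(): List<Int> {
    val log = mutableListOf<Int>()
    runBlocking {
        launch(start = CoroutineStart.UNDISPATCHED) { log += 1 }
        log += 2
    }
    return log
}

fun main() {
    println(defaultStartOrder())   // [2, 1]
    println(yieldFirstOrder())     // [1, 2]
    println(undispatchedOrder())   // [1, 2]
}
```

Under that single-thread assumption the default start gives 2 1, while yield() before the second statement or start = CoroutineStart.UNDISPATCHED gives 1 2. On a multithreaded dispatcher the default-start order is not fixed.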