Patrick Steiger
09/15/2023, 5:36 PMval flow = MutableSharedFlow<Int>()
val job = CoroutineScope(Job()).launch {
flow.collect { println(it) }
}
suspend fun main() {
yield()
Thread.sleep(1000)
flow.emit(1)
yield()
Thread.sleep(1000)
job.cancelAndJoin()
}
why does collector get cancelled before receiving the emission in above example? In order to avoid race conditions, I put some yields and sleeps to make sure first collector appears before the emission, and also to make sure collector has time to collect after the emission. Still never happens
It only happens with delay(1) after emissionPatrick Steiger
09/15/2023, 5:37 PMRick Clephas
09/15/2023, 5:39 PMyield only affects the current thread or thread pool, so I am guessing your main function and coroutinescope aren't using the same dispatcher. https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.htmlPatrick Steiger
09/15/2023, 5:41 PMPatrick Steiger
09/15/2023, 5:42 PMPatrick Steiger
09/15/2023, 5:43 PMasdf asdf
09/15/2023, 5:48 PMsuspend fun main to fun main() = runBlocking { ... } , because a suspending main function is implemented by the compiler and does not use the kotlinx.coroutines threadsRick Clephas
09/15/2023, 5:58 PMyield isn't guaranteed to suspend, so I am guessing it isn't here
• delay(1) does suspend and allows for the collect to run
• Thread.sleep will actually block the thread, which would prevent the collect from runningRick Clephas
09/15/2023, 5:59 PMlaunch doesn't actually guarantee immediate start of the job. So that might also play a role here.Patrick Steiger
09/15/2023, 6:00 PMPatrick Steiger
09/15/2023, 6:00 PMPatrick Steiger
09/15/2023, 6:00 PMNote:right, that’s why I also put an obnoxious Thread.sleep in main before emitting as welldoesn't actually guarantee immediate start of the job. So that might also play a role here.launch
Rick Clephas
09/15/2023, 6:02 PMstart value of UNDISPATCHED to the launch call?Rick Clephas
09/15/2023, 6:03 PMjob.start() to make sure it's actually started.Rick Clephas
09/15/2023, 6:05 PMThread.sleep will block the thread that is responsible for starting the job. Since it does work with delay.ephemient
09/15/2023, 6:07 PMephemient
09/15/2023, 6:07 PMPatrick Steiger
09/15/2023, 6:11 PMmain (obviously), but doesn’t change the outcome : collector still does not get the value (gets cancelled before it happening)Patrick Steiger
09/15/2023, 6:11 PMcollect is called before emit, but for some reason the collector resumption is only scheduled too latePatrick Steiger
09/15/2023, 6:11 PMephemient
09/15/2023, 6:13 PMephemient
09/15/2023, 6:13 PMephemient
09/15/2023, 6:14 PMephemient
09/15/2023, 6:15 PMPatrick Steiger
09/15/2023, 6:15 PMPatrick Steiger
09/15/2023, 6:16 PMasdf asdf
09/15/2023, 6:17 PMsuspend fun main
yield() is a part of kotlinx.coroutines , thus not having an effect on itRick Clephas
09/15/2023, 6:19 PMThread.sleep vs delay. For some reason Thread.sleep blocks your collection, not exactly sure why.Rick Clephas
09/15/2023, 6:21 PMPatrick Steiger
09/15/2023, 6:22 PMsuspend fun main to fun main() = runBlocking {} makes it behave as I expect — even removing all yields (keeping sleeps)
So now it works, but it’s one more thing I don’t understand 😂ephemient
09/15/2023, 6:59 PMsuspend fun main() = coroutineScope {
...
}
but either wayRick Clephas
09/15/2023, 7:00 PMstart = CoroutineStart.UNDISPATCHED, job.start() will force start the job, but it doesn’t wait for it to suspend (a.ka. the collect call)
• fun main() = runBlocking is bound to the main thread and blocks it when any call suspends, suspend fun main is initially bound to the main thread, but can switch to other threads, it also doesn’t block threads
• there is a race condition in the job.cancelAndJoin(), once flow.emit(1) resumes the collect for that value hasn’t actually run yet, so you might cancel the job before the println is ever calledephemient
09/15/2023, 7:00 PMsuspend fun main is implemented by the compiler, avoiding a hard dependency on kotlinx.coroutinesephemient
09/15/2023, 7:01 PMkotlinx.coroutines, you want to get into kotlinx.coroutines scope, both coroutineScope and runBlocking accomplish thatPatrick Steiger
09/15/2023, 7:59 PMsuspend fun main() = coroutineScope {}
This has the same issuePatrick Steiger
09/15/2023, 8:00 PMthere is a race condition in the, oncejob.cancelAndJoin()resumes theflow.emit(1)for that value hasn’t actually run yet, so you might cancel the job before thecollectis ever calledprintln
That's why I put a sleep of 1 sec after emitting on main thread
ephemient
09/15/2023, 8:01 PMRick Clephas
09/15/2023, 8:02 PMflow.emit your main function is running on the same thread as the collect, so the Thread.sleep blocks the collection, which delay doesn’t.Patrick Steiger
09/15/2023, 8:02 PMPatrick Steiger
09/15/2023, 8:03 PMPatrick Steiger
09/15/2023, 8:03 PMRick Clephas
09/15/2023, 8:03 PMemit) the thread might have changed.Rick Clephas
09/15/2023, 8:04 PMRick Clephas
09/15/2023, 8:05 PMrunBlocking works is because it will never switch threadsPatrick Steiger
09/15/2023, 8:05 PMPatrick Steiger
09/15/2023, 8:06 PMRick Clephas
09/15/2023, 8:07 PMdelay instead of Thread.sleep as the later can cause unexpected results due to it blocking a threadPatrick Steiger
09/15/2023, 8:08 PMPatrick Steiger
09/15/2023, 8:09 PMPatrick Steiger
09/15/2023, 8:09 PMRick Clephas
09/15/2023, 8:10 PMephemient
09/15/2023, 8:10 PMlifecycleScope or similar is basically on Dispatchers.Mainephemient
09/15/2023, 8:11 PMfun main() = coroutineScope { doesn't add Dispatchers.Main, and it couldn't work like that anyway (there is no main dispatcher in CLI)Patrick Steiger
09/15/2023, 8:13 PMfranztesca
09/15/2023, 8:29 PMephemient
09/15/2023, 8:30 PMflow {
emit(...)
}.collect {
it
}
to be synchronousephemient
09/15/2023, 8:31 PMPatrick Steiger
09/15/2023, 8:32 PMPatrick Steiger
09/15/2023, 8:38 PMephemient
09/15/2023, 8:38 PMflow {
println(1)
emit(1.5)
println(2)
}.collect {
println(it)
}
should print `1 1.5 2`;
val flow = MutableSharedFlow<Any?>()
launch(start = CoroutineStart.UNDISPATCHED) {
flow.take(1).collect { println(it) }
}
println(1)
flow.emit(1.5)
println(2)
may print 1 1.5 2 or 1 2 1.5Patrick Steiger
09/15/2023, 8:39 PMephemient
09/15/2023, 8:40 PMemit does not wait for the collecting coroutines to be dispatchedephemient
09/15/2023, 8:40 PMephemient
09/15/2023, 8:40 PMPatrick Steiger
09/15/2023, 8:41 PMdoes not wait for the collecting coroutines to be dispatchedemit
That sounds odd. If the shared flow has no buffer, I think it must wait for the collect continuation to be resumed / dispatched
Patrick Steiger
09/15/2023, 8:41 PMephemient
09/15/2023, 8:41 PMPatrick Steiger
09/15/2023, 8:42 PMPatrick Steiger
09/15/2023, 8:42 PMPatrick Steiger
09/15/2023, 8:42 PMephemient
09/15/2023, 8:43 PMPatrick Steiger
09/15/2023, 8:43 PMPatrick Steiger
09/15/2023, 8:44 PMRick Clephas
09/15/2023, 8:44 PMPatrick Steiger
09/15/2023, 8:45 PMephemient
09/15/2023, 8:47 PMPatrick Steiger
09/15/2023, 8:47 PMPatrick Steiger
09/15/2023, 8:49 PMephemient
09/15/2023, 8:51 PMlaunch { println(1) }; delay(1); println(2) will print 1 2 when multithreadingPatrick Steiger
09/15/2023, 8:52 PMPatrick Steiger
09/15/2023, 8:54 PM2 1, since launch must put the coroutine at the end of the queue right? Unless you yield() or something before print(2) to suspend and push the continuation to after the previously scheduled coroutinePatrick Steiger
09/15/2023, 8:55 PMephemient
09/15/2023, 8:55 PMdelay(1)?)Patrick Steiger
09/15/2023, 8:55 PMPatrick Steiger
09/15/2023, 8:56 PMlaunch { println(1) }; println(2)