ursus

11/17/2021, 2:07 AM
I'd like to ask about scope cancelation. Is it 100% synchronous?
someOtherPlace {
    someUserScope.launch {
        val userFlow: Flow<User?> = userDao.user()
        userFlow.collect {
            // can 'it' be null here?
        }
    }
}

fun logout() {
    allUserCoroutineScopes.forEach { it.cancel() }
    userDao.deleteUser()
}
Let's assume `someUserScope` is in the `allUserCoroutineScopes` list, and I'll only delete the user after running the cancellation forEach. Is there a way the flow at the top will emit null? Should never happen, right? Is it safe to use `!!` there? (I'm looking at the sources but it's non-obvious to me.)

Adam Powell

11/17/2021, 2:24 AM
no, it's not synchronous. Each cancelled coroutine that was suspended at time of cancellation will resume with a CancellationException on the coroutine's dispatcher. If you need to wait until all of those things are complete, you can do something like
allUserCoroutineScopes.forEach { it.coroutineContext.job.join() }
after the foreach/cancel line in your example above.
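A minimal sketch of the cancel-then-join ordering suggested above, assuming `logout` can be made a suspend function. `FakeUserDao` is a hypothetical stand-in for the DAO in the question, and the scopes are created here only for illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for the DAO in the question; it only records that the delete ran.
class FakeUserDao {
    @Volatile var deleted = false
    fun deleteUser() { deleted = true }
}

// Suspending logout: request cancellation everywhere first, then wait for every job.
suspend fun logout(scopes: List<CoroutineScope>, userDao: FakeUserDao) {
    scopes.forEach { it.cancel() }                     // returns immediately, coroutines may still be running
    scopes.forEach { it.coroutineContext.job.join() }  // suspends until each scope's job has fully completed
    userDao.deleteUser()                               // nothing in these scopes is running any more
}

fun main() = runBlocking {
    // Illustrative scopes with one long-running coroutine each.
    val scopes = List(3) { CoroutineScope(SupervisorJob() + Dispatchers.Default) }
    scopes.forEach { scope -> scope.launch { delay(Long.MAX_VALUE) } }

    val dao = FakeUserDao()
    logout(scopes, dao)
    println("all completed: ${scopes.all { it.coroutineContext.job.isCompleted }}, deleted: ${dao.deleted}")
}
```

Cancelling every scope before joining any of them lets the cleanup of the different scopes overlap instead of running one scope at a time.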

ursus

11/17/2021, 2:35 AM
I'm a bit confused, do I care about the `CancellationException`? Isn't cancellation communicated via `awaitClose` and friends? Anyway, you're saying that the open flows could get cancelled after `userDao.deleteUser()`, therefore they could emit null, right?

Adam Powell

11/17/2021, 2:55 AM
for a whole host of reasons they could still be running. You could probably align the stars such that you wouldn't encounter problems, but the easiest and surest way to avoid deleting the user before you've stopped observing your user flow is to wait for all observers of the user flow to join before deleting it.

ursus

11/17/2021, 2:58 AM
Will the joining work if it's a hot stream? Or do you mean something like
allUserCoroutineScopes.forEach { it.cancel() }
allUserCoroutineScopes.forEach { it.coroutineContext.job.join() }
I'd maybe expect `cancel` to be suspending then, as API design. Rx had isDisposed checks before emitting, so no emits after cancel would happen.
Btw I found this https://github.com/Kotlin/kotlinx.coroutines/issues/1265 with people complaining about emissions after onDestroy, which is essentially my case, and the issue was closed successfully. Does it not relate?

Nick Allen

11/17/2021, 6:52 AM
Implementations of `collect` check if the coroutine is active before emitting (here). So if `someUserScope` uses a single-threaded dispatcher and `logout` is only called from that thread, then you don't have to worry about the `collect` lambda being invoked after `logout` is called. However, if there's any possibility of multithreading, then you've lost your guarantee entirely. Also, `cancel` is 100% not synchronous at all, not even a little. Even if only one thread is involved, a previously invoked `collect` lambda could be suspended when you call `logout`, so the user could already be deleted when that other code resumes (not that it'll affect your locally captured `it` variable, but it could cause other problems depending on your code).
Honestly, I'd go with `it ?: return@collect` to start the lambda just to make it more robust. It's easy to forget relationships between code like that and either start collecting before you've set up your user, or delete the user in some other scenario, or mix up the order of operations. Code that can be described as "If I do this exact thing here, that code over there will work correctly" is usually more difficult to maintain in my experience.
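A small sketch of the null guard suggested above. The `User` class and the `collectUserNames` helper are hypothetical and exist only to make the example self-contained; the guard itself is the `?: return@collect` line.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical user type, not taken from the original code.
data class User(val name: String)

// Collects a nullable user flow and ignores null emissions instead of asserting with !!.
suspend fun collectUserNames(users: Flow<User?>): List<String> {
    val seen = mutableListOf<String>()
    users.collect { user ->
        val current = user ?: return@collect  // skip this emission when the user is gone
        seen += current.name
    }
    return seen
}

fun main() = runBlocking {
    val emissions = flowOf(User("ann"), null, User("bob"))
    println(collectUserNames(emissions))  // prints [ann, bob]
}
```

If the nulls should be dropped for every collector rather than in one lambda, the `filterNotNull()` operator on the flow is an alternative that yields a `Flow<User>`.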

ursus

11/17/2021, 12:47 PM
It's a bit more involved, I just tried to keep it minimal. What I'm trying to do is have `LoggedInUserProvider: Flow<User>` within the logged-in semantic DI scope, composed from `GlobalUserProvider: Flow<User?>`, which is app-scoped so it can be used outside the logged-in scope (and I'm trying to discern how to create one, whether by asserting not null or just filtering). Anyway, I'm still baffled by the stream emitting after scope cancellation. I'm sort of okay with already "in flight" emits finishing, but the source stream (`userDao.user()`) producing new emits after cancel?
Why? How is this not a problem? Is it just a transient issue due to multithreading, or by design? I believe Rx has isDisposed checks before emitting to prevent this very thing, or doesn't it?

Adam Powell

11/17/2021, 2:31 PM
there are a few questions all being asked at once here so let's separate them. Regarding flows in particular, see https://pl.kotl.in/yx_pPNFJB
the standard flow builders throw a new `CancellationException` if you try to emit while the current job is cancelled.
So you won't get new emits to a cancelled collector.
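A sketch of that behaviour with the `flow { }` builder, written for this thread rather than copied from the linked playground. The function names and the log list are illustrative only.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A flow that records each attempt to emit before calling emit().
fun numbers(log: MutableList<String>): Flow<Int> = flow {
    for (i in 1..3) {
        log += "emitting $i"
        emit(i)  // the builder checks the collecting job here and throws once it is cancelled
    }
}

// Collects inside a child job that cancels itself after the first value.
fun emitAfterCancel(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        numbers(log).collect { value ->
            log += "collected $value"
            if (value == 1) cancel()  // cancels the job of this launch
        }
    }
    job.join()
    log
}

fun main() {
    println(emitAfterCancel())  // prints [emitting 1, collected 1, emitting 2]
}
```

The flow body still runs up to the next `emit` call after cancellation, but the value 2 never reaches the collector.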
Job cancellation is atomic, but cooperative. `suspend` functions expect to be able to run their own `finally` blocks with their assumptions intact, i.e. they're running on their expected dispatcher. That means cancelled jobs resume on that dispatcher asynchronously and you can't make general assumptions like, "code can't execute in a cancelled job." You couldn't have `finally` work if that were the case.
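A minimal sketch of that ordering, assuming everything runs on the single-threaded event loop of `runBlocking`. On a multithreaded dispatcher the first two events could appear in either order, which is the point of the warning above.

```kotlin
import kotlinx.coroutines.*

// Records the order in which cancel(), the child's finally block, and join() complete.
fun cancelThenJoin(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        try {
            delay(Long.MAX_VALUE)      // suspended here when cancellation arrives
        } finally {
            events += "finally ran"    // runs when the child resumes on its dispatcher
        }
    }
    yield()                            // let the child start and suspend in delay()
    job.cancel()                       // only requests cancellation
    events += "cancel() returned"
    job.join()                         // suspends until the child has finished its cleanup
    events += "join() returned"
    events
}

fun main() {
    println(cancelThenJoin())  // prints [cancel() returned, finally ran, join() returned]
}
```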
It looks like you're seeking to make assumptions about some flow collector code that has been simplified for the sake of the example/question, and whatever simplifications the example above has been through might affect the correctness of a surface-level answer.
If you want to fully guarantee that a coroutine isn't still running in some capacity before taking some action, you need to use `join()` to await its full completion and cleanup.
There is a suspending `cancelAndJoin` function available: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel-and-join.html but I didn't use it in my suggestion above, since looping over a collection of scopes would mean waiting for one job to join before cancelling the next; might as well cancel them all first and then join them all so that stuff can all happen concurrently.