I’m using `StateFlow` to sort of cache and share t...
# coroutines
t
I’m using
StateFlow
to sort of cache and share the result of a coroutine. The
StateFlow
is held in a singleton, and accessed (collected) from multiple different places. When the scope that launches one of these collectors is cancelled (via
scope.cancel()
), it seems that the
StateFlow
is cancelled and no longer emits to any collectors. Can I cancel the collectors launched in a scope, without cancelling the parent producer?
d
`StateFlow`s can't be cancelled (yet?). It's possible you are accidentally cancelling the producing coroutine.
t
I’m calling cancel on the scope which launched the StateFlow. I was thinking that this was the equivalent of cancelling a `CompositeDisposable` in RxJava. But after some reading, I’m thinking perhaps I need to pass a parent
Job
to the
launch
call, and cancel that job rather than calling
scope.cancel
Basically, I want to cancel collection of a coroutine, for a particular job, without causing a CancellationException and rendering all instances of that coroutine useless
d
Then you can
cancel
the returned
Job
.
l
You want to cancel the coroutine that calls
collect
, not the coroutine that emits to the
MutableStateFlow
.
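A minimal sketch of that distinction, assuming `kotlinx.coroutines` is on the classpath. The local `shared` flow stands in for the `StateFlow` held by the singleton, and `collectorCancellationDemo` is a made-up name. It cancels only the `Job` returned by `launch`, then checks that a collector started later still receives values:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: `shared` stands in for the StateFlow held by the singleton.
fun collectorCancellationDemo(): List<Int> = runBlocking {
    val shared = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    // First collector. Keep the Job that launch returns.
    val first = launch { shared.collect { seen += it } }
    yield()                // let the collector run and receive the current value (0)
    first.cancelAndJoin()  // cancels the collecting coroutine only

    shared.value = 1       // the flow itself is untouched and still accepts updates

    // A collector started later still gets the current value (1).
    val second = launch { shared.collect { seen += it } }
    yield()
    second.cancelAndJoin()

    seen
}
```

If the producer stops emitting after a collector is cancelled, that suggests the producing coroutine shares a `Job` or scope with the collector, which this sketch deliberately avoids.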
t
Yes. So, currently I’m calling
coroutineScope.cancel()
, thinking it would do just that
So, I use a base
Presenter
class:
abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {

    var view: T? = null

    private val coroutineScope = CoroutineScope(Dispatchers.IO)
    
    private val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        Timber.e(exception)
    }

    override fun bindView(view: T) {
        this.view = view
    }

    override fun unbindView() {
        view = null

        coroutineScope.cancel()
    }

    fun launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        return coroutineScope.launch(context + exceptionHandler, start, block)
    }
}
Whenever I want to launch a coroutine, I use
coroutineScope.launch()
. When I’m done with this presenter, I call
unbind()
, which calls
coroutineScope.cancel()
But this seems to be having the undesired effect of triggering a CancellationException, and preventing any future collectors from receiving emissions from the parent/shared StateFlow coroutine
Maybe cancelling the ‘scope’ is too broad, and I need to keep track of the individual jobs that call collect, and explicitly cancel those?
l
Yes, it's too broad
You can launch coroutines in a coroutine, and cancel the coroutine that launches the other ones to cancel all the ones you want to cancel at once.
t
So, I saw an example of this, that looked something like:
val parentJob = Job()

scope.launch(parentJob) { coroutine.collect... }

...
parentJob.cancel()
This didn’t seem to work (I still get the sense that the cancellation exception is propagating upstream). Is this different to what you’re suggesting?
l
Yes, it's different. I'm suggesting something like this:
val job = launch {
    launch {
        someFlow.collect { element -> ... }
    }
    launch { anotherFlow.collect { element -> ... } }
}

// later on
job.cancel()
t
It feels a little… messy. I don’t quite understand - if you cancel the parent job, do the children also get cancelled? If they do, then why wouldn’t they throw a cancellation exception - which is what I’m looking to avoid? And if they don’t then, they’re going to leak.
Forgive my ignorance on the subject, I’m really only just getting started with coroutines
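On the question of whether the children get cancelled: they do, and each one sees a `CancellationException` at its suspension point. That exception is treated as normal cancellation, so it does not cancel the parent's own parent or any sibling coroutine. A small sketch, assuming `kotlinx.coroutines`; `parentCancellationDemo` and the `producer` coroutine are hypothetical stand-ins, not code from this thread:

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo. Returns (children that saw cancellation, producer still active).
fun parentCancellationDemo(): Pair<Int, Boolean> = runBlocking {
    var cancelledChildren = 0

    // Stands in for a long-lived shared producer. It is a sibling of `parent`, not a child.
    val producer = launch { delay(Long.MAX_VALUE) }

    val parent = launch {
        repeat(2) {
            launch {
                try {
                    delay(Long.MAX_VALUE)   // suspended, much like a collector waiting for values
                } catch (e: CancellationException) {
                    cancelledChildren++     // each child does see a CancellationException
                    throw e                 // rethrow so cancellation completes normally
                }
            }
        }
    }

    delay(100)              // give both children time to start and suspend
    parent.cancelAndJoin()  // cancels the parent and, through it, both children

    val producerStillActive = producer.isActive
    producer.cancel()       // clean up so runBlocking can return
    cancelledChildren to producerStillActive
}
```

Because `cancelAndJoin()` waits for the children to finish, nothing launched under `parent` is left running afterwards.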
Oh, it looks like the
parentJob
thing does work. You just need to call
cancelChildren()
instead of
cancel
So, this works:
abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {

    var view: T? = null

    private val coroutineScope = CoroutineScope(Dispatchers.IO)

    private val parent = Job()

    private val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        Timber.e(exception)
    }

    override fun bindView(view: T) {
        this.view = view
    }

    override fun unbindView() {
        view = null

        parent.cancelChildren()
    }

    fun launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        return coroutineScope.launch(parent + context + exceptionHandler, start, block)
    }
}
l
The snippet I showed you does not leak so long as the flows are cancellable (for example if they are suspended on a cancellable function). I think you'd greatly benefit from learning about structured concurrency. There are some good articles on Medium by Roman Elizarov, or you can watch this great talk:

https://www.youtube.com/watch?v=Mj5P47F6nJg

t
Thanks, I’ll have a read. The parent job /
cancelChildren()
does seem to solve my immediate problem, and feels like a neater approach - but I absolutely need to learn more about this - thanks
I wonder if I’m ‘leaking’ anything now though - I never call
cancel()
on the
CoroutineScope
now
d
Might as well use
GlobalScope
in that case.
t
Yeah, that’s what I was thinking. But I am cancelling all children of the parent job, so I don’t think I’m leaking. Still, this doesn’t feel quite right
l
If you have coroutines that keep running forever, it might be a leak, or lead to leaks, unless it's by design, and coroutines that keep references to only what's needed are left running, never cancelled.
t
I think the only thing ‘left running’ is that
StateFlow
coroutine, and that is by design. There’s no child coroutine that keeps running that might emit and try to manipulate a view after unbind is called (I think)
So.. Should I just move this to GlobalScope and remove the CoroutineScope from the presenter?
l
I don't think so
I think your presenter should use the coroutineScope of its host
LifecycleOwner
t
Let’s just say, for argument’s sake, I’m manually managing the lifecycle
Maybe I should call
parentJob.cancelChildren()
and then call
coroutineScope.cancel()
..?
.. seems to work. I’m not seeing that unwanted CancellationException propagating upstream
Just for completeness,
BasePresenter
now looks like this, and seems to behave as expected:
abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {

    var view: T? = null

    private val coroutineScope by lazy { CoroutineScope(Dispatchers.IO) }

    private val parent by lazy { Job() }

    private val exceptionHandler by lazy {
        CoroutineExceptionHandler { _, exception ->
            Timber.e(exception)
        }
    }

    override fun bindView(view: T) {
        this.view = view
    }

    override fun unbindView() {
        view = null

        parent.cancelChildren()
        coroutineScope.cancel()
    }

    fun launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        return coroutineScope.launch(parent + context + exceptionHandler, start, block)
    }
}
So now, any coroutine that is launched via
launch()
is launched with
parent
as its parent
Job
, so the coroutine is cancelled in
unbindView()
, without propagating the cancellation exception to related/parent coroutines. And then the entire scope is cancelled, just to be sure.
Hey from /r/androiddev btw @louiscad, you might recognise me as /u/timusus
l
(I'm almost never on reddit)
t
Oh, I could’ve sworn I recognise your name from there. Maybe I’m just getting confused with GitHub issues
l
More likely
i
You better define your scope as
CoroutineScope(parent + Dispatchers.IO)
that way you bind them and don't need to manually cancel your scope
t
@ildar.i [Android] could you explain that a bit further? Are you saying, if the scope is created with
parent + Dispatchers.IO
, then when the
parent.cancelChildren()
is called, the
CoroutineScope
will be automatically cancelled?
i
No, it wraps parent (read into kdoc). Calling
parent.cancelChildren()
won't cancel its
CoroutineScope
, it will cancel all child scopes. To cancel parent you should call
parent.cancel()
, but you can do it only once: if you return to a presenter whose scope is cancelled, it won't work anymore. You can read these articles: https://medium.com/@elizarov/coroutine-context-and-scope-c8b255d59055 and https://proandroiddev.com/why-your-class-probably-shouldnt-implement-coroutinescope-eb34f722e510 For Android ViewModel there is a
viewModelScope
, but since you have a presenter you should stick to manually creating your scope
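The "only once" point can be checked with a short sketch, assuming `kotlinx.coroutines`. The helper `ranAfter` and the two wrapper functions are hypothetical names. The scope is built from `parent` as suggested above, and the code then tries to launch again after either `cancelChildren()` or `cancel()`:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: applies `action` to the parent Job of a scope,
// then reports whether a coroutine launched afterwards actually ran.
fun ranAfter(action: (Job) -> Unit): Boolean = runBlocking {
    val parent = Job()
    val scope = CoroutineScope(parent + Dispatchers.Default)

    val first = scope.launch { delay(Long.MAX_VALUE) }
    action(parent)   // either cancelChildren() or cancel()
    first.join()     // the first coroutine ends up cancelled in both cases

    var ran = false
    scope.launch { ran = true }.join()
    ran
}

// cancelChildren() leaves `parent` active, so the scope can launch again.
fun cancelChildrenKeepsScopeUsable(): Boolean = ranAfter { it.cancelChildren() }

// cancel() is terminal: coroutines launched afterwards never run their body.
fun cancelMakesScopeUnusable(): Boolean = !ranAfter { it.cancel() }
```

Which of the two fits depends on whether the presenter can be bound again after `unbindView()`.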
t
I’m a little caught between trying to learn many new concepts, and working with the implementation I have. I don’t understand why I should do
CoroutineScope(parent + Dispatchers.IO)
. For my use case, calling
parent.cancelChildren()
, followed by
coroutineScope.cancel()
, seems to do the trick. I’m not sure what the benefit or difference is of the approach you’re suggesting
l
@Tim Malseed For your case, try just calling
parent.cancel()
, in place of
parent.cancelChildren()
, and
coroutineScope.cancel()
. It should work the same.
i
that way you don't need to create context every time you make a network call - presenter already has its own. check out our prod implementation - it abstracts coroutine logic into a separate object (UiCaller), so we can use composition for complex logic: https://github.com/ildar2/MySandbox/blob/master/app/src/main/java/kz/ildar/sandbox/ui/UiCaller.kt
t
@louiscad no, it doesn’t. That causes a
CancellationException
to propagate up the Coroutine hierarchy, which is what I’m trying to avoid
Basically, I’m just trying to cancel child coroutines without propagating a CancellationException up the coroutine hierarchy, so the top level StateFlow coroutine can be reused without being cancelled. I’m doing this in
presenter.unbind()
, when the presenter will never be used again - so the fact that the scope is cancelled and unusable isn’t a problem.
abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {

    private val parentJob by lazy { Job() }

    private val coroutineScope by lazy { CoroutineScope(Dispatchers.IO) }

    override fun bindView(view: T) {
        this.view = view
    }

    override fun unbindView() {
        parentJob.cancelChildren()
        coroutineScope.cancel()
    }

    fun launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        return coroutineScope.launch(parentJob + context, start, block)
    }
}
This seems to work. But I’m not really sure if there’s any real point in calling
coroutineScope.cancel()
, since there are no child jobs at this point anyway.
This code seems to work, but I don’t fully understand it. It feels weird to me that if I call
parentJob.cancelChildren()
, then I avoid a
CancellationException
. Then I can call
coroutineScope.cancel()
, and still avoid the
CancellationException
, and everything seems fine. But it feels a little messy.
@ildar.i [Android] thanks, but I’m finding it very hard to follow these other examples. I don’t mean to say ‘I can only understand my own code’, but there are a few too many concepts here. I’m trying to keep it simple - do you think there’s something wrong with my approach?
i
First off - your launch() gives away implementation details - caller should create its own context
second - your Job and context are kept separate - it defeats their purpose
t
For the first point - the caller can create and pass their own context. This is just a clone of `Builders.common.kt`’s
launch()
- so from within a Presenter you can call launch and you get an exception handler and a parent job that can cancel its children, for free.
i
third - your code is untestable - one cannot set test dispatcher for tests
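One common way to address the testability point is to pass the dispatcher in through the constructor. The sketch below assumes `kotlinx.coroutines`; `ScopedPresenter` and `injectedDispatcherDemo` are hypothetical names, not the `BasePresenter` from this thread, and `Dispatchers.Unconfined` is used only as a simple stand-in for a test dispatcher:

```kotlin
import kotlinx.coroutines.*

// Hypothetical simplified presenter: the dispatcher is injected, defaulting to Dispatchers.IO.
open class ScopedPresenter(dispatcher: CoroutineDispatcher = Dispatchers.IO) {
    private val scope = CoroutineScope(dispatcher)

    fun launch(block: suspend CoroutineScope.() -> Unit): Job = scope.launch(block = block)

    fun unbindView() {
        scope.coroutineContext.cancelChildren()
    }
}

fun injectedDispatcherDemo(): Boolean {
    // With Dispatchers.Unconfined the body starts immediately on the calling thread,
    // so the effect is visible without waiting on a background thread.
    val presenter = ScopedPresenter(Dispatchers.Unconfined)
    var ran = false
    presenter.launch { ran = true }
    return ran
}
```

In a real test suite a dispatcher from the `kotlinx-coroutines-test` artifact would be the more usual choice.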
t
I don’t understand your second point. The ‘job’ and ‘context’ are both passed to
coroutineScope.launch()
, so, are they ‘separate’? Can you elaborate on this?
Third, I don’t think it’s particularly relevant whether this is testable. I’m more concerned about the correct use and cancellation of coroutines/scopes/jobs
i
you have to pass parentJob in every call, while you can just inject it in coroutineScope on creation
t
Why do you think it needs to be passed in every call?
Usage looks like this:
launch {
    coroutine.collect { ... }
}
i
You do that in your launch() method, don't you?
t
fun launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    return coroutineScope.launch(parentJob + context + exceptionHandler, start, block)
}
i
you pass parentJob every time you make a request, but they do not make a structure - they are two separate things, which occasionally interact
t
What do you mean by ‘they’? The
parentJob
and ?
i
and coroutineScope
t
I’m trying to understand what you’re saying..
private val coroutineScope by lazy { CoroutineScope(Dispatchers.IO) }

fun launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    return coroutineScope.launch(parentJob + context + exceptionHandler, start, block)
}
I can see that perhaps one of the problems here, is that
coroutineScope
is initialised with a context of
Dispatchers.IO
. But then
coroutineScope.launch
is called, and a new context is passed, comprised of
parentJob + context + exceptionHandler
- so the `Dispatchers.IO` context is potentially discarded / redundant?
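For what it's worth, contexts are merged element by element, so the dispatcher is not discarded: the context passed to `launch` is added to the scope's context, and only elements with the same key are replaced. The `Job` is one such key, which means a coroutine launched with `parentJob` becomes a child of `parentJob` rather than of the scope's own `Job`. A sketch assuming `kotlinx.coroutines`, with the hypothetical name `contextMergeDemo`:

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

// Hypothetical demo of how the scope's context and the launch context combine.
fun contextMergeDemo(): List<Boolean> = runBlocking {
    val scope = CoroutineScope(Dispatchers.IO)
    val parentJob = Job()

    var sawIoDispatcher = false
    val started = CompletableDeferred<Unit>()
    val child = scope.launch(parentJob) {
        // The dispatcher comes from the scope, since the launch context did not override it.
        sawIoDispatcher = coroutineContext[ContinuationInterceptor] == Dispatchers.IO
        started.complete(Unit)
        delay(Long.MAX_VALUE)
    }
    started.await()

    // The Job passed to launch replaced the scope's Job as the parent.
    val isChildOfParentJob = child in parentJob.children

    scope.cancel()                        // cancels the scope's own Job only
    val survivedScopeCancel = child.isActive

    parentJob.cancelChildren()            // this is what actually reaches the child
    child.join()

    listOf(sawIoDispatcher, isChildOfParentJob, survivedScopeCancel, child.isCancelled)
}
```

All four values come out `true` here, which would also explain why `coroutineScope.cancel()` appeared to have nothing left to cancel in the versions that passed `parentJob` to every `launch`.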
@ildar.i [Android] does this look better to you?
abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {

    var view: T? = null

    private val parentJob by lazy { Job() }

    private val exceptionHandler by lazy {
        CoroutineExceptionHandler { _, exception -> Timber.e(exception) }
    }

    private val coroutineScope by lazy { CoroutineScope(parentJob + exceptionHandler + Dispatchers.IO) }

    override fun bindView(view: T) {
        this.view = view
    }

    override fun unbindView() {
        parentJob.cancelChildren()
        view = null
    }

    fun launch(block: suspend CoroutineScope.() -> Unit): Job {
        return coroutineScope.launch(block = block)
    }
}
i
Yeah, much better. Now you've got to handle progress while loading and handle errors gracefully for the user
And try to write a test
t
Oh, no this is just me migrating from RxJava to Coroutines in a large, existing codebase
It’s not like.. me trying to write a Presenter
i
So you don't have progress handling in your large existing codebase?
t
I feel like you’re talking about a specific use case or something? I don’t have progress handling as a generic need for my base presenter class
i
Whenever the app is making a request, the user should see a progress bar. If an error occurs, they should see that. If you don't do that, that's bad
t
For long running tasks, I agree. Most of the tasks in this app are not HTTP requests, and they are not long running
They’re things like fetching data from a local database. Progress/loading states are handled in various ways - sometimes a progress bar is shown when the coroutine is launched, and hidden once it returns.
I don’t quite understand how this relates to the small example I’ve got here though - loading and error states are handled in the actual presenter implementation, rather than the base class (for me)
So, the thing that irks me about this solution, is that
coroutineScope
itself is never cancelled. Only the children of
parentJob
. I mean, I think that’s fine, and it means there won’t be any coroutines running inside of the presenter after unbind is called, but something about not calling
cancel
on the scope makes me wonder if this is.. complete
i
That first article i sent earlier explains it
t
OK, I’ll have a read of that now, thanks
So it looks like I can eliminate
parentJob
altogether, and just call `coroutineScope.coroutineContext.cancelChildren()`:
abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {

    var view: T? = null

    private val exceptionHandler by lazy {
        CoroutineExceptionHandler { _, exception -> Timber.e(exception) }
    }

    private val coroutineScope by lazy { CoroutineScope(exceptionHandler + Dispatchers.IO) }

    override fun bindView(view: T) {
        this.view = view
    }

    override fun unbindView() {
        coroutineScope.coroutineContext.cancelChildren()
        view = null
    }

    fun launch(block: suspend CoroutineScope.() -> Unit): Job {
        return coroutineScope.launch(block = block)
    }
}
@ildar.i [Android] that was an interesting read, thanks. I think I'll probably have to read it a few more times, and work with Coroutines some more before I fully understand it. So, the scope I use to launch a coroutine (child job) becomes the parent context of that job. So calling cancel children on that parent context cancels the child job. This makes sense. It's still not clear to me whether I should also cancel the parent context.. It has no active child jobs. But what about the parent job itself? Just leave it and let it get garbage collected I suppose..?
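This works without an explicit `parentJob` because the `CoroutineScope(...)` factory function adds a `Job` to the context when none is supplied. A short sketch assuming `kotlinx.coroutines`, with the hypothetical name `implicitJobDemo`:

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo: a scope created without a Job still has one.
fun implicitJobDemo(): List<Boolean> = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)   // no Job passed in
    val hasImplicitJob = scope.coroutineContext[Job] != null

    val collector = scope.launch { delay(Long.MAX_VALUE) }
    scope.coroutineContext.cancelChildren()           // cancels children, not the scope's Job
    collector.join()

    listOf(hasImplicitJob, collector.isCancelled, scope.isActive)
}
```

Once its children have finished, the scope's `Job` is not running anything itself. It is an ordinary object that can be collected once nothing references the scope.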
i
I don't really know the details of the implementation, so i keep cancel() in onCleared method just in case
t
Unfortunately in my case, calling cancel on the scope causes a CancellationException, causing the shared StateFlow to stop emitting. But it's OK, I don't think cancel is needed, cancelChildren seems sufficient
I’ve asked this question on StackOverflow, with a bit more context, and refined with my new understanding of this stuff: https://stackoverflow.com/questions/62747825/coroutine-stateflow-stops-emitting-when-coroutinescope-is-cancelled