#coroutines

pedro

07/23/2020, 12:20 PM
I have a view emitting a flow of Actions which is collected by a handler which emits a flow of Mutations (that will update the state) and they have different lifecycle scopes (the handler can outlive the views). How can I do it so that I can cancel the view’s scope (no more Actions come through) yet the handling of the actions still being processed is not cancelled? Say the handler takes 5 seconds processing each action. When I cancel the view scope, I know that I should expect more mutations to come through but they get cancelled too. I’ve gone through the docs and tried many different alternatives but none worked.
These are some of my failed attempts:
```kotlin
viewScope.launch {
    viewActions
        .buffer()
        // .flowOn(viewScope) // tried this while replacing the first viewScope.launch with handlerScope.launch - did not work
        .collect { action ->
            log("Handling action $action")
            handlerScope.launch { // did not work
            // launch(handlerScope.coroutineContext) { // did not work
            //     withContext(handlerScope.coroutineContext) { // did not work (together with the next launch)
            //         launch {
                handler.handle(action)
                    .collect {
                        // do something with mutation
                        log("Mutation $it")
                        counter++
                    }
            //         }
            //     }
            }
            // }
        }
}
```

streetsofboston

07/23/2020, 12:28 PM
We use a `StateFlow` or an `EventFlow` for that. We rolled our own `StateFlow` and `EventFlow`, and they are `Flow`s that are backed by a `ConflatedBroadcastChannel` (for `StateFlow`) or a plain rendezvous `Channel` (for `EventFlow`).
The producer of the values has a different lifecycle (coroutine scope) than the consumers (collectors of the flow). The producer scope outlives the consumer scope.

pedro

07/23/2020, 12:41 PM
I use a state flow in other parts of the system, but I don’t think that would do it in this case. I can’t use conflated flows here. Each value is important (each button click; each update (mutation) on the data). I don’t want just the latest one. The handler itself knows how to react to them but is stateless. It emits mutations which are fed to a reducer and then it updates the state flow… But the handler doesn’t do that directly. The handler only receives actions, does what it needs to do, and emits mutations.

streetsofboston

07/23/2020, 12:47 PM
Then an EventFlow would work, correct? It would not cache values. E.g:
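```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow

// A Flow of events backed by a channel; values are neither cached nor conflated.
class EventFlow<T> private constructor(
    private val channel: Channel<T>
) : Flow<T> by channel.receiveAsFlow() {

    // Suspends until a collector receives the value (the channel is rendezvous).
    suspend fun sendValue(value: T) {
        channel.send(value)
    }

    companion object {
        // Lets callers write EventFlow<T>() despite the private constructor.
        operator fun <T> invoke(): EventFlow<T> = EventFlow(Channel())
    }
}
```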
(Note: `receiveAsFlow` produces a Flow that delivers each value to only one collector/observer, so it really suits a single collector. You'd need to write a bit more code to allow for more than one collector.)

pedro

07/23/2020, 12:51 PM
one collector is fine 👍 is there some article explaining that / how it works? I googled EventFlow and it took me to posts about shared flow which hasn’t been implemented (officially at least) AFAIK.

streetsofboston

07/23/2020, 12:53 PM
Yup, `StateFlow` is in kotlinx.coroutines 1.3.7, but no `EventFlow` yet. We rolled our own versions of both (for now, since they need to work with Kotlin Multiplatform multithreading).
In the code I posted above, calling `send` will suspend until a collector collects the sent value. This is done through a rendezvous `Channel` inside the `EventFlow`.
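A minimal sketch of that rendezvous behaviour, assuming kotlinx.coroutines is on the classpath. The helper name `sendCompletesWithoutReceiver` and the 100 ms timeout are made up for illustration; the sketch only contrasts a rendezvous channel with a buffered one when nobody is receiving.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: returns true if send() completes within 100 ms
// even though no coroutine ever receives from the channel.
fun sendCompletesWithoutReceiver(capacity: Int): Boolean = runBlocking {
    val channel = Channel<Int>(capacity)
    withTimeoutOrNull(100) {
        channel.send(1) // suspends here when the channel has no room
        true
    } ?: false
}

fun main() {
    // Rendezvous: send() stays suspended until someone receives.
    println(sendCompletesWithoutReceiver(Channel.RENDEZVOUS)) // false
    // Buffered: the value waits in the buffer and send() returns.
    println(sendCompletesWithoutReceiver(Channel.BUFFERED)) // true
}
```

If the sender's scope is cancelled while `send` is suspended like this, that value is never delivered, which is one reason a buffered channel may be the safer choice when every action matters.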

pedro

07/23/2020, 1:01 PM
hmmm the rendezvous could become an issue but this is a step forward… but it still doesn’t work for me. Is this code right?
```kotlin
val eventFlow = EventFlow<Action>()
viewScope.launch {
    viewActions
        .buffer()
        .onEach {
            // log("sending action $it")
            eventFlow.sendValue(it)
        }
        .launchIn(viewScope)
}
handlerScope.launch {
    eventFlow
        .collect { action ->
            log("Handling action $action")
            handlerScope.launch {
                handler.handle(action)
                    .collect {
                        // do something with mutation
                        log("Mutation $it")
                        counter++
                    }
            }
        }
}
```
finishing the view scope still cancels on going work by the handler (sorry, the code isn’t completely clean but I don’t think that is the cause of the problem)
even making the channel buffered did not help. maybe I’m using the EventFlow wrong? but the idea of having an intermediary channel probably works (although I hoped to do this just with the right coroutines scopes)

streetsofboston

07/23/2020, 1:06 PM
The `viewScope` is the scope of the producer. The `handlerScope` is the scope of the consumer. Cancelling the `viewScope` will just stop sending values. It won't cancel the `handlerScope`, but the call to `eventFlow.collect` will not do anything, since there is no one sending any values.
And I'm not sure what `handler.handle(action)` returns. Could that be a Flow that gets cancelled too soon?

pedro

07/23/2020, 1:08 PM
the handler is both a consumer of actions and a producer of mutations
yes, handle returns a `Flow<Mutation>`
> but the call to `eventFlow.collect` will not do anything, since there is no one sending any values.
I was (at least trying) to send the events from the original flow into this event flow. and the handler scope is collecting this

streetsofboston

07/23/2020, 1:15 PM
What happens if you remove the second `handlerScope.launch` call (and just leave the `log("Handling action $action")` statement)? I expect that `log` statement to execute each time someone calls `eventFlow.sendValue(…)` during the entire lifetime of `handlerScope`. If you cancel the `viewScope`, `eventFlow.sendValue` won't be called anymore. This means that the `log` statement won't get called anymore either, even though the `handlerScope` is still active.

pedro

07/23/2020, 1:18 PM
I wonder if this is the problem in my test
```kotlin
val viewScope = CoroutineScope(newCoroutineContext(coroutineContext))
val handlerScope = CoroutineScope(newCoroutineContext(coroutineContext))
```
I assume that by cancelling one scope, the other shouldn’t be cancelled. The funny thing is my original code appears to work in the real application but I can’t write unit tests to verify the behaviour
> What happens if you remove the second `handlerScope.launch`
The problem is still there. With my particular flows, I see even fewer events in the logs: without the inner launch the handler takes longer to process, so it misses more actions.

streetsofboston

07/23/2020, 1:19 PM
They seem to share a `coroutineContext`… if that one has a `Job`, they share that `Job`.
And cancelling one then cancels the other.
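A small sketch of that effect, assuming kotlinx.coroutines. Here `sharedContext` is a hypothetical stand-in for the test's outer `coroutineContext`, and the function name is made up for illustration. `CoroutineScope(context)` only adds a fresh `Job` when the context does not already carry one, so both scopes end up holding the same `Job`.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive

// Hypothetical check: is handlerScope still active after viewScope is cancelled?
fun handlerActiveAfterViewCancel(): Boolean {
    // Stand-in for the test's outer context: it already contains a Job.
    val sharedContext = Job() + Dispatchers.Default

    // Both scopes reuse the Job from sharedContext instead of creating their own.
    val viewScope = CoroutineScope(sharedContext)
    val handlerScope = CoroutineScope(sharedContext)

    viewScope.cancel() // cancels the one Job both scopes share
    return handlerScope.isActive
}

fun main() {
    println(handlerActiveAfterViewCancel()) // false
}
```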

pedro

07/23/2020, 1:21 PM
🤦
yes, I just got to the same conclusion, except through adding logs. this part of coroutines gets me confused 😵 thanks for the help. now I need to find a way to create two independent scopes

streetsofboston

07/23/2020, 1:25 PM
Just do a `= CoroutineScope(myDispatcher)` for each one of them.
As long as they have their own `Job`: `= CoroutineScope(Job() + myDispatcher)` or `= CoroutineScope(SupervisorJob() + myDispatcher)`

pedro

07/23/2020, 1:34 PM
`CoroutineScope(TestCoroutineDispatcher())` seems to solve my problem of killing both scopes. Thanks for that, that was my actual problem! The rest of the code is still not working but now I have more to explore! Thanks
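For the remaining part, one possible shape is sketched below, assuming kotlinx.coroutines. It is not the code from the real app: `handle`, `runPipeline`, the `Int` actions and the `String` mutations are all hypothetical stand-ins. It combines two independent scopes with a buffered channel between the view and the handler, so that actions already sent are still processed after the view scope is cancelled.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical handler: each action produces one mutation after some work.
fun handle(action: Int): Flow<String> = flow {
    delay(50) // simulated processing time
    emit("mutation-$action")
}

fun runPipeline(): List<String> = runBlocking {
    // Each scope gets its own Job, so cancelling one leaves the other alone.
    val viewScope = CoroutineScope(Job() + Dispatchers.Default)
    val handlerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Buffered (neither rendezvous nor conflated): every action is kept.
    val actions = Channel<Int>(Channel.UNLIMITED)
    val mutations = CopyOnWriteArrayList<String>()

    // Handler side: lives in handlerScope and processes actions one by one.
    val handlerJob = handlerScope.launch {
        actions.receiveAsFlow().collect { action ->
            handle(action).collect { mutations.add(it) }
        }
    }
    // View side: lives in viewScope and only sends actions.
    val viewJob = viewScope.launch {
        for (action in 1..3) actions.send(action)
    }

    viewJob.join()     // all three actions are now in the channel
    viewScope.cancel() // the view goes away
    actions.close()    // no more actions; lets the handler's collect complete
    handlerJob.join()  // the handler still works through what was queued
    mutations.toList()
}

fun main() {
    println(runPipeline()) // [mutation-1, mutation-2, mutation-3]
}
```

The channel is closed here only so the sketch can terminate; in an app the handler would more likely keep collecting for as long as `handlerScope` lives.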