pedro
07/23/2020, 12:20 PM
viewScope.launch {
    viewActions
        .buffer()
        // .flowOn(viewScope) // tried this while replacing the first viewScope.launch with handlerScope.launch - did not work
        .collect { action ->
            log("Handling action $action")
            handlerScope.launch { // did not work
                // launch(handlerScope.coroutineContext) { // did not work
                // withContext(handlerScope.coroutineContext) { // did not work (together with the next launch)
                // launch {
                handler.handle(action)
                    .collect {
                        // do something with mutation
                        log("Mutation $it")
                        counter++
                    }
                // }
                // }
            }
            // }
        }
}
streetsofboston
07/23/2020, 12:28 PM
StateFlow or an EventFlow for that.
We rolled our own StateFlow and EventFlow, and they are `Flow`s that are backed by a ConflatedBroadcastChannel (for StateFlow) or a plain rendezvous Channel (for EventFlow).
pedro
07/23/2020, 12:41 PM
streetsofboston
07/23/2020, 12:47 PM
class EventFlow<T> private constructor(
    private val channel: Channel<T>
) : Flow<T> by channel.receiveAsFlow() {
    suspend fun sendValue(value: T) {
        channel.send(value)
    }

    companion object {
        operator fun <T> invoke(): EventFlow<T> = EventFlow(Channel())
    }
}
(Note: receiveAsFlow produces a Flow that can only have one collector/observer. You’d need to write a bit more code to allow for more than one collector.)
pedro
07/23/2020, 12:51 PM
streetsofboston
07/23/2020, 12:53 PM
send will suspend until a collector collects the sent value. This is done through a rendezvous Channel inside the EventFlow.
pedro
07/23/2020, 1:01 PM
val eventFlow = EventFlow<Action>()
viewScope.launch {
    viewActions
        .buffer()
        .onEach {
            // log("sending action $it")
            eventFlow.sendValue(it)
        }
        .launchIn(viewScope)
}
handlerScope.launch {
    eventFlow
        .collect { action ->
            log("Handling action $action")
            handlerScope.launch {
                handler.handle(action)
                    .collect {
                        // do something with mutation
                        log("Mutation $it")
                        counter++
                    }
            }
        }
}
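The snippet above relies on sendValue suspending until a collector takes the value. As a rough illustration of that rendezvous behaviour, here is a minimal sketch that uses a plain Channel directly rather than the EventFlow wrapper. The function name rendezvousOrder and the 100 ms delay are hypothetical choices made for this example only.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which the steps happen.
fun rendezvousOrder(): List<String> = runBlocking {
    val steps = mutableListOf<String>()
    val channel = Channel<Int>() // default capacity is RENDEZVOUS (no buffer)

    val sender = launch {
        steps += "send started"
        channel.send(1)          // suspends until a receiver takes the value
        steps += "send returned"
    }

    delay(100)                   // give the sender time to reach send()
    steps += "receiver ready"
    val value = channel.receive()
    steps += "received $value"

    sender.join()                // wait for the sender to finish
    steps
}

fun main() {
    println(rendezvousOrder())
}
```

In this sketch "send returned" can only be recorded after the receiver has shown up, which is the back-pressure the thread describes: a slow collector slows the sender down rather than dropping values.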
streetsofboston
07/23/2020, 1:06 PM
viewScope is the scope of the producer.
The handleScope is the scope of the consumer.
Cancelling the viewScope will just stop sending values. It won’t cancel the handleScope, but the call to eventFlow.collect will not do anything, since there is no-one sending any values…
handler.handle(action) returns… could that be a Flow that gets cancelled too soon…?
pedro
07/23/2020, 1:08 PM
Flow<Mutation>
“but the call to eventFlow.collect will not do anything, since there is no-one sending any values…”
I was (at least trying) to send the events from the original flow into this event flow, and the handler scope is collecting this.
streetsofboston
07/23/2020, 1:15 PM
What happens if you remove the second handlerScope.launch call (and just leave the log("Handling action $action") statement)?
I expect that log statement to execute each time someone is calling eventFlow.sendValue(…) during the entire lifetime of handlerScope.
If you cancel the viewScope, the eventFlow.sendValue won’t be called anymore. This means that the log statement won’t get called anymore either, even though the handlerScope is still active.
pedro
07/23/2020, 1:18 PM
val viewScope = CoroutineScope(newCoroutineContext(coroutineContext))
val handlerScope = CoroutineScope(newCoroutineContext(coroutineContext))
I assume that by cancelling one scope, the other shouldn’t be cancelled.
The funny thing is my original code appears to work in the real application but I can’t write unit tests to verify the behaviour.
“What happens if you remove the second handlerScope.launch”
The problem is still there. With my particular flows, I see even fewer events in the logs because the handler takes longer to process, so it misses more actions than by not using the inner launch.
streetsofboston
07/23/2020, 1:19 PM
Job, they share that Job
And cancelling one then cancels the other
pedro
07/23/2020, 1:21 PM
streetsofboston
07/23/2020, 1:25 PM
= CoroutineScope(myDispatcher) for each one of them.
= CoroutineScope(Job() + myDispatcher) or = CoroutineScope(SupervisorJob() + myDispatcher)
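To make the shared-Job point concrete, here is a minimal sketch comparing two scopes built from one context that already contains a Job with two scopes that each get their own SupervisorJob. The function names are hypothetical, and Dispatchers.Default is only a stand-in for myDispatcher, which is not defined in the thread.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive

// Both scopes are built from the same context, so they share its Job.
// Returns whether handlerScope is still active after viewScope is cancelled.
fun survivesWithSharedJob(): Boolean {
    val sharedContext = Job() + Dispatchers.Default // assumption: stand-in dispatcher
    val viewScope = CoroutineScope(sharedContext)
    val handlerScope = CoroutineScope(sharedContext)
    viewScope.cancel()            // cancels the shared Job
    return handlerScope.isActive
}

// Each scope gets its own Job, so cancelling one leaves the other alone.
fun survivesWithOwnJobs(): Boolean {
    val viewScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val handlerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    viewScope.cancel()
    val alive = handlerScope.isActive
    handlerScope.cancel()         // clean up the second scope
    return alive
}

fun main() {
    println("shared Job: handlerScope active = ${survivesWithSharedJob()}")
    println("own Jobs:   handlerScope active = ${survivesWithOwnJobs()}")
}
```

CoroutineScope(context) only adds a fresh Job when the context does not already contain one, which is why passing a bare dispatcher also gives each scope an independent Job.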
pedro
07/23/2020, 1:34 PM
CoroutineScope(TestCoroutineDispatcher()) seems to solve my problem of killing both scopes. Thanks for that, that was my actual problem!
The rest of the code is still not working but now I have more to explore!
Thanks