Radoslaw Juszczyk
06/23/2023, 7:18 AMsomeFlow() // emits [a, b, c, ... ]
.onStart {
CoroutineScope(coroutineContext).launch {
var c = 0
//do stuff
while(true) {
delay(500)
c++
Timber.d("c = $c")
}
}
}
I want to have a flow with a side effect, so something is done independently while the flow is collected but it is stopped when the flow is no longer collected
so I want to have logs like that:
val job = viewModelScope.launch {
someFlow().collectLatest { Timber.d("someFlow $it") }
}
//now logs should be:
c = 1
c = 2
c = 3
someFlow a
c = 4
c = 5
someFlow b
c = 6
someFlow c
c = 7
job.cancel()
//logs stop and I do not get c = 8, c = 9 anymore (internal job canceled)
Sam
06/23/2023, 7:44 AMCoroutineScope
isn’t a great solution. The flow
doesn’t have its own Job
, so your new scope becomes a child of whatever job collects the flow. That works in your example because you’re creating a new job specifically to collect that flow, but a flow can just as easily be collected inline as part of an existing coroutine. At that point it would become very hard to cancel or keep track of the side-effect job.Sam
06/23/2023, 7:45 AMcoroutineScope
builder function to make an inner scope that is tied to the lifecycle of the flow. That way it’s fully encapsulated and the coroutines it creates can never leak outside the flow. Here’s my approach: https://pl.kotl.in/e0yf0JDe_Sam
06/23/2023, 7:46 AMfun <T> Flow<T>.withSideEffect() = flow {
coroutineScope {
val sideEffect = launch {
var c = 0
//do stuff
while (true) {
delay(500)
c++
println("c = $c")
}
}
onCompletion {
sideEffect.cancel()
}.collect {
emit(it)
}
}
}
Radoslaw Juszczyk
06/23/2023, 8:09 AMSam
06/23/2023, 8:12 AMcoroutineScope
is encapsulated inside the flow
so any coroutines it launches will always terminate when the downstream flow collector stops collecting the flow. That includes if the collector coroutine is cancelled.
2. The onCompletion
block explicitly cancels the sideEffect
job any time the upstream flow terminates (normally or exceptionally)Radoslaw Juszczyk
06/23/2023, 8:41 AMFlow
was actually a StateFlow
so it never completes.
Btw: is there any difference between using corotineScope
vs CoroutineScope(coroutineContext).launch *{*
(for my usecase), except that coroutineScope
seems more elegant?Sam
06/23/2023, 9:10 AMcoroutineScope
is a suspending function that waits for all the coroutines in its scope to complete. CoroutineScope(coroutineContext)
creates a new scope but returns immediately. That means it breaks encapsulation and structured concurrency, because it lets you launch child coroutines that outlive the function (or flow) where they were created.
In general, you should always avoid calling CoroutineScope(coroutineContext)
as it introduces a risk of leaking coroutines outside of their intended lifecycle.
In your original example, coroutineScope
would actually not have worked as a replacement for CoroutineScope(coroutineContext)
inside the onStart
. It would delay the start of the flow indefinitely because it wouldn’t return until the side effect job was complete. But that’s because onStart
is not the intended lifecycle of the side effect. The side effect is intended to run for the duration of the flow’s collection, which is why it makes sense to launch it inside the collect
block.Radoslaw Juszczyk
06/23/2023, 9:58 AMCoroutineScope
uses parents corotuneContext
?Sam
06/23/2023, 10:00 AMcoroutineContext
, since the new scope uses that context directly. What I meant was that it can outlive the current function or flow, which in itself can be a problem. An important rule of structured concurrency is that we can guarantee that when calling a flow or suspend function, the function or flow only returns/terminates once all of the coroutines it launched have also terminated.Radoslaw Juszczyk
06/23/2023, 10:01 AMRadoslaw Juszczyk
06/23/2023, 10:03 AMcoroutineScope
cannot outlive the current function since it suspends itSam
06/23/2023, 10:03 AMsuspend fun bad() {
CoroutineScope(currentCoroutineContext()).launch {
delay(500)
println("Still running!")
}
}
It’s bad because it’s very confusing for anyone calling that function — the function appears to return immediately, but actually it’s still doing stuffRadoslaw Juszczyk
06/23/2023, 10:08 AMRadoslaw Juszczyk
06/23/2023, 10:12 AMcoroutineScope {
val sideEffect = launch {
var c = 0
//do stuff
while (true) {
delay(500)
c++
println("c = $c")
}
}
onCompletion {
sideEffect.cancel()
}.collect {
emit(it)
}
}
do we need the onCompletion { sideEffect.cancel() }
then?
if collection ends then coroutineScope exits and its child jobs cancel, no?Sam
06/23/2023, 10:14 AMonCompletion
block to handle the case where the upstream flow is terminated. You said your upstream flow is a stateflow that never terminates, so in your case it’s not necessary.Sam
06/23/2023, 10:15 AMRadoslaw Juszczyk
06/23/2023, 10:17 AMcoroutineScope
ends too, doesnt it cancel its child jobs anyway?Sam
06/23/2023, 10:22 AMcoroutineScope
reaches the end of its lambda block it will actually just wait for all of its child jobs to complete. It doesn’t cancel them.Radoslaw Juszczyk
06/23/2023, 10:23 AM