Naveen Perumal
07/05/2024, 9:56 AMcoroutineScope {
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}
val stateFlowScope = CoroutineScope(Job() + Dispatchers.Default)
val stateFlow = flow
.onCompletion { println("State flow completed") }
.stateIn(stateFlowScope)
val flowCollectorJob = launch {
stateFlow
.onCompletion { "Collector completed" }
.collect { value ->
println("StateFlow value: $value")
}
}
delay(2000)
println("Cancelling scope...")
stateFlowScope.cancel()
flowCollectorJob.join()
println("-------COMPLETED-------")
}
It produces the following output
StateFlow value: 1
StateFlow value: 2
Cancelling scope...
State flow completed
The collector is not completed. Why is that and is there any way to stop the collector when the flow completes?ross_a
07/05/2024, 10:14 AMross_a
07/08/2024, 8:42 AMval flow = flowOf<String>()
sealed interface MaterializedEvent<out T> {
@JvmInline
value class Update<T>(val t: T) : MaterializedEvent<T>
@JvmInline
value class Completed(val throwable: Throwable?) : MaterializedEvent<Nothing>
}
fun <T> Flow<T>.stateInMaterialized(scope: CoroutineScope): StateFlow<MaterializedEvent<T>> {
val state = MutableStateFlow<MaterializedEvent<T>?>(null)
this.onEach { state.value = Update(it) }
.onCompletion { state.value = Completed(it) }
.launchIn(scope)
// or only complete if the scope does
// scope.coroutineContext[Job]?.invokeOnCompletion { state.value = Completed(it) }
return state.drop(1) as StateFlow<MaterializedEvent<T>>
}
fun <T> Flow<MaterializedEvent<T>>.dematerialize(): Flow<T> =
transformWhile { valueOrCompletion ->
when (valueOrCompletion) {
is Update -> {
emit(valueOrCompletion.t)
true
}
is Completed -> {
valueOrCompletion.throwable?.let { throw it }
false
}
}
}
Naveen Perumal
07/08/2024, 9:49 AMstate.drop(1).stateIn(scope)
should be used instead of state.drop(1)
.
And I'm curious to know if there is any specific reason as to why the collectors aren't cancelled automatically when the scope of the flow itself is cancelled.ross_a
07/08/2024, 10:07 AMross_a
07/08/2024, 10:08 AMNaveen Perumal
07/08/2024, 10:11 AMross_a
07/08/2024, 10:12 AMfun <T> Flow<T>.stateInMaterialized(scope: CoroutineScope, initialValue: T): StateFlow<MaterializedEvent<T>> {
val state = MutableStateFlow<MaterializedEvent<T>>(Update(initialValue))
this.onEach { state.value = Update(it) }
.onCompletion { state.value = Completed(it) }
.launchIn(scope)
// or only complete if the scope does
// scope.coroutineContext[Job]?.invokeOnCompletion { state.value = Completed(it) }
return state
}
Probably makes more sense to do this then, and create a suspending version if you don't want the initial valueNaveen Perumal
07/08/2024, 10:20 AMstate.drop(1).stateIn(scope)
should be used or else the collector won't complete since state
isn't scoped to the provided scope
.ross_a
07/08/2024, 10:21 AMross_a
07/08/2024, 10:23 AMNaveen Perumal
07/08/2024, 10:28 AMval stateFlow = flow
.onCompletion { println("State flow completed") }
.stateInMaterialized(stateFlowScope)
The collectors of stateFlow seem to be collecting forever whenever scope
is cancelled if state
is returned directly. Whereas the collectors seem to be completed on cancellation of scope
if state.drop(1).stateIn(scope)
is returned.ross_a
07/08/2024, 10:30 AMross_a
07/08/2024, 10:31 AMval stateFlow = flow
.onCompletion { println("State flow completed") }
.stateInMaterialized(stateFlowScope)
val collector = stateFlow.dematerialize().collect {}
Naveen Perumal
07/08/2024, 10:32 AM