Hi all, Let's say I have a state flow created usin...
# coroutines
n
Hi all, Let's say I have a state flow created using stateIn(scope). When scope is cancelled, the flow will be completed and no more emissions can happen. In that case why are the collectors not cancelled? Consider the following snippet,
Copy code
coroutineScope {
        val flow = flow {
            emit(1)
            delay(1000)
            emit(2)
            delay(1000)
            emit(3)
            delay(1000)
            emit(4)
        }

        val stateFlowScope = CoroutineScope(Job() + Dispatchers.Default)
        val stateFlow = flow
            .onCompletion { println("State flow completed") }
            .stateIn(stateFlowScope)

        val flowCollectorJob = launch {
            stateFlow
                .onCompletion { "Collector completed" }
                .collect { value ->
                    println("StateFlow value: $value")
                }
        }

        delay(2000)
        println("Cancelling scope...")
        stateFlowScope.cancel()
        flowCollectorJob.join()
        println("-------COMPLETED-------")
    }
It produces the following output
Copy code
StateFlow value: 1
StateFlow value: 2
Cancelling scope...
State flow completed
The collector is not completed. Why is that and is there any way to stop the collector when the flow completes?
r
There isn't unfortunately StateFlow is a specialisation of SharedFlow and per the docs "Shared flow never completes", it's tricky because I too wish it did.
👍 1
Thought a bit more and this might help:
Copy code
val flow = flowOf<String>()

sealed interface MaterializedEvent<out T> {
    @JvmInline
    value class Update<T>(val t: T) : MaterializedEvent<T>

    @JvmInline
    value class Completed(val throwable: Throwable?) : MaterializedEvent<Nothing>
}

fun <T> Flow<T>.stateInMaterialized(scope: CoroutineScope): StateFlow<MaterializedEvent<T>> {
    val state = MutableStateFlow<MaterializedEvent<T>?>(null)

    this.onEach { state.value = Update(it) }
        .onCompletion { state.value = Completed(it) }
        .launchIn(scope)

    // or only complete if the scope does
//    scope.coroutineContext[Job]?.invokeOnCompletion { state.value = Completed(it) }

    return state.drop(1) as StateFlow<MaterializedEvent<T>>
}

fun <T> Flow<MaterializedEvent<T>>.dematerialize(): Flow<T> =
    transformWhile { valueOrCompletion ->
        when (valueOrCompletion) {
            is Update -> {
                emit(valueOrCompletion.t)
                true
            }
            is Completed -> {
                valueOrCompletion.throwable?.let { throw it }
                false
            }
        }
    }
n
Thanks for the reply. This solution works as expected. Just a small correction.
state.drop(1).stateIn(scope)
should be used instead of
state.drop(1)
. And I'm curious to know if there is any specific reason as to why the collectors aren't cancelled automatically when the scope of the flow itself is cancelled.
r
No problem, I haven't actually tested it yet to find out the unexpected so I don't know I'm afraid
ah yes it's not a stateflow after the drop 😕
n
ok. thanks
r
Copy code
fun <T> Flow<T>.stateInMaterialized(scope: CoroutineScope, initialValue: T): StateFlow<MaterializedEvent<T>> {
    val state = MutableStateFlow<MaterializedEvent<T>>(Update(initialValue))

    this.onEach { state.value = Update(it) }
        .onCompletion { state.value = Completed(it) }
        .launchIn(scope)

    // or only complete if the scope does
//    scope.coroutineContext[Job]?.invokeOnCompletion { state.value = Completed(it) }

    return state
}
Probably makes more sense to do this then, and create a suspending version if you don't want the initial value
n
state.drop(1).stateIn(scope)
should be used or else the collector won't complete since
state
isn't scoped to the provided
scope
.
r
which collector sorry? the one emitting to the MutableStateFlow is via the launchIn
the MutableStateFlow doesn't need to be scoped, its collectors are scoped to whatever they are launched in
n
Copy code
val stateFlow = flow
            .onCompletion { println("State flow completed") }
            .stateInMaterialized(stateFlowScope)
The collectors of stateFlow seem to be collecting forever whenever
scope
is cancelled if
state
is returned directly. Whereas the collectors seem to be completed on cancellation of
scope
if
state.drop(1).stateIn(scope)
is returned.
r
you need to dematerialize it for the cancellation to be unwrapped and actually occur
Copy code
val stateFlow = flow
            .onCompletion { println("State flow completed") }
            .stateInMaterialized(stateFlowScope)

val collector = stateFlow.dematerialize().collect {}
n
sorry. my bad. Didn't check correctly. It works as expected