#flow

parth

10/13/2021, 2:44 PM
hey all, i have a pretty common construction where i’m in an Android ViewModel and need to observe a Flow, do some stuff, and then push the result into a MutableStateFlow. Looks something like this:
fun whatever() { 
	viewModelScope.launch { 
		inputFlow
			.map {...}
			.collect { value -> 
				mutableStateFlow.value = value
			}
	}	
}
(Assume that I need to push other values into the MSF outside this collect block.) One issue I’ve run into with the above construction is that when the observer of the MSF cancels collection (e.g. the Fragment observing the ViewModel falls off the screen), it doesn’t cancel this intermediate collection. This makes absolute sense: that cancellation signal is not being propagated upstream! So digging into the stateIn sources, I’ve come up with what I think is a way to push the cancellation “upstream”. Can y’all give thoughts/comments/suggestions on the following approach?
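import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

@OptIn(ExperimentalCoroutinesApi::class) // mapLatest is an experimental API
fun <T> Flow<T>.stateInto(downstream: MutableStateFlow<T>, scope: CoroutineScope) {
    val upstream = this
    scope.launch {
        downstream.subscriptionCount
            .mapLatest { count -> // this is a simplified version of [StartedWhileSubscribed#command]
                if (count > 0) true
                else {
                    delay(800.milliseconds) // grace period before stopping; cancelled if a subscriber returns
                    false
                }
            }
            .dropWhile { active -> !active } // stay idle until the first subscriber shows up
            .distinctUntilChanged()
            .collectLatest { active ->
                when (active) {
                    true -> upstream.collect(downstream) // will be cancelled upon new emission
                    false -> Unit /* just cancel and do nothing */
                }
            }
    }
}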

darkmoon_uk

10/15/2021, 1:36 AM
Personally, in these situations I would always try to think in the reactive paradigm first. That's a pretty abstract statement so let's break it down: By collecting one flow and mutating another, you're kind of creating this imperative 'break' in what could otherwise be one continuous Flow.
The Flow APIs we have are really nice and un-opinionated, but they're also inspired by Rx, which strongly advocates for a functional (no side effects) approach to things.
So bringing it back down to your example, I'd reconsider what your mutableStateFlow is.
It's some value that changes based on a number of inputs.
What are those inputs? Think in those terms and enumerate them: where do you call .value = ?
At least one of them is your inputFlow, and you want to do some map to that.
Let's aim to make your mutableStateFlow no longer mutable, but instead an immutable outputFlow. Then you start with:
val outputFlow: Flow<SomeType> = inputFlow.map { ... } // What about the other inputs?
So maybe other things were mutating your flow too. The best way to bring these in depends on their nature; you probably want to either merge or combine these together.
Without feedback from you I'll stop there; possibly none of the above is new, but I find it very common for devs new to Flows and Rx to use them in this pseudo imperative style; when it can all be much more elegant (and less error prone!) when you think reactively/functionally.
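To make the merge-versus-combine choice above concrete, here is a minimal sketch. The names latestOfEither, screenModels, and ScreenModel are hypothetical and not from the thread; only merge and combine are real kotlinx.coroutines operators.

```kotlin
import kotlinx.coroutines.flow.*

// merge: use when every source emits complete values of the same type and
// the output should simply forward whichever source emitted most recently.
fun <T> latestOfEither(first: Flow<T>, second: Flow<T>): Flow<T> =
    merge(first, second)

// Hypothetical output type, for illustration only.
data class ScreenModel(val query: String, val count: Int)

// combine: use when the output is computed from the latest value of every
// source. Nothing is emitted until each source has emitted at least once.
fun screenModels(queries: Flow<String>, counts: Flow<Int>): Flow<ScreenModel> =
    combine(queries, counts) { query, count -> ScreenModel(query, count) }
```

Roughly: merge fits inputs that each replace the whole state, while combine fits inputs that each contribute one part of it.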

parth

10/15/2021, 1:58 AM
yeah I can’t combine/merge them in because they’re not Flows, they’re single methods that are driven by user interaction that push new state into the MutableStateFlow. (Also FWIW I’ve been doing Rx since 2015, so I’m familiar with the notion of “no side effects! pure functions only!” but that’s not always realistic in the real world 😕 ) oh yeah btw if you find the proposed code a bit odd, it’s actually distilled (and structurally the same) from Flow.stateIn(...)

darkmoon_uk

10/15/2021, 2:21 AM
Ah yeah fair enough 👍 I just found this is too often the answer to peoples probs in the past ☝️

Nick Allen

10/19/2021, 4:04 AM
Sometimes I'll use a MutableState/SharedFlow to represent the method calls, and then I can go full reactive. Seems like something akin to this could work for you:
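// Holds the values pushed by the non-Flow methods.
// `initialValue` is a placeholder: MutableStateFlow requires a starting value.
private val nonInputStateFlow = MutableStateFlow<ValueType>(initialValue)
val outputFlow = merge(inputFlow.map {...}, nonInputStateFlow)
fun otherMethod1(value: ValueType) {
    nonInputStateFlow.tryEmit(value)
}
fun otherMethod2(somethingElse: SomethingElse) {
    nonInputStateFlow.tryEmit(calculateValue(somethingElse))
}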
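One way to tie this back to the original cancellation question is to feed the merged flow into stateIn with SharingStarted.WhileSubscribed, so the upstream collection stops once the last observer goes away. The following is only a sketch under assumptions: ScreenState, userValues, onUserValue, the Int element type, the doubling map, and the initial value 0 are all made up for illustration, and the 800 ms timeout just mirrors the delay in the stateInto code earlier in the thread.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Hypothetical holder; in the thread this would be an Android ViewModel
// passing viewModelScope as `scope`.
class ScreenState(scope: CoroutineScope, inputFlow: Flow<Int>) {
    // Represents the user-driven method calls as a flow. One buffered slot
    // with DROP_OLDEST lets tryEmit succeed from a non-suspending method.
    private val userValues = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )

    // Upstream collection starts with the first subscriber and stops 800 ms
    // after the last one leaves, so inputFlow is not collected needlessly.
    val output: StateFlow<Int> =
        merge(inputFlow.map { it * 2 }, userValues)
            .stateIn(scope, SharingStarted.WhileSubscribed(stopTimeoutMillis = 800), initialValue = 0)

    fun onUserValue(value: Int) {
        userValues.tryEmit(value)
    }
}
```

A trade-off to be aware of: because userValues has no replay, a value pushed while nothing is collecting output is dropped. If those values must survive with no observer attached, this sketch would need a different design.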