https://kotlinlang.org logo
#coroutines
Title
# coroutines
a

allan.conda

11/30/2020, 5:46 PM
What’s the safest way to apply a reduce function to a MutableStateFlow?
I encountered this issue:
Copy code
data class SomeState() {
    private val someProperty: Boolean = false,
    private val anotherProperty: Boolean = false
}

class MyViewModel(
    private val someSuspendedUseCase: SomeSuspendedUseCase
) {

    private val _state = MutableStateFlow(SomeState())

    init {
        viewModelScope.launch {
            _state.value = _state.value.copy(
                someProperty = someSuspendedUseCase.fetchSomething() // takes 2 seconds and returns true
            )
        }
    }

    // called sometime while coroutine above is running
    fun someMethod() {
        viewModelScope.launch {
            _state.value = _state.value.copy(
               anotherProperty = true
            )
        }
    }

}
Then the resulting state:
Copy code
// Actual
State(someProperty = true, anotherProperty = false)
// What I want
State(someProperty = true, anotherProperty = true)
I want to make sure the state being reduced is the latest state. In this case there is a race condition. I could make sure to await the use case result before copying and updating the state, but I’m still not completely sure this is safe if the state is being updated from different Coroutines.
Example is a bit specific to Android, but I think this is applicable in general for those who are trying to utilize StateFlows to implement redux-like Reducers. I found the approach of possibly using combine and then making sure each property is also a MutableStateFlow, but it results to some boilerplate, plus I’m still not confident that it is safe anyway.
w

wasyl

11/30/2020, 5:50 PM
You can either introduce a
Mutex
and use
withLock { }
to disallow concurrent access , or use
actor
api to send desired updates (e.g. in form of lambdas) which the actor will execute sequentially
Btw instead of
Copy code
_state.value = _state.value.copy(
  someProperty = someSuspendedUseCase.fetchSomething()
)
you can write
Copy code
val newValue = someSuspendedUseCase.fetchSomething()
_state.value = _state.value.copy(someProperty = newValue)
where it’s a bit easier to then isolate the non-atomic part, like
Copy code
val newValue = ...
stateMutex.withLock { 
  _state.value = _state.value.copy(...)
}
a

allan.conda

11/30/2020, 6:00 PM
Nice, that should work I think. About newValue though, it’s possible that it can also be a sub-state that I want to reduce and not just a simple primitive, so the problem will still exist at that level. 🤔 I think we need to ensure that the whole state does not get updated while I’m trying to reduce the state. Would putting newValue inside the mutex lock work? Sorry If this is is not making any sense, been a long time since I had to worry about synchronization 😅
Or perhaps your
actor
suggestion would work better here
in such that only one reduce operation happens at a time
although that would limit options if I want to optimize, such as triggering an async operation as soon as possible instead of being blocked by another action
w

wasyl

11/30/2020, 6:02 PM
Would putting newValue inside the mutex lock work?
yes
Or perhaps your 
actor
  suggestion would work better here
Yes, I think it’s a bit better solution. Requires a bit more boilerplate and I assume involves some overhead (there are some channels involved) but makes it clearer that you only intend to modify the state synchronously
a

allan.conda

11/30/2020, 6:03 PM
Or perhaps I should just put all a
copy
call inside the mutex lock and everything else is outside
w

wasyl

11/30/2020, 6:03 PM
triggering an async operation as soon as possible instead of being blocked by another action
You can do that, enqueue a started
Async
and in the actor, when it’s that action’s turn,
await()
the result
a

allan.conda

11/30/2020, 6:04 PM
I see. Looks like that could work as well.
w

wasyl

11/30/2020, 6:04 PM
Sorry, I think I meant a started
Deferred
, I don’t recall the name. The one started with
async { }
block
a

allan.conda

11/30/2020, 6:04 PM
yes async would return Deferred.
I think mutexLock could work in my case, let me have a go at it. I didn’t expect to get a satisfying answer so soon, thanks for your help!
👍 1
w

wasyl

11/30/2020, 6:06 PM
Or perhaps I should just put all a 
copy
 call inside the mutex lock and everything else is outside
I do that sometimes, it’s just easier. But it’s also easier to forget to wrap state change in a mutex.
🙏 1
a

allan.conda

11/30/2020, 6:15 PM
Weirdly, I have not considered this possibility in a big project and it’s full of reducer code like this, and an issue like this has never been apparent. Then I just started on a fresh project and this issue has been occurring way too often. I’ll have to review all related code on the big project 🤦 Do you wrap state change in a lock always? It can be difficult to detect issues like this otherwise, I think.
w

wasyl

11/30/2020, 6:24 PM
I think I do, but I don’t have any automated way of verifying that 😕 I try hard not to have a state, and whenever I do, to modify it only from the same class (and expose Flows). Then it’s a bit easier to notice lack of synchronisation, but things can slip through regardless
a

allan.conda

11/30/2020, 6:58 PM
I think I do, but I don’t have any automated way of verifying that
I see. Just a wild idea, have you considered making an extension function, like
Copy code
suspend fun <T> MutableStateFlow<T>.reduce(mutex: Mutex? = null, reducer: (T) -> T) {
        value = mutex?.withLock {
            reducer(value)
        }?: reducer(value)
    }

// usage
    _state.reduce(mutex) {
        it.copy(...)
     }
I try hard not to have a state, and whenever I do, to modify it only from the same class (and expose Flows).
I only modify it from a single class (ViewModel) and expose a public StateFlow, although it’s pretty much the bread-and-butter for our MVVM so we use State for all Databinding cases
w

wasyl

11/30/2020, 7:04 PM
I only modify it from a single class (ViewModel)
Our view models only pass data from use cases, and as such only have a state that’s never mutated based on its previous value. That is, we only ever do
liveData.value = useCaseResult
, or rather
useCase.collect { livedata.value = it }
. So we don’t even have this complexity, because collection only happens on the main thread. In the use cases, most of the synchronisation is also automatic because we mostly combine flows. Only when something happens that modifies domain internal state, we need this synchronisation (and then often libraries do this for us). So in the end we mostly have couple of in-memory storages which should keep and update the state like this. Initially it wasn’t clear how many different patterns we’ll see, but now I’m just considering creating a single base class for that. Your reducer makes sense though 🙂
a

allan.conda

11/30/2020, 7:26 PM
Thank you. Your approach is really interesting 🙂
w

wasyl

11/30/2020, 9:58 PM
Thanks 🙂
4 Views