Is it possible to create a `StateFlow<T>` th...
# coroutines
d
Is it possible to create a
StateFlow<T>
that doesn't go on forever? I'm thinking of having something like a
MutableStateFlow
that allows the emission of a done (or even an error) that indicates the otherwise hot flow will no longer emit values.
z
MutableStateFlow will never complete itself, there’s no way to make it. You can use operators to terminate the flow downstream. You could also make your own impl of StateFlow but that’s risky since last I checked its docs say it’s not stable for external implementators.
☝️ 1
d
I was hoping I could do something like combine the flow with a completion signal to produce a flow that completes
Once again sorry if these are basic questions... I'd know how to do this in RX for .NET for example, but don't know how to do this with Kotlin flows
z
Yea that’s possible - but you won’t get a StateFlow just a plain Flow
☝️ 1
k
Copy code
sealed interface SomeState {
  data object Complete : SomeState
  data class Incomplete(val value: Int) : SomeState
}

val stateFlow = MutableStateFlow(Incomplete(0))

stateFlow.takeWhile { it != SomeState.Complete }.collect { ... }
thank you color 1
d
Thanks. That makes a lot of sense. Is there a safe way I can also expose the last state as a value in a wrapping class, that is thread/coroutine safe? (perhaps I'm being overly cautious).
k
What do you mean last state?
d
I mean since the
StateFlow
always has a value I'm thinking how I can safely return the last value, in your example its the last Int value
k
The final collected element after the
takeWhile
completes would be the latest incomplete state.
I’m not sure I’m understanding your question, though.
d
Copy code
// Core type aliases for clarity and flexibility
typealias ModelTransformation<M> = (M) -> M
typealias IntentHandler<M, I> = (I) -> ModelTransformation<M>
typealias ViewProjection<M, V> = (M) -> V

// Model interface representing the state container
interface Model<M> {
    val state: StateFlow<M>
    val value:M = ???
    
    fun update(transformation: ModelTransformation<M>)
    
    companion object {
        fun <M> create(initial: M, updates: Flow<ModelTransformation<M>>): Model<M> = 
            FlowBasedModel(initial, updates)
    }
}
So I'm doing something like above and I want to be able to internally modify
value
in a way that doesn't lead to races... the most straightforward way with a
StateFlow
is to use its value, but given the above strategy the last "completed" value has no value so it seems I'd need to make
value
internally mutable, but that seems like it would expose me to races possibly
z
Why do you have a separate value property? That’s just an unnecessary second source of truth
☝️ 1
k
If you’re updating the underlying value of a stateflow, you can just use
StateFlow.update { old -> new }
It also seems like this shouldn’t be an interface, but an implementation of an interface
d
I do have an underlying implementation for this interface.
The value is there since I always want to be able to get the true value. If I have:
Copy code
sealed interface ValueOrDone<T> {
  data class Value<T>(val value:T):ValueOrDone<T>
  data object Done:ValueOrDone<Nothing>
}
I want to be sure I can get the last
T
after a Done
k
Copy code
interface Model<M> {
  
  fun update(block: (old: M) -> M)

  fun markComplete()

  fun flow(): Flow<M>
}



class StateFlowModel<M>(initialValue: M) {

  private val sf = MutableStateFlow<SomeState<M>>(initialValue)

  override fun update(block: (old: M) -> M) {
    sf.update(block)
  }

  override fun markComplete() {
    sf.value = SomeState.Complete
  }

  override fun flow(): Flow<M> = sf.takeWhile { it != SomeState.Complete }

}
☝🏻 1
d
Ahh ok
Dang... I had 98% of this before
Cool cool. Thanks guys
Hmm if I try and take the first or last element of the flow it fails with NoSuchElement... wonder what I am missing
k
Can you be more specific?
d
Perhaps this helps: https://pl.kotl.in/6Xc3hIYCJ
Hmmm... this shows me trying some extra stuff to get around the error I got with a solution closer to what you had
The above shows the issue I am facing
More to the point:
Copy code
override val state: Flow<M> = _state
        .takeWhile { it !is CompletableState.Complete }
        .map { state -> (state as CompletableState.Incomplete<M>).value }
ends up returning null if I do:
Copy code
model.state.lastOrNull()
Which shouldn't be the case since I'm giving it a non-null initial value
k
It’s because you’re marking the model as complete before the launch get’s a chance to run. Either use
CoroutineStart.UNDISPATCHED
or toss a
yield
in before the call markComplete
d
If I do this it hangs:
Copy code
fun main() {
    runBlocking {
        val model = Model.create("initial state")
        launch {
    		val currentState = model.state.lastOrNull()
            println("currentState: $currentState")
            check(currentState != null)
    	
        	model.markComplete()        
        }
        
    }
}
k
It's because
lastOrNull
will never complete until you can concurrently mark the model as done
Copy code
fun main() {
    runBlocking {
        val model = Model.create("initial state")
        launch {
    		val currentState = model.state.lastOrNull()
            println("currentState: $currentState")
            check(currentState != null)

        }
        yield()
        model.markComplete()        
    }
}
Works fine
I also bet that this would work fine
Copy code
fun main() {
    runBlocking {
        val model = Model.create("initial state")
        launch(UNDISPATCHED) {
    		val currentState = model.state.lastOrNull()
            println("currentState: $currentState")
            check(currentState != null)
        }
        model.markComplete()      
    }
}
d
okay I see . What does UNDISPATCHED do... I can look it up but maybe you have a quick summary... is that a special dispatcher?
Thanks again
k
It will synchronously run the child coroutine until its first suspension point without going through a
CoroutineDispatcher.dispatch
cycle, which in contrast queues a task to run and will not run its contents synchronously
d
Thanks again