I have a `MutableStateFlow` and a `SharedFlow` wit...
# coroutines
r
I have a
MutableStateFlow
and a
SharedFlow
with this pattern:
Copy code
private val state = MutableStateFlow<SomeState>(Initial)
  val stateChanges: SharedFlow<SomeState> = state.shareIn(scope, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
however, when subscribing to
stateChanges
, it is not deterministic on whether the
Initial
state is present or not, because the
SharedFlow
is starting in the background and we have a race. This race doesn't matter to my production code, but my tests fail because of the indeterminism. Is there a way to make this deterministic for tests? I am using the coroutines-test library.
s
State transitions should not have memory, ie. how you wound up in a follow-up state should not matter to the UI showing that state.
r
This isn't UI code
Its backend code that should follow a specific set of state transitions
s
Even for non-UI stuff... sorry. An next-state should not worry about what the previous-state was. 🙂 But if you want to see if the initial-state is shown at some point in a test, write a test that just does that, by not emitting an event that moves it to the folow up state
In other words, write a test that just tests one state, not two states.
d
I'm curious why you would need an Eagerly started shared flow, when you already have a state flow.
☝️ 1
r
Ignore that, I was playing around trying to solve the race
The code transitions through a bunch of states internally based on a single function call. I'm just trying to validate that it does the right things.
d
According to the docs for Flow.shareIn:
The
shareIn
operator is useful in situations when there is a cold flow that is expensive to create and/or to maintain, but there are multiple subscribers that need to collect its values
MutableStateFlow is not a cold flow.
Without understanding your use-case better, it sounds like you're potentially using the wrong tools for the job here.
r
Its possible. The use case is a class that handles a network protocol. As messages are sent and received, the state changes. The state transitions are somewhat complex, and a MutableStateFlow works well in terms of the API I need and concurrent updates.
d
Are there multiple listeners?
r
There will likely be, yes
d
Does every listener need to receive every state transition?
r
Nope
But the state transitions that the system should go through are deterministic, given the same set of responses from the remote end
d
In that case, it might make more sense just to have a
MutableList<suspend (State)->Unit>
for the listeners, and manually call them all on transition.
r
Yeah that was actually the API I started with, and it worked, but using the SharedFlow was so much nicer to read.
d
Maybe, but it doesn't work the same way, so you're not going to get the behavior you want.
Also, as far as I know MutableStateFlow and SharedStateFlow aren't necessarily going to be deterministic,. MutableStateFlow is allowed to swallow multiple transitions and emit only the latest value,. SharedStateFlow doesn't guarantee the ordering the collectors are called.
To put it another way, dispatching with the list, you know that all the currently registered listeners will receive the update before the dispatch completes. You do NOT have that guarantee with flows
r
Yeah, like I said, for the production code, all of that is totally fine -- its just the unit test that borks. Maybe I'm just testing the wrong thing, and shouldn't care about the state transitions in the test.
d
If the test is broke, it's possible that your production code will be broke in non-obvious ways.
r
True
d
BTW, which protocol are you implementing? I had fun working on a Telnet implementation a while ago.
r
Extensible Provisioning Protocol for domain registration and management
d
r
Yep
Its not super-complex, but its the first time I'm doing this with ktor-network
I've always used Netty or Mina in the past
d
It looks more like XML processing than network protocol. Don't know much about it though.
r
Look at the state diagram here: https://datatracker.ietf.org/doc/html/rfc5730#section-2. So there are all those protocol states to manage, along with the state of actually connecting, disconnecting, half-closed sockets, etc., and actually sending messages through the protocol handler while the protocol could be in any state (the diagram shows the server side, I'm implementing the client side).
The companion RFC for sending EPP over TCP is this one: https://datatracker.ietf.org/doc/html/rfc5734
👍 1
d
I see. I didn't look to closely (tbh, I'm fight off a migraine, so my thinking/researching capacity is really limited right now)
r
No worries!
d
In any case, if I were working on your project, I'd personally want to encapsulate the dispatch/receive logic with its own DSL, and use the List<Receiver> as an implementation.
That's just my personal style though.
r
That sounds intriguing. Do you mean encapsulate all the protocol stuff into a separate piece of logic with a DSL that can be tested without any actual networking going on? Or what is
Receiver
there?
d
Receiver was my shorthand for your state-change listeners.
typealias Receiver = suspend (State)->Unit
You like the State flow API, so I'm basically proposing that you create your own that has the same niceties that you like, but is implemented in this deterministic way.
r
The states are pretty encapsulated in the protocol component. I don't really need anyone outside it to see them. The way I've currently got it is that the protocol component represents a TCP connection along with its EPP+TCP state, and it pulls messages from a Channel when its ready to send them.
But with networking stuff, lots of things can happen at any time that changes the states, so it has to be thread-safe, and I have to be able to take various actions based on states changes (like timeouts or closing the connection or opening the connection).
d
One benefit of suspend functions is that you can replace state-machines with normal imperative logic.
The compiler converts suspend functions into state-machines behind the scenes.
It's a different way to approach it.
r
Yeah, I was thinking about that -- the issue I ran into was that I have two coroutines operating in the same connection -- one reader and one writer (plus the coroutine that starts connections, and shuts them down), so I've got this shared mutable state to deal with.
Unless I create a third coroutine that does only imperative state management, and talks to the other two via channels
d
Definitely want to break down responsibilities.
Maybe you want a Command design pattern?
Well, I'm at my limit of what advice I can give (and I realize it was mostly unsolicited anyway). Hopefully its been helpful to you, or at least thought provoking 😉
d
Yeah, something like that.
r
Yeah definitely something worth thinking about
Thank you, definitely useful to bounce ideas around!
d
For my Telnet implementation, I did have separate coroutines for read and for write, and then a shared state in the OptionsManager. It definitely took some careful consideration on how to define the interfaces between them.
👍 1
r
Good to know
Hope you feel better soon 😷
gratitude thank you 1