eygraber
02/22/2021, 10:33 PMMutableStateFlow
to replace @Volatile var foo: Foo = defaultFoo()
? My main reasoning is that I don't like var
and it would be easier for multiplatform etc...joakim
03/07/2021, 8:55 PMppvi
03/09/2021, 11:54 AMSharedFlow
(but it replays everything to a second observer), Channel(CONFLATED).*receiveAsFlow*
but only one observer gets updates (and we discarded StateFlow
, consumeAsFlow()
, and .broadcast()
/cc @elizarov @Manuel VivoJason Ankers
03/16/2021, 3:49 AMprivate val workouts = flow { emit(userRepository.getWorkouts()) }
private val exercises = flow { emit(userRepository.getExercises()) }
val workoutsByExerciseId: Flow<Map<String, Workout>> = ? // do some transformation which combines both sources
I want to create workoutsByExerciseId
which should depend on both workouts
and exercises
. If either source changes, workoutsByExerciseId
should always re-emit with the latest data. Which flow operator do I need here?Jeff Lockhart
03/16/2021, 5:31 AMLiveData
to StateFlow
. What would be the equivalent flow construct for the CoroutineLiveData
builder function?
val state = liveData {
emit(loadState())
}
suspend fun loadState(): State = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
...
}
The closest I can determine is:
val state = flow {
emit(loadState())
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), null)
Except that stateIn()
requires an initial value, which doesn’t make this quite the same, as I don’t want any value collected until the state is loaded.
Another possibility might be to use the shareIn()
operator to create a SharedFlow
with replay set to 1. This might be a closer equivalent in some ways:
val state = flow {
emit(loadState())
}.shareIn(viewModelScope, SharingStarted.WhileSubscribed(5000), replay = 1)
Except SharedFlow
doesn’t provide a value
property to get the current state. Which makes the StateFlow
construct with a nullable type a closer equivalent in this regard.Marc Knaup
03/18/2021, 3:05 PMStateFlow
in a Kotlin/React project and noticed an interesting nuance that may make it unsuitable for providing state in React-like projects - and maybe others. Am I missing something or do I have an error in my thinking here?
In my case the problem crops up in the sign-in process.
To simplify, there are just two pages:
- The MembershipPage
shows information about the signed-in user. If no user is signed in, redirect to SignInPage
.
- The SignInPage
allows the user to sign in. If a user is already signed in, redirect to MembershipPage
.
A single UserController
component wraps these (and other) user-related pages, keeps track of the signed-in user and provides that value to pages, and performs the redirects if needed.
Various React components keep track of the signed-in user by accessing StateFlow.value
when first rendering the component, and collect StateFlow
emissions of new values to account for changes.
Now when the user signs in the following happens:
1. Send credentials to server.
2. Server responds with OK and user object.
3. StateFlow
value is set to signed-in user (flow.value = user
).
4. SignInPage
redirects to MembershipPage
. (user has signed in successfully)
5. UserController
redirects to SignInPage
. (not signed in)
6. SignInPage
redirects to MembershipPage
. (already signed in)
7. UserController
redirects to SignInPage
. (not signed in)
8. <repeats a few times>
9. SignInPage
redirects to MembershipPage
. (already signed in)
10. MembershipPage
is presented.
There’s a redirect cycle that gets broken automatically after a short time.
What’s happening here?
After a successful sign-in the `StateFlow`’s value
is set to the signed-in user. From that moment on flow.value
returns the signed-in user that was just set. The `StateFlow`’s emission of that new value however is not immediate and takes a while.
During steps 4 through 8 above there is an inconsistent state in the React components. Those that happen to use StateFlow.value
acknowledge that a user is signed in. Those that wait for the `StateFlow`’s emission of a new value still consider no user to be signed in. Depending on whether a component is first rendered (accessing .value
) or rerendered (value depends on .onEach/.collect
) the behavior is different.
So by using StateFlow
the system inadverently depends on two different sources of truth for the signed-in state. That’s because the return value of .value
and StateFlow
emissions of new value are not synchronized.
I’m not sure if that’s intentional behavior of StateFlow
. State should have a single source of truth and StateFlow
itself is inconsistent here.
One potential solution would be to change how StateFlow
works. For example by having .value
to not return a new value until the emission of the new value has begun. There’s still a potential for race between different contexts but at least in synchronous environments like JS that should be irrelevant.
There are two possible approaches to use StateFlow
as-is as a single source of truth:
(1) Rely on .value
only.
(2) Rely on value emissions only.
(1) doesn’t work because components need to know when the value changes in order to re-render. Otherwise the UI would get out of sync with the state.
(2) doesn’t work because components need to access the current value synchronously at the time of initial render. Otherwise the UI would always have to briefly display a loading state while waiting for the initial emission even though that value is already available synchronously.
As a compromise I could always rely on .value
for accessing the current value and use emissions only for triggering a re-render. An inconsistency still remains however. Components aren’t notified immediately when the state they depend on changes. They either render a state that’s potentially newer than the state they were last notified about, or they render a state change too late, and they may render more often than needed.
If the SignInPage
executes some logic as a result of a successful sign-in it does so under the assumption that the user is signed in now. Other components however still assume that no user is signed in. No re-render is pending for them because there’s no notification about a change in state yet. Hence the UI is temporarily inconsistent.
I don’t think that this subtle inconsistency between .value
and emissions only affects Kotlin/React projects. It may also affects other projects which rely on a mix of accessing .value
and emitted values.
If it’s intentional then it should be communicated prominently in the documentation. However for me it would defeat the purpose of a StateFlow
for managing state because it’s not a single source of truth.
I could also write a wrapper that may use a StateFlow
under the hood but synchronizes the current value with the emissions. But that’s just reinventing what I assume StateFlow
is supposed to provide in the first place.
What are your thoughts on that?Francesco Cervone
03/19/2021, 3:12 PMMutableStateFlow
and I need to know when collectors subscribe/unsubscribe to/from this flow.
I see the fun SharedFlow<T>.onSubscription
extension which invokes the callback when the flow starts to be collected but I can’t find any extension function to know when a collector cancels the subscription.
I’m basically looking for the equivalent of RxJava doOnSubscribe and doOnDispose.
Does anyone know if there is a way to know when a collector cancels its subscription?Simon Lin
03/23/2021, 1:57 AMJason Ankers
03/24/2021, 5:34 AMval uiState = combine(sharedFlow1, sharedFlow2, sharedFlow2) { … }
and I would like to catch anything which goes wrong in any of the flows. How could I achieve that? .catch
doesnt work because shared flows never completeppvi
03/24/2021, 10:33 AMrunBlockingTest
? firstOrNull()
shows IllegalStateException: This job has not completed yet
Bernardo Macêdo
03/26/2021, 4:10 PMLiveData
that represents a UI state into a StateFlow
.
My initial live data code looked like this:
private val _state = MediatorLiveData<State>().apply {
addSource(livedata2) { value2 -> value?.let { it.copy(someVariable = value2) } }
value = State(someVariable = "initial value", someOtherVariable = "another initial value")
}
val state: LiveData<State> = _state
The idea is that I have an initial value and also part of the state gets updated whenever a new value is triggered on livedata2
To reach the same behavior using flows, I modified livedata2
to be a SharedFlow (I’ll call it flow2
) and my _state
became a StateFlow.
However I have to collect
the flow2 instead of just applying an operator.
private val _state = MutableStateFlow(
State(
someVariable = "initial value",
someOtherVariable = "another intial value"
)
).apply {
viewModelScope.launch {
flow2.collect { value2 -> emit(value.copy(someVariable = value2)) }
}
}
val state: StateFlow<State> = _state
While it does work, I feel like this is probably not the way to go? Maybe there’s an operator for that. (I tried to use _state.combine(flow2)
but my state was not being updated for new events on flow2
)
I there a better way to achieve the same result?Sanendak
04/07/2021, 5:31 PMTiago Nunes
04/08/2021, 4:35 PMprivate val refreshJob = filterFlow
.onEach { prev, new ->
if(prev.x != new.x) {
updateX()
}
}
.launchIn(viewModelScope)
I know I can separate the filterFlow in multiple flows, which is probably the better way of solving my use case, but I'm still curiousJeziel Lago
04/13/2021, 2:46 PMMutableSharedFlow
?
@Test
fun testIntSharedFlow() = runBlockingTest {
val intSharedFlow = MutableSharedFlow<Int>()
intSharedFlow.emit(5)
assertEquals(5, intSharedFlow.single())
}
This code results java.lang.IllegalStateException: This job has not completed yet
Jason Ankers
04/21/2021, 4:14 AMStateFlow
value from Dispatchers.Main
, and have some separate code collecting the flow also on Dispatchers.Main
, can I guarantee that the collector receives the value immediately (synchronously) after the state flow emits?kissed
04/21/2021, 9:06 AMjossiwolf
04/30/2021, 6:02 PMsuspend fun Flow<Boolean>.flippedTo(value: Boolean, block: () -> Unit) {
var previous: Boolean? = null
collect { current ->
if (previous == !value && current == value) {
block()
}
previous = current
}
}
boolFlow.flippedTo(false) {
println("Flipped from true to false")
}
boolFlow.flippedTo(true) {
println("Flipped from false to true")
}
Koya Sivaji
04/30/2021, 8:04 PMbaxter
05/07/2021, 4:51 PMcollect {}
without auto-importing it), it'll default to the Flow.collect()
method, which should be expected since the extension function collect {}
was not imported. The problem is that when you do this, there's no option to automatically fix the issue unless you manually add the import statement, or re-type in "collect" but allowing auto-import to do its job.
Now that I have seen this issue, I know how I can help my teammates next time, as we continue working with Coroutines/Flow. However, this was a minor point of frustration... So my question is: Is there any plan to rename or remove the Flow.collect
function so this doesn't happen again?
Screenshot is an example of how unhelpful lint is when you find yourself using Flow.collect()
by accidentJeziel Lago
05/11/2021, 6:12 PMrepeat
with some Flow operator?darkmoon_uk
05/13/2021, 4:03 AMFlow
that emits something n
times.
flow { repeat(n) { emit(theThing) } }
sjj
05/24/2021, 11:30 PMshareIn
operator to ensure the flow is only actually collected once, and subsequently cached. Based on what I've read, due to the fact that collect
on a SharedFlow
never actually returns, the recommended pattern is to use some kind of sentinel value to denote the completion of data and combine that with the takeWhile
operator. Just wanted to check this is indeed the recommended solution.Mikael Alfredsson
06/01/2021, 12:16 PMval result = state.transformLatest {
when (it) {
"a" -> emitAll(emitter("A"))
"c" -> emitAll(emitter("C"))
}
}
result.collect {
println(it)
}
this will print a flow of A+<counter> or “C” +<counter> depending on the last value from “state”.
I would like to ignore “unknown” states, but since transformLatest cancels the previous flow even if i don’t emit anything new, setting the state to “B” will just cancel the output, not continue on the previous selected output. Is there a nice way to achieve what I want?ninad thakare
06/14/2021, 2:20 PMFrancisco
06/15/2021, 4:35 AMConflation
in case one of the collector is being to slow
• flow is generated once (meaning I don't want to compute each item in the flow twice)
I was able to do this using broadcast channel and consuming it as channel like openSubscription().consumeAsFlow().conflate()
. Is is possible to do something similar to this using flows? Thanks!Ciprian Grigor
06/25/2021, 12:41 PMRobert Wijas
07/02/2021, 1:53 PM/**
* Implements function from
* [RxJava](<http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#throttleLatest-long-java.util.concurrent.TimeUnit->)
*/
internal fun <T> Flow<T>.throttleLatest(timeout: Duration) =
conflate().transform {
emit(it);
delay(timeout);
}
Hey, am I missing any flaws in this implementation? Thx 👋Jérémy CROS
07/05/2021, 1:50 PMMutableSharedFlow
exposed. But would you have a SharedFlow
exposed? Since the consumer may need to know that it’s hot? Or a regular Flow
? Since the consumer is just going to ‘.collect()’ anyway? Or is it a case by case discussion?ebukreev
07/06/2021, 7:47 AMChris Fillmore
07/20/2021, 8:59 PMfilter
or map
, but these return regular `Flow`s, but I still want to use a SharedFlow. I know I can call stateIn
or shareIn
to turn them back into a StateFlow/SharedFlow, but this requires that I pass some arguments, when what I really want is for the same SharedFlow I had before, except with these filters/maps applied. Am I missing something? Is there a better way to map/filter a SharedFlow?