Marc Knaup
10/13/2020, 11:20 PMstateIn
plus distinctUntilChanged
to avoid expensive re-computations after a Flow was temporarily cold. Unfortunately that doesn’t work because distinctUntilChanged
doesn’t maintain state when the Flow turns cold.
Does anybody have an idea how I can implement the following scenario with existing operators?
Easy part:
1. My shared flow turns hot because of first subscriber
2. Upstream emits its latest state and periodically emits new states
3. Ignore value if same as previous value
4. Perform expensive transformation (recompute a graph index when vertices and/or edges change)
5. Share expensive value with current and future subscribers (i.e. replay = 1)
Difficult part:
6. My shared flow turns cold because all subscribers are gone
7. My shared flow turns hot again because of first subscriber
8. Upstream emits its latest state and periodically emits new states
9. Ignore value if same as previous value ⬅️ doesn’t work for initial value b/c Flow was cold and operator was reset
10. Perform expensive transformation ⬅️ unnecessarily applied again for a value that was already transformed and cached in replay buffer
11. Share expensive value with current and future subscribers (i.e. replay = 1)
I kinda need either
• a distinctUntilChanged
that survives a Flow turning cold or
• a stateIn(…, replay = false)
that ignores the upstream value because it’s the same as its current state but that doesn’t replay it.Marc Knaup
10/14/2020, 3:31 PM(Mutable)CacheFlow
that extends (Mutable)StateFlow
.
The main difference is that it can be recursive (normal MutableStateFlow) or non-recursive. When it’s non-recursive then providing a new value to the MutableCacheFlow
will not emit it to the same flow, thereby avoiding recursive scenarios.
Collectors of the recursive cache behind the non-recursive cache will still receive newly emitted value. That means if we have a cache
and two non-recursive `MutableCacheFlow`s flow1
and flow2
then updates to flow1
would only be collected by flow2
and updates to flow2
would only be collected by flow1
.
This is especially useful when values coming out of the cache are transformed downstream and then committed to the cache again. With a regular MutableStateFlow
that would cause an endless recursion.
https://gist.github.com/fluidsonic/a06419b1c129f3045558c63cbafecca4
Thoughts?ansman
10/28/2020, 6:28 PMshareIn
in that terminal events are not propagated to subscribers?bod
10/31/2020, 4:05 PMMutableStateFlow
? For instance, does something similar to callbackFlow {}
exist?Mikael Alfredsson
11/03/2020, 4:55 PMMarek Kubiczek
11/11/2020, 8:26 AMrunningReduce
but it’s not 100% what I need as I always want two last values from upstream rather then the current Value and accumalated value.Smorg
11/12/2020, 5:22 AMSudhir Singh Khanger
11/13/2020, 9:14 AMLiveData
.eburke
11/17/2020, 3:45 PMFlow.map
won’t fire properly.
myFlow
.map { input ->
someFunction(input) // breakpoint here doesn't fire
}
However, if I extract that method, it fires:
val transform: suspend (input: ...) -> ... = {
someFunction(input) // breakpoint here fires!
}
myFlow
.map(transform)
has anyone seen this?Smorg
11/17/2020, 7:34 PMflow2
based on the condition of an upstream flow flow1
? this is what I have tried but I am not sure if I actually should be doing the tagged line
flow1()
.flatMapLatest { res ->
if(res.conditionIsFalse) {
flowOf() // works but wondering if there is an operator that doesn't require this
} else {
flow2()
}
}
.onEach { flow2Res ->
// do something with flow2Res
}
.launchIn(scope)
Mikael Alfredsson
11/18/2020, 8:47 PMshareIn
. in an Android Studio project. The editor code-completes it, I can jump to the source of it, but when I compile I get a Unresolved reference: shareIn
it sounds like the compiler and the editor isn’t in sync, but I don’t know how to solve it.Mikael Alfredsson
11/24/2020, 6:23 AMclass FlowGroup<T>(
items:List<MutableSharedFlow<T>>,
transform: suspend (Array<T>) -> T
) {
val flow = MutableSharedFlow<T>()
init {
MainScope().launch {
combine(items, transform).collect {
flow.emit(it)
}
}
}
}
combine
wont compile in this code because it uses the reified
qualifier, and I don’t know how to get that into this function. Any suggestions?mng
11/25/2020, 6:44 PMsuspend functions
vs Flows
. Assuming I follow a Clean architecture approach for my application, when should i be using suspend functions
and when should I use Flows
?
Assuming I am only ever handling cold streams of data, when should I use one over the other?
Should the presentation layer only be exposed to Flows
and suspend functions
be used by data and domain layers? Should the suspend functions
just be used for “one-off” type functions that need to be done on a separate thread?Bacho Kurtanidze
12/04/2020, 12:20 PMflowOf(1, 2, 3)
how would I do some suspended operations on them, write results for each in detabase and then only emit once. rn collect{}
is called every time new item is emited, which is giving me undesirable results. I tried to use onCompletion
but it only is called when ViewModel dies.Saul Wiggin
12/09/2020, 11:06 AMBacho Kurtanidze
12/14/2020, 11:20 AMdao.get(areaAccountingId = areaAccountingId, assetId = assetId)
.filterNotNull()
.map {
it.toDomainModel()
}.onEmpty { }
I want onEmpty to get triggered but it doesn'tDALDEI
12/17/2020, 1:22 AMCLOVIS
12/17/2020, 10:01 PMMarc Knaup
12/25/2020, 4:07 PM.conflate().map { … }
and .conflate().mapLatest { … }
?Lauren Yew
01/07/2021, 7:50 PMMutableStateFlow
without an initial state? I.e. I want a flow that I can make into a variable and reference and send data but I don't want to specify an initial state.Marc Knaup
01/11/2021, 11:49 AMemit()
invocation of a MutableSharedFlow
until all subscribers have completed collecting the element? There’s no replay.Javier
01/12/2021, 9:59 PMEither
, but it seems like the first emission is being mixed if there are more than one emission, so store and store2 are different instances with should have the same emissions:
store.stream().take(1).toList().also { println(it) }
store2.stream().take(2).toList().also { println(it) }
That snippet prints:
[Right(right=Success(data=[1, 2, 3, 4], isLoading=true))]
[Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=true)), Right(right=Success(data=[1, 2, 3, 4, 7, 8], isLoading=false))]
Mattias Flodin
01/13/2021, 3:53 PMsuspend fun <T> collectFromHere(flow: SharedFlow<T>): Flow<T> {
val channel = Channel<T>()
flow.collect { channel.send(it) }
return object: Flow<T> {
@InternalCoroutinesApi
override suspend fun collect(collector: FlowCollector<T>) {
for (v in channel) {
collector.emit(v)
}
}
}
}
breandan
01/26/2021, 6:52 AMany
and all
? I need a way to short circuit a Flow if a value is ever found matching a predicate, similar to `anyMatch`/`allMatch`/`noneMatch` in the Java Streams APIChristian S.
01/26/2021, 2:38 PMdef results = []
BuildersKt.runBlocking(EmptyCoroutineContext.INSTANCE) {
sut.createFlow(someArg).collect { results << it }
}
I have found this SO post but haven't discovered a simpler solution, yet. Can someone please help me out? Thanks!Logan Knight
01/29/2021, 6:40 PMDavid Pisoni
01/29/2021, 8:33 PMSharedFlow
or StateFlow
initialized from a cold Flow
(with stateIn(...Eagerly...)
) to a var
, but then later assign null
to the `var`; once there are no subscribers left to the SharedFlow
(and no references left to it) will it cancel the original cold Flow
(and otherwise free its resources)?Daniel
02/12/2021, 12:49 PMmutableStateFlow.value += 1
? Is it
var cached = mutableStateFlow.value
while (!mutableStateFlow.compareAndSet(cached, cached + 1)) {
cached = mutableStateFlow.value
}
Ed George
02/13/2021, 11:09 AMSharedFlow
incorrectly, but would be good to understand why.
My scenario is that I have an Android app with a list. When I click an item in the list, I call a method in my viewModel that emits a sealed class via a SharedFlow. The Android Fragment is collecting this flow and handling the UI based on the sealed class collected from it.
However, clicking the same list item multiple times is not changing the UI multiple times - it seems like it happens occasionally and then emits multiple items should I change the item I click on (i.e. return a different instance of the sealed class)
My question is, is there a way I can emit items every-time regardless of whether the previous item was the same item to be emitted?
I am sure this is a fairly common question, but maybe I have not quite worked out how to google it successfullyMarc Knaup
02/17/2021, 7:22 PMMutableStateFlow
so that I can do something with new values before they are sent downstream?
E.g.
fun <Value: Any> persistedFlow(key: String, serializer: KSerializer<Value>): MutableStateFlow<Value?> =
save(MutableStateFlow(value = load(key, serializer)), key, serializer)
How would I implement save
?
According to the docs:
Theinterface is not stable for inheritance in 3rd party libraries (…)MutableStateFlow
Marc Knaup
02/17/2021, 7:22 PMMutableStateFlow
so that I can do something with new values before they are sent downstream?
E.g.
fun <Value: Any> persistedFlow(key: String, serializer: KSerializer<Value>): MutableStateFlow<Value?> =
save(MutableStateFlow(value = load(key, serializer)), key, serializer)
How would I implement save
?
According to the docs:
Theinterface is not stable for inheritance in 3rd party libraries (…)MutableStateFlow
gildor
03/02/2021, 12:08 PM