# coroutines
m
I want to use `Flow.combine(Flow)`. However, the second flow is not known until some point in the progression of the original Flow (at which point the second flow can be derived). At the moment, I achieve this (and it feels like a very nasty hack) by having a `MutableStateFlow` and updating that flow from a coroutine launched from within `onEach` (just before `combine`). So then we use `combine(MutableStateFlow)`. But how to do this properly?
l
`onStart { emit(initialValue) }`?
m
I don’t think so. I just want a `combine` to take a Flow that is derived from the current value of the main Flow.
l
Then you want `initialValue` to be `null` or something that you can distinguish in the `combine` lambda.
m
So what does the upstream Flow look like, and when and how do you get another Flow to combine with the previous one? Difficult to help without more context.
m
Use case: the main Flow is a list of search result items. Progressively items are added to the list. So [] -> [A] -> [A, B] -> [A,B,C] …. but there might be an item with an associated flow (that search result item has dynamic content)
So I combine the main flow with that item flow to produce an immutable state of the search results
m
So A may be an item, B may be a Flow, C may be an item etc?
m
The main flow produces immutable items (every item has some kind of static/immutable content). But it might be that we decide to decorate an item with some dynamic content.
m
So where is the dynamic content coming from? Is it one completely separate Flow? Or can there be separate dynamic Flows for different items?
e
offhand it sounds like `.flatMapLatest()` from your description, but it's hard to tell
👍 1
m
Do you have one `Flow<Item>` and one `Deferred<Flow<Item>>` (kinda)? I still can’t wrap my head around it 😄
m
`Flow<List<Item>>`, and then for a certain `Item` subclass, I can create a `Flow<DynamicStuffForSpecialItem>`, and then I can `Flow<List<Item>>.combine(Flow<DynamicStuffForSpecialItem>)` to make `Flow<List<Item>>`. The returned `Flow` has a dynamic item in place of the special static item in the original flow.
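A minimal sketch of how that shape could be written with `flatMapLatest` instead of `combine`, assuming the dynamic flow can be derived from the item itself once the item appears in the list. `StaticItem`, `SpecialItem`, `dynamicContentFor` and `decorate` are hypothetical names invented for illustration, and a `String` stands in for `DynamicStuffForSpecialItem`; only one special item per list is handled.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class)

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

sealed interface Item
data class StaticItem(val id: String) : Item
data class SpecialItem(val id: String, val dynamic: String? = null) : Item

// Hypothetical stand-in for the per-item dynamic source, derived from the item ID.
fun dynamicContentFor(id: String): Flow<String> = flowOf("$id-v1", "$id-v2")

// For every new list, switch to a flow that re-emits the list with the
// special item (if any) replaced by its decorated copy.
fun decorate(results: Flow<List<Item>>): Flow<List<Item>> =
    results.flatMapLatest { items ->
        val special = items.filterIsInstance<SpecialItem>().firstOrNull()
        if (special == null) {
            flowOf(items)
        } else {
            dynamicContentFor(special.id)
                .map { content ->
                    items.map { if (it === special) special.copy(dynamic = content) else it }
                }
                .onStart { emit(items) } // show the undecorated list until content arrives
        }
    }

fun main() = runBlocking {
    val results = flowOf<List<Item>>(
        listOf(StaticItem("A")),
        listOf(StaticItem("A"), SpecialItem("B")),
    )
    decorate(results).collect { println(it) }
}
```

One trade-off of this sketch: `flatMapLatest` cancels and restarts the dynamic flow every time the list changes, which may or may not be acceptable depending on how expensive that source is.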
m
And why is the dynamic Flow not available immediately?
Is the Flow instance not available or is the Flow empty for a while?
m
It just doesn’t exist. It’s based on the ID of the search result item
m
So you only know that while the `Flow<List<Item>>` is already being used, right?
m
right
l
Then materialize the existence of that to a flow, and when it starts to exist, emit it to that flow, so you can use it for `combine`, and also use `onStart` if needed?
👍 2
m
Why don’t you provide a `Flow<DynamicStuffForSpecialItem?>` instead that either emits `null` (no dynamic stuff available) or `DynamicStuffForSpecialItem`?
m
That’s kind of what I do, but it feels like a hack. I set up a `MutableStateFlow` of `Flow<DynamicStuffForSpecialItem?>` and then pass the `flatMapLatest { it ?: flowOf(nothing) }` to `combine` (thanks for the tip @ephemient). The `MutableStateFlow` is set in `onEach` when the dynamic flow becomes available. Is this the best way?
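For readers trying to picture the arrangement described here, a rough sketch under stated assumptions: `holder`, `dynamicOrNull` and `combined` are hypothetical names, `String` stands in for `DynamicStuffForSpecialItem`, and the rule "derive the flow once item B shows up" is made up for the example. It is not the poster's actual code.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class)

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Holder for a flow that does not exist yet; null means "not derived so far".
val holder = MutableStateFlow<Flow<String>?>(null)

// Flattens the holder: emits null until a flow is set, then that flow's values.
val dynamicOrNull: Flow<String?> = holder.flatMapLatest { it ?: flowOf(null) }

fun combined(results: Flow<List<String>>): Flow<Pair<List<String>, String?>> =
    results
        .onEach { items ->
            // Hypothetical trigger: the dynamic flow becomes derivable once "B" appears.
            if (holder.value == null && "B" in items) holder.value = flowOf("dynamic content")
        }
        .combine(dynamicOrNull) { items, dynamic -> items to dynamic }

fun main() = runBlocking {
    // The holder never completes, so neither does the combined flow;
    // take the first state that carries dynamic content instead of collecting forever.
    val state = combined(flowOf(listOf("A"), listOf("A", "B"))).first { it.second != null }
    println(state)
}
```

Because the holder is a `MutableStateFlow`, the combined flow stays active after the search results finish, which is the non-termination issue that comes up later in the thread.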
m
I have no idea how and why you use a `StateFlow`, `flatMapLatest` and `onEach` 😅
m
Out of interest, is there a technical term we use for a `Flow` whose purpose is to act as a kind of container to receive a delayed `Flow` (and then to be used with `flatMapLatest`)?
l
Nope, we just call it a flow. All of this is implementation details.
m
Hmm, looks more like a design pattern to me.
f
A scan seems more appropriate than a combine in that case
e
scan wouldn't handle fetching multiple decorations from db in parallel. not sure if that's a requirement, but it doesn't strike me as more appropriate than combine
I think these are the building blocks that would work
m
@ephemient thanks! Please can you explain `stateInWithTermination`? From what I can see, it’s just duplicating a Flow and re-emitting the last value? What am I missing?
e
`StateFlow`/`.stateIn()` never completes, even if the underlying flow does, so in this example, `.collect()` would continue forever if I used `.stateIn()` directly. `.stateInWithTermination()` returns a flow which behaves like `.stateIn()` until the underlying flow completes, after which it behaves like `flowOf(final value)`.
👍 1
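The non-termination of `.stateIn()` can be observed with a small experiment. This sketch does not reproduce `stateInWithTermination` (its source is not shown in this log); it only contrasts a finite cold flow with the same flow converted by the standard `stateIn`. `completesWithin` is a hypothetical helper written for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns true if collecting the flow finishes within the timeout, false otherwise.
suspend fun <T> completesWithin(flow: Flow<T>, millis: Long): Boolean =
    withTimeoutOrNull(millis) {
        flow.collect { }
        true
    } ?: false

fun main() = runBlocking {
    val finite = flowOf(1, 2, 3)
    val sharingScope = CoroutineScope(Dispatchers.Default)
    val state = finite.stateIn(sharingScope, SharingStarted.Eagerly, 0)

    println(completesWithin(finite, 500)) // true: the cold flow ends after three values
    println(completesWithin(state, 500))  // false: collecting a StateFlow never returns normally
    sharingScope.cancel()
}
```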
m
Thanks, that makes sense. Regarding the mutex, is that necessary? Just trying to get my head around how that block could be executed at the same time by two different coroutines
e
the same flow may have multiple collectors at once,
val flow = ...
launch { flow.collect { println("1: $it") } }
launch { flow.collect { println("2: $it") } }
each such subscription is independent, but the cache isn't
👍 1
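A small runnable version of that point, as a sketch: each collector of a cold flow runs the upstream block again, so anything declared outside the block is shared between those runs. The counter `upstreamRuns` is a hypothetical stand-in for shared state such as the cache mentioned above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// State outside the flow builder is shared by every collection of the flow.
val upstreamRuns = AtomicInteger(0)

val cold: Flow<Int> = flow {
    upstreamRuns.incrementAndGet() // executed once per collector
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    coroutineScope {
        launch { cold.collect { println("1: $it") } }
        launch { cold.collect { println("2: $it") } }
    }
    println("upstream block ran ${upstreamRuns.get()} times") // 2, one run per collector
}
```

An atomic counter is enough here; state that needs a multi-step update, like a cache lookup followed by an insert, would need something like a `Mutex` when collectors can run on different threads.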
m
Right, I see. The combined flow.
Out of curiosity, if it was exposed as a SharedFlow, then the mutex wouldn’t be needed?
e
right