#coroutines

Tim Malseed

03/01/2023, 1:21 AM
I have a List<Flow<String?>>. I’d like to emit the latest value from the first flow in the list - unless that value is null, in which case I’d like to move to the next flow. Does anyone know how to achieve this?

ephemient

03/01/2023, 1:42 AM
not quite sure what you mean. flows on their own don't have a "latest value" unless they're state flows

Tim Malseed

03/01/2023, 1:43 AM
Sorry, I meant the latest emission

ephemient

03/01/2023, 1:43 AM
that only exists while you are collecting. does that mean you want to collect all of them at once, or something else?
you can easily create a single flow of all the non-null elements emitted by any of them, in parallel:
// merge() is an extension on Iterable<Flow<T>>; the vararg overload does not accept a List
flows.map { it.filterNotNull() }.merge()
this of course does not reflect the ordering of the original list
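
A minimal sketch of the merge approach, assuming kotlinx.coroutines is on the classpath. The names `sampleFlows` and `mergedNonNull` and the sample data are hypothetical, chosen only to show that a later source still gets through even when an earlier source already has a non-null value:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical sample data: the first source is non-null from the start,
// the second source emits a little later.
fun sampleFlows(): List<Flow<String?>> = listOf(
    flow<String?> { emit("a") },
    flow<String?> { delay(50); emit("b") },
)

// Merge every non-null emission from every source into one flow.
fun mergedNonNull(flows: List<Flow<String?>>): Flow<String> =
    flows.map { it.filterNotNull() }.merge()

fun main(): Unit = runBlocking {
    // "b" is emitted even though the first source already produced "a".
    println(mergedNonNull(sampleFlows()).toList()) // [a, b]
}
```

In this sketch list position plays no role: emissions arrive in the order they happen to be produced.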

Tim Malseed

03/01/2023, 2:12 AM
I don’t know exactly what I want 😳 Maybe what I’m suggesting doesn’t make sense. I basically want the behaviour you’ve demonstrated with merge, but retaining the original order. I don’t think I want to ‘collect’, because I want to continue receiving the latest values if the flows continue to emit.

Chris Fillmore

03/01/2023, 3:14 AM
“I want to continue receiving the latest values if the flows continue to emit.”
This is what collect does. I think you’re right, you’re not sure what you want. Let’s take a step back. What data is in your flows and what’s your use case? What are you trying to do?

Tim Malseed

03/01/2023, 3:17 AM
This seems to do the trick:
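// One flow per data source, in priority order (i.e. dataSources.map { it.getValue() })
val flows: List<Flow<String?>>

fun getValue(): Flow<String?> {
    return combine(flows) { values ->
        // firstOrNull rather than first: first { } throws if every latest value is null
        values.firstOrNull { it != null }
    }
}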
Perhaps I didn’t explain it very well.
I have a list of ‘data sources’, each of which has a function returning a flow that can emit a string (or null). I want to return the latest non-null emission from the first data source in the list whose latest emission is non-null.
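
A hedged sketch of the combine approach described here, assuming kotlinx.coroutines. `firstNonNullLatest` and `sampleSources` are hypothetical names. The sketch uses `firstOrNull`, because `first { it != null }` throws a `NoSuchElementException` whenever every source's latest value is null:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Latest value of the first source (in list order) whose latest value is non-null,
// or null while every source's latest value is null.
fun firstNonNullLatest(flows: List<Flow<String?>>): Flow<String?> =
    combine(flows) { latest -> latest.firstOrNull { it != null } }

// Hypothetical sample data: both sources start out null.
fun sampleSources(): List<Flow<String?>> = listOf(
    flow<String?> { emit(null) },
    flow<String?> { delay(50); emit(null); delay(50); emit("x") },
)

fun main(): Unit = runBlocking {
    // Nothing is emitted until the second source has emitted once (at ~50 ms).
    println(firstNonNullLatest(sampleSources()).toList()) // [null, x]
}
```

Note that combine emits nothing until every source has emitted at least once, so a source that stays silent holds back the whole result.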

ephemient

03/01/2023, 3:21 AM
in other words, if an earlier datasource emits null, you want to switch to a later datasource?

Chris Fillmore

03/01/2023, 3:22 AM
From that description it sounds like the order of flows in the list is not meaningful, so it would make sense to merge them and filter null values

ephemient

03/01/2023, 3:22 AM
if these are functions (that only return once) the problem is much simpler

Tim Malseed

03/01/2023, 3:22 AM
If an earlier datasource’s latest emission is null, switch to a later datasource

Chris Fillmore

03/01/2023, 3:23 AM
Ahh I see

Tim Malseed

03/01/2023, 3:23 AM
I think I’ve solved it with the combine function. I spent a few hours on it, but it turns out the testing library I’m using on Android doesn’t work correctly with combine, and sometimes conflates emissions. So I was getting false negatives 😭
So I probably solved it a few other times in the 70 iterations I tried

Chris Fillmore

03/01/2023, 3:25 AM
Check out Turbine if it helps https://github.com/cashapp/turbine I’ve never used it but you can always ask about it in #squarelibraries

Tim Malseed

03/01/2023, 3:26 AM
Yeah, turbine is what I’m using
There’s an issue with Turbine and combine.
It seems to be a coroutines problem: https://github.com/Kotlin/kotlinx.coroutines/issues/2082#issuecomment-640621404. It’s probably my fault, though: I don’t think I’m using the correct dispatcher (the test dispatcher), but I haven’t confirmed that yet.

streetsofboston

03/01/2023, 4:09 AM
What about
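// Turn the list into a flow of flows, cutting each source off at its first null,
// then concatenate the truncated sources in list order.
val concatenated = flow {
    listOfFlows.forEach { source ->
        emit(source.takeWhile { it != null })
    }
}.flattenConcat()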
I haven't tried this at all, but if I understand your problem, this could be a solution. First change your list of flows into a flow of flows. Make sure that each flow ends/completes when null is emitted (return new flows that complete this way using takeWhile). And now that all inner flows complete, you can call flattenConcat to concatenate them all.

ephemient

03/01/2023, 6:48 AM
that does something different. if you have
listOf(
    flow {
        emit(null)
        delay(100)
        emit("a")
    },
    flow {
        emit("b")
    }
)
then the combine method will produce "b", "a", while yours will produce only "b": takeWhile ends the first flow at its initial null, so "a" is never collected
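
To make the comparison concrete, here is a sketch that runs both approaches on the two flows from the example above, assuming kotlinx.coroutines. The function names are hypothetical, and the opt-in annotation is there because `flattenConcat` is marked as a preview or experimental API, depending on the library version. Traced by hand, the takeWhile variant drops the first flow entirely, because that flow's first emission is null:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flattenConcat
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// The two flows from the example above.
fun exampleFlows(): List<Flow<String?>> = listOf(
    flow<String?> { emit(null); delay(100); emit("a") },
    flow<String?> { emit("b") },
)

// combine: all sources are collected concurrently; the first non-null latest value wins.
fun combineFirstNonNull(flows: List<Flow<String?>>): Flow<String?> =
    combine(flows) { latest -> latest.firstOrNull { it != null } }

// takeWhile + flattenConcat: sources are collected one at a time, each cut off at its first null.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun concatUntilNull(flows: List<Flow<String?>>): Flow<String?> =
    flows.asFlow().map { source -> source.takeWhile { it != null } }.flattenConcat()

fun main(): Unit = runBlocking {
    println(combineFirstNonNull(exampleFlows()).toList()) // [b, a]
    println(concatUntilNull(exampleFlows()).toList())     // [b]
}
```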

streetsofboston

03/01/2023, 11:50 AM
Ah... Yup, flattenConcat doesn't interleave.

ephemient

03/01/2023, 11:27 PM
if you did want to limit concurrency, you could build up a chained flow like
fun <T : Any> List<Flow<T?>>.switchToFirstNonNull(): Flow<T> = foldRight(emptyFlow()) { flow, fallback ->
    flow.flatMapLatest { if (it != null) flowOf(it) else fallback }
}
that results in subscriptions to the later flows only when the earlier flows emit null. of course, this can result in repeated re-subscriptions to the later flows
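
A small sketch of the re-subscription behaviour mentioned here, assuming kotlinx.coroutines. `switchToFirstNonNull` is copied from the message above. The demo function, the sample flows and the subscription counter are hypothetical, added only to count how often the fallback flow is collected:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun <T : Any> List<Flow<T?>>.switchToFirstNonNull(): Flow<T> =
    foldRight(emptyFlow()) { flow, fallback ->
        // A non-null value is passed through; a null switches to the rest of the chain.
        flow.flatMapLatest { if (it != null) flowOf(it) else fallback }
    }

// Returns the collected values and how many times the fallback flow was collected.
fun resubscriptionDemo(): Pair<List<String>, Int> = runBlocking {
    var fallbackSubscriptions = 0
    // The primary source goes null -> "a" -> null, so the fallback is needed twice.
    val primary = flow<String?> { emit(null); delay(50); emit("a"); delay(50); emit(null) }
    val secondary = flow<String?> { fallbackSubscriptions++; emit("b") }
    val seen = listOf(primary, secondary).switchToFirstNonNull().toList()
    seen to fallbackSubscriptions
}

fun main() {
    val (seen, subscriptions) = resubscriptionDemo()
    println(seen)          // [b, a, b]
    println(subscriptions) // 2
}
```

With cold fallback flows, each return to null restarts the fallback from scratch. Whether that is acceptable depends on how expensive the later sources are to collect.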