I have a flow `flow` and a suspending function `fo...
# coroutines
e
I have a flow
flow
and a suspending function
foo
that I want to combine: when either produces a value, I want to combine the latest from both into a new emission.
combine
does that for flows, but
foo
is a suspending function that should be invoked only once. Also,
combine
doesn't emit until both sources have emitted at least once, but I want to emit either value as soon as it arrives and only start combining when both sources have produced a value. The type of the flow emissions and the return type of the suspending function are equal. I can think of some ways to do this, but there might be a very idiomatic and logical way, or even standard way of doing this. Any idea?
b
You would use the
map
operator to call
foo
. Example:
Copy code
flow.map { item ->
  val result = foo()
  return@map Bar(result, item)
}
e
That would call
foo
on every emission, wouldn't it?
And it wouldn't emit `foo`'s value when it returns before
flow
emits its first value
b
Yes, it would call
foo
on every emit, but it wouldn't emit `foo`'s value. The
return@map
returns whatever value you want.
If you only want to call
foo
once, then flow all other values, you would need to do something like:
Copy code
flow {
  val result = foo()
  emitAll(otherFlow)
}
This would call
foo()
exactly once, before emitting all values of your flow. From here, if you want to modify each value of the
otherFlow
, you can apply the
map
or
transform
operators as you need.
n
I'm using this
combineIncrementally
which does what you want. Feel free to use a different approach for nullOnStart / nullOnError
Copy code
inline fun <reified T, R> combineIncrementally(
    vararg flows: Flow<T>,
    crossinline transform: suspend (Array<T>) -> R
): Flow<R> = combine(
    flows = flows.map { it.nullOnStart().nullOnError() }
) {
    val results = it.filterNotNull()
    transform(results.toTypedArray())
}

private fun <T> Flow<T>.nullOnStart() = (this as Flow<T?>).onStart { emit(null) }

private fun <T> Flow<T>.nullOnError() = (this as Flow<T?>).catch { emit(null) }
Before this you'll have to make a flow out of foo with
flow { emit(foo()) }
, then combine the two flows