Erik
04/21/2021, 4:07 PM
I have a `flow` and a suspending function `foo` that I want to combine: when either produces a value, I want to combine the latest from both into a new emission. `combine` does that for flows, but `foo` is a suspending function that should be invoked only once. Also, `combine` doesn't emit until both sources have emitted at least once, but I want to emit either value as soon as it arrives and only start combining when both sources have produced a value. The type of the flow emissions and the return type of the suspending function are equal. I can think of some ways to do this, but there might be an idiomatic and logical way, or even a standard way, of doing this. Any ideas?
baxter
04/21/2021, 4:15 PM
You could use the `map` operator to call `foo`. Example:
flow.map { item ->
    val result = foo()
    return@map Bar(result, item)
}
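A minimal sketch of the `map` approach above, made runnable so the number of `foo` invocations can be observed. `Bar`, the counting `foo`, `fooCalls`, and `mapDemo` are hypothetical stand-ins; the thread does not define them.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Bar type used in the example above.
data class Bar(val result: String, val item: Int)

// Counts how often foo() is invoked (hypothetical instrumentation).
var fooCalls = 0

// Hypothetical suspending function; the real foo is not shown in the thread.
suspend fun foo(): String {
    fooCalls++
    return "foo"
}

// Collects a three-element flow through the map operator.
fun mapDemo(): List<Bar> = runBlocking {
    flowOf(1, 2, 3)
        .map { item ->
            val result = foo() // runs for every upstream emission
            return@map Bar(result, item)
        }
        .toList()
}

fun main() {
    println(mapDemo())
    println("foo was called $fooCalls times")
}
```

With three upstream values, `foo` runs three times, once per emission, and its result only reaches downstream wrapped inside each `Bar`.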
Erik
04/21/2021, 4:16 PM
That would call `foo` on every emission, wouldn't it? […] `flow` emits its first value
baxter
04/21/2021, 4:36 PM
It would call `foo` on every emit, but it wouldn't emit `foo`'s value. The `return@map` returns whatever value you want.
If you want to call `foo` once, then flow all other values, you would need to do something like:
flow {
    val result = foo() // runs once per collection, before otherFlow is collected
    emitAll(otherFlow)
}
This would call `foo()` exactly once, before emitting all values of your flow. From here, if you want to modify each value of the `otherFlow`, you can apply the `map` or `transform` operators as you need.
natario1
04/22/2021, 11:51 AM
I have a `combineIncrementally` function which does what you want. Feel free to use a different approach for `nullOnStart` / `nullOnError`.
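import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.onStart

inline fun <reified T, R> combineIncrementally(
    vararg flows: Flow<T>,
    crossinline transform: suspend (Array<T>) -> R
): Flow<R> = combine(
    flows = flows.map { it.nullOnStart().nullOnError() }
) {
    // Pass on only the sources that have produced a value so far.
    val results = it.filterNotNull()
    transform(results.toTypedArray())
}

// @PublishedApi internal instead of private: a public inline function cannot call private helpers.
@PublishedApi internal fun <T> Flow<T>.nullOnStart(): Flow<T?> = (this as Flow<T?>).onStart { emit(null) }
@PublishedApi internal fun <T> Flow<T>.nullOnError(): Flow<T?> = (this as Flow<T?>).catch { emit(null) }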
Wrap `foo` in `flow { emit(foo()) }`, then combine the two flows.
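
Putting the suggestions in this thread together, here is a minimal sketch for the two-source case from the original question: the suspending function is wrapped in a flow, both sides start with a `null` placeholder so `combine` can emit right away, and the placeholders are filtered out again. `combineWithOnce`, `merge`, and `demo` are hypothetical names, and `T : Any` is an assumption so that `null` can serve as the "no value yet" marker.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: combines `source` with the result of a single `once()` call.
fun <T : Any> combineWithOnce(
    source: Flow<T>,
    once: suspend () -> T,
    merge: suspend (T, T) -> T,
): Flow<T> {
    // Both sides emit null first, so combine does not wait for real values.
    val left = flow<T?> { emit(null); emitAll(source) }
    // The flow builder is cold: once() is invoked one time per collection.
    val right = flow<T?> { emit(null); emit(once()) }
    return combine(left, right) { a, b ->
        when {
            a == null -> b          // only the function result (or nothing) so far
            b == null -> a          // only the flow has emitted so far
            else -> merge(a, b)     // both present: combine the latest values
        }
    }.filterNotNull()               // drops the initial (null, null) pair
}

// Usage with made-up timings: a1 at ~100 ms, foo's result at ~200 ms, a2 at ~300 ms.
fun demo(): List<String> = runBlocking {
    val source = flow {
        delay(100); emit("a1")
        delay(200); emit("a2")
    }
    combineWithOnce(source, once = { delay(200); "b" }) { a, b -> "$a+$b" }.toList()
}

fun main() {
    println(demo())
}
```

With these delays the collected list should be `[a1, a1+b, a2+b]`: the first flow value is emitted on its own, and combining starts once the function has returned. Unlike `nullOnError` above, this sketch does not catch exceptions, so a failure in either source fails the combined flow.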