Erik
04/21/2021, 4:07 PMflow and a suspending function foo that I want to combine: when either produces a value, I want to combine the latest from both into a new emission. combine does that for flows, but foo is a suspending function that should be invoked only once. Also, combine doesn't emit until both sources have emitted at least once, but I want to emit either value as soon as it arrives and only start combining when both sources have produced a value. The type of the flow emissions and the return type of the suspending function are equal. I can think of some ways to do this, but there might be a very idiomatic and logical way, or even standard way of doing this. Any idea?baxter
04/21/2021, 4:15 PMmap operator to call foo. Example:
flow.map { item ->
val result = foo()
return@map Bar(result, item)
}Erik
04/21/2021, 4:16 PMfoo on every emission, wouldn't it?Erik
04/21/2021, 4:17 PMflow emits its first valuebaxter
04/21/2021, 4:36 PMfoo on every emit, but it wouldn't emit `foo`'s value. The return@map returns whatever value you want.baxter
04/21/2021, 4:38 PMfoo once, then flow all other values, you would need to do something like:
flow {
val result = foo()
emitAll(otherFlow)
}
This would call foo() exactly once, before emitting all values of your flow. From here, if you want to modify each value of the otherFlow, you can apply the map or transform operators as you need.natario1
04/22/2021, 11:51 AMcombineIncrementally which does what you want. Feel free to use a different approach for nullOnStart / nullOnError
inline fun <reified T, R> combineIncrementally(
vararg flows: Flow<T>,
crossinline transform: suspend (Array<T>) -> R
): Flow<R> = combine(
flows = flows.map { it.nullOnStart().nullOnError() }
) {
val results = it.filterNotNull()
transform(results.toTypedArray())
}
private fun <T> Flow<T>.nullOnStart() = (this as Flow<T?>).onStart { emit(null) }
private fun <T> Flow<T>.nullOnError() = (this as Flow<T?>).catch { emit(null) }natario1
04/22/2021, 11:53 AMflow { emit(foo()) }, then combine the two flows