jeff
11/19/2019, 6:34 PMflow {
emit(expensiveNetworkOperation1())
emit(expensiveNetworkOperation2())
emit(expensiveNetworkOperation3())
}
and I want to collect that flow repeatedly, without repeating the expensive operations.
Here's my solution. Feedback? Gotchas I'm missing?
/**
* Collects the upstream source a single time, and sends cached values to any future collectors
*/
fun <T> Flow<T>.replay(): Flow<T> {
val upstream = this
return object : AbstractFlow<T>() {
val isFinished = AtomicBoolean()
val isCollecting = AtomicBoolean()
val values = mutableListOf<T>()
override suspend fun collectSafely(collector: FlowCollector<T>) {
coroutineScope {
// Collect the upstream flow (but only once) and remember the values it emits
if (!isCollecting.getAndSet(true)) {
launch {
upstream.collect { values += it }
isFinished.set(true)
}
}
// Create a downstream flow that just emits cached values
val downstreamFlow = flow {
var i = 0
while (!isFinished.get()) {
while (i < values.size) {
emit(values[i])
i++
}
delay(1)
}
}
collector.emitAll(downstreamFlow)
}
}
}
}
Andy Gibel
11/19/2019, 6:36 PMjeff
11/19/2019, 6:39 PMAndy Gibel
11/19/2019, 6:40 PMLuis Munoz
11/19/2019, 6:48 PMjeff
11/19/2019, 6:50 PM