(Update from earlier): I want to cache/share a Flo...
# coroutines
j
(Update from earlier): I want to cache/share a Flow (like rx's replay)? E.g. I have
Copy code
flow { 
    emit(expensiveNetworkOperation1())
    emit(expensiveNetworkOperation2())
    emit(expensiveNetworkOperation3())
}
and I want to collect that flow repeatedly, without repeating the expensive operations. Here's my solution. Feedback? Gotchas I'm missing?
Copy code
/**
 * Collects the upstream source a single time, and sends cached values to any future collectors
 */
fun <T> Flow<T>.replay(): Flow<T> {

    val upstream = this

    return object : AbstractFlow<T>() {

        val isFinished = AtomicBoolean()
        val isCollecting = AtomicBoolean()

        val values = mutableListOf<T>()

        override suspend fun collectSafely(collector: FlowCollector<T>) {
            coroutineScope {

                // Collect the upstream flow (but only once) and remember the values it emits
                if (!isCollecting.getAndSet(true)) {
                    launch {
                        upstream.collect { values += it }
                        isFinished.set(true)
                    }
                }

                // Create a downstream flow that just emits cached values
                val downstreamFlow = flow {
                    var i = 0
                    while (!isFinished.get()) {
                        while (i < values.size) {
                            emit(values[i])
                            i++
                        }
                        delay(1)
                    }
                }

                collector.emitAll(downstreamFlow)
            }
        }
    }
}
a
Wasn't there recently some JB work on doing a BehaviorSubject type thing?
j
Maybe? If so I'd love a link. Only aware of: https://github.com/Kotlin/kotlinx.coroutines/issues/1261 and https://github.com/Kotlin/kotlinx.coroutines/issues/1086 (both of which are still open)
a
ah yes, the share() was what I was thinking of
l
no error handling?
j
yep, should add that 🙂