# flow
n
Hi there! I'm having issues trying to implement caching with Flow. Requirements: return a cached value and then follow up with network request result. The cached value must be observed regardless of network call success or failure. Implementation looks like
fun data(): Flow = flowOf(
    flowOf(cachedValue),
    networkCallFlow
).flattenConcat()
which is then observed in a ViewModel and mapped to UI state
data()
.onEach { .. }
.map { uiState(data) }
.catch { emit(uiState(error)) }
It works fine when the network call succeeds. The problem is that when `networkCallFlow` throws an exception (no network), the `cachedValue` is never mapped, even though it does trigger the `onEach` handler. My guess is that the emission of the `cachedValue` does not guarantee that it will be observed, and the exception halts the Flow before the consumption of the `cachedValue` happens or while it is in progress. Is my understanding correct? Is there any way to adjust the Flow to fit the requirements? The code: https://pl.kotl.in/3U8jVprL-
e
can you provide a better example? because I don't see what you're running into.
flowOf(
    flowOf(1),
    flow { TODO() }
)
    .flattenConcat()
    .map { "value=$it" }
    .catch { emit("error=$it") }
    .collect { println("collect($it)") }
this prints out
collect(value=1)
collect(error=kotlin.NotImplementedError: An operation is not implemented.)
f
just want to add that there is a simpler way to do this, with `onStart`
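A minimal sketch of what the `onStart` variant could look like. The `cachedValue` and `networkCallFlow` definitions here are hypothetical stand-ins, since the thread does not show the real ones:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the cached value and the network call.
val cachedValue = "cached"
val networkCallFlow: Flow<String> = flow { emit("fresh") }

// Emits the cached value first, then everything networkCallFlow emits.
fun data(): Flow<String> = networkCallFlow.onStart { emit(cachedValue) }

fun main() = runBlocking {
    println(data().toList()) // [cached, fresh]
}
```

This replaces the `flowOf(...).flattenConcat()` construction and avoids the experimental API. It does not by itself change the behavior when the network flow fails.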
n
Thanks ephemient, I'm trying to boil it down to a reproducible piece. It has something to do with concurrency because it works when I put a delay between data and error emissions.
https://pl.kotl.in/3U8jVprL- @ephemient here is a better example. I also replaced flattenConcat with a simpler version to avoid the experimental API. The behavior is intermittent: sometimes it observes both values, sometimes only the error.
e
creating a `CoroutineScope` like that is not good, but that's unrelated
made a much smaller reproducible case which makes the behavior more obvious:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*

suspend fun main() {
    do {
        val list = flow { emit(1); TODO() }
            .flowOn(Dispatchers.IO)
            .catch { emit(0) }
            .toList()
        println(list)
    } while (list == listOf(1, 0))
}
what is happening is that, because the flow runs on a separate thread, it can throw (and cancel itself) before the previous value is observed
this doesn't happen without `.flowOn` because with the flow and collector running in lockstep, the flow suspends when emitting
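To illustrate the lockstep case, here is a sketch of the same flow with `.flowOn` removed. The helper name `collectInLockstep` is made up for this example. Each `emit` suspends until the collector has handled the value, so the `1` is always seen before the exception reaches `catch`:

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// No flowOn: producer and collector run in the same coroutine.
suspend fun collectInLockstep(): List<Int> =
    flow<Int> { emit(1); TODO() }
        .catch { emit(0) }
        .toList()

fun main() = runBlocking {
    // Unlike the flowOn version, this never loses the first value.
    repeat(1_000) { check(collectInLockstep() == listOf(1, 0)) }
    println("every run produced [1, 0]")
}
```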
n
Thank you, I was thinking in the same direction. If the network call runs on IO, there is always a chance that the collection of the cached value is not finished at the time when the Flow is cancelled. Is there any way to make it work? I'm thinking the only way is probably to catch and wrap exception on `networkCallFlow` so that the resulting observed Flow never throws anything.
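The wrapping idea could be sketched roughly as follows. The `Loaded` type, the `fetch()` function, and the `data(cachedValue)` signature are all hypothetical and only serve to make the example self-contained. The failure is turned into a value on the IO side, so the flow the collector sees completes normally:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical wrapper type; the thread does not define one.
sealed interface Loaded {
    data class Value(val data: String) : Loaded
    data class Failure(val cause: Throwable) : Loaded
}

// Hypothetical network call that always fails, standing in for "no network".
suspend fun fetch(): String = throw IOException("no network")

// catch sits before flowOn, so the exception never leaves the IO side.
val networkCallFlow: Flow<Loaded> = flow<Loaded> { emit(Loaded.Value(fetch())) }
    .catch { emit(Loaded.Failure(it)) }
    .flowOn(Dispatchers.IO)

fun data(cachedValue: String): Flow<Loaded> = flow {
    emit(Loaded.Value(cachedValue)) // cached value first
    emitAll(networkCallFlow)        // then the network result or its failure
}

fun main() = runBlocking {
    data("cached").collect { println(it) }
}
```

The cost of this design is that every consumer has to handle the `Failure` case explicitly instead of relying on `catch`.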
e
you just have to make sure you run the map+catch without buffering
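One possible reading of this advice, applied to the small repro: keep `flowOn(Dispatchers.IO)`, but move `catch` upstream of it so no buffer sits between the throwing code and the handler. The helper name `collectOnce` is made up for this sketch:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same flow as the repro, but catch runs on the IO side, before the
// buffer that flowOn introduces. The producer then completes normally.
suspend fun collectOnce(): List<Int> =
    flow<Int> { emit(1); TODO() }
        .catch { emit(0) }
        .flowOn(Dispatchers.IO)
        .toList()

fun main() = runBlocking {
    repeat(1_000) { check(collectOnce() == listOf(1, 0)) }
    println("every run produced [1, 0]")
}
```

With this ordering the `1` and the fallback `0` both travel through the buffer as ordinary values, so neither is dropped.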
n
Oh yeah, now I see! The Flow that throws exceptions can still run on IO, I just need to make sure that the umbrella Flow and the collector are on the same thread.
e
exactly :)
n
Thanks heaps @ephemient! 🙏