#coroutines

Paul Woitaschek

07/17/2019, 1:58 PM
How can I get the latest emission upon an error and continue with a modified copy of it? I have something like a `Flow<ViewState>`. This `ViewState` also has a boolean property indicating whether an error should be shown. Upon error I want to emit the latest view state with `copy(showError = true)` applied and complete the flow, so that a consumer can use something like `.repeatWhen { repeatClickedFlow() }`.

streetsofboston

07/17/2019, 2:00 PM
What does the code for your `Flow<ViewState>` look like without the error handling (i.e., the code that creates the `Flow<ViewState>` instance)?

Paul Woitaschek

07/17/2019, 2:02 PM

streetsofboston

07/17/2019, 2:06 PM
Thanks. Can you restart/re-get the `contentFlow` when the first one produces an error/exception?

Paul Woitaschek

07/17/2019, 2:07 PM
Yes

streetsofboston

07/17/2019, 2:09 PM
Then wrap the `emitAll` inside a try-catch block. In the catch, get the `contentFlow` again. (I would change the type of `contentFlow` from `Flow<...>` into `() -> Flow<...>` and use `contentFlow()` to actually obtain the flow.)
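A minimal sketch of that suggestion, under some assumptions: `contentWithOneRetry` and the failing `contentFlow` factory below are hypothetical stand-ins (the original snippet is not shown in this thread), and the element type is assumed to be `String`. It only illustrates wrapping `emitAll` in a try-catch and calling a `() -> Flow<...>` factory again after a failure.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical factory: the flow from the first call fails, later ones succeed.
var attempts = 0
fun contentFlow(): Flow<String> = flow {
    attempts++
    emit("item from attempt $attempts")
    if (attempts == 1) throw IOException("simulated failure")
}

// Wraps emitAll in try/catch and obtains a fresh flow from the factory after a failure.
fun contentWithOneRetry(source: () -> Flow<String>): Flow<String> = flow {
    try {
        emitAll(source())
    } catch (e: IOException) {
        emitAll(source()) // a second failure propagates to the collector
    }
}

fun main() = runBlocking {
    // prints [item from attempt 1, item from attempt 2]
    println(contentWithOneRetry(::contentFlow).toList())
}
```

Passing a factory instead of a `Flow` instance makes it explicit that each attempt starts from a newly created flow.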

Paul Woitaschek

07/17/2019, 2:10 PM
But how would I be able to get the latest state and flip the error flag?

streetsofboston

07/17/2019, 2:12 PM
And for this snippet, add a stateful variable:
...
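    return flow {
        // Remember the most recent state so it can be reused when an error occurs.
        var latestState = baseState
        emit(latestState)
        while (true) {
            val sourceFlow = contentFlow()
            try {
                sourceFlow.collect {
                    latestState = it
                    emit(latestState)
                }
            } catch (t: Throwable) {
                // Re-emit the last known state with the error flag flipped.
                latestState = latestState.copy(....)
                emit(latestState)
            }
        }
    }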
Something like this…
By using `contentFlow()`, I assume you would need to create/get a brand-new flow each time. If not, and just starting to `collect` it again is enough, use `contentFlow.collect` instead of `sourceFlow.collect`.
Another way, although it is not yet possible, is to call `contentFlow.share(1)` and use this shared flow both to collect and in the `onErrorResumeNext(...)`. But `Flow<T>.share(n)` does not exist yet.
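For reference, the stateful-variable idea can be tried in isolation roughly like this. It is a sketch under assumptions: the shape of `ViewState` (in particular `content: String?`) is guessed, `failingContent` is a hypothetical source, and this variant completes after the error (as asked in the first message) instead of looping.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Assumed shape of the view state; the content type is a guess.
data class ViewState(val content: String?, val loading: Boolean, val showError: Boolean)

// Hypothetical source that emits one item and then fails.
fun failingContent(): Flow<String> = flow {
    emit("first")
    throw IOException("simulated failure")
}

fun viewState(source: Flow<String>): Flow<ViewState> = flow {
    // Local variable that always holds the most recently emitted state.
    var latest = ViewState(content = null, loading = true, showError = false)
    emit(latest)
    try {
        source.collect { item ->
            latest = latest.copy(content = item, loading = false)
            emit(latest)
        }
    } catch (e: IOException) {
        // Re-emit the last known state with the error flag set, then complete.
        latest = latest.copy(loading = false, showError = true)
        emit(latest)
    }
}

fun main() = runBlocking {
    // Emits three states: loading, content "first", and content "first" with showError = true.
    viewState(failingContent()).toList().forEach { println(it) }
}
```

Because `latest` lives inside the `flow { ... }` block, every collection gets its own copy, so concurrent collectors do not share state.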

Paul Woitaschek

07/17/2019, 2:33 PM
I tried it in isolation. Does it throw a backend error for you too? @streetsofboston https://gist.github.com/PaulWoitaschek/f70dd8a8dbb2807bc1da84e8d2dd270c

streetsofboston

07/17/2019, 2:34 PM
What is a backend error?

streetsofboston

07/17/2019, 2:37 PM
I don’t think I have all your code…

Paul Woitaschek

07/17/2019, 2:37 PM
What's missing?

streetsofboston

07/17/2019, 2:38 PM
E.g. I have no ‘main()’ function…

Paul Woitaschek

07/17/2019, 2:38 PM
Just compile it

streetsofboston

07/17/2019, 2:40 PM
Ah, sorry… let me try 🙂
Yup, at my end as well, I get the compiler crash.

Paul Woitaschek

07/17/2019, 2:43 PM
It's because of the recursion of the inner function calling itself

streetsofboston

07/17/2019, 2:43 PM
Yup, removing that removes the crash
With refactoring, the crash goes away:
...
fun viewState(repeat: Flow<Unit>): Flow<ViewState> {
    val initialState = ViewState(content = null, loading = true, showError = false)
    return flow {
        emit(initialState)
        var lastState = initialState

        // Applies an update to the last state, remembers the result, and emits it.
        suspend fun updateAndEmit(update: ViewState.() -> ViewState) {
            lastState = update(lastState)
            emit(lastState)
        }

        repeat.startContentFlow(::updateAndEmit)
    }
}

// The receiver is the retry signal: after a failure, wait for its next emission and start over.
suspend fun <T> Flow<T>.startContentFlow(updateAndEmit: suspend (ViewState.() -> ViewState) -> Unit) {
    try {
        contentFlow().collect {
            updateAndEmit { copy(content = it, loading = false) }
        }
    } catch (t: IOException) {
        updateAndEmit { copy(loading = false, showError = true) }
        take(1).collect {
            updateAndEmit { copy(loading = true, showError = false) }
            startContentFlow(updateAndEmit) // recursive call, not tail-recursive
        }
    }
}
...
But this is definitely a compiler bug. You may want to report this one to JetBrains.

Paul Woitaschek

07/17/2019, 2:50 PM
Ah nice, thanks! https://gist.github.com/PaulWoitaschek/7e6318614307556fae6770556f83c570 It actually works. However, the code isn't the simplest. Can you make it simpler?

streetsofboston

07/17/2019, 2:51 PM
Not so much… they haven’t yet implemented the `share` or `cache` methods on Flows… that is coming though.
You can make it more iterative instead of recursive. The call to `startContentFlow` is not tail-recursive… it can be prone to stack-overflow issues.

Paul Woitaschek

07/17/2019, 2:53 PM
How would I do that?

streetsofboston

07/17/2019, 2:58 PM
I think that `take(1)` is a Flow that terminates after emitting 1 item. Try to just move the `startContentFlow(updateAndEmit)` call outside of that `collect { .. }` block.
If that works, you can remove that `startContentFlow(updateAndEmit)` call and just wrap the body of `startContentFlow` inside a `while(true) { ... }` loop.
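The iterative variant described above might look roughly like the following. This is a sketch, not the code from the gist: the `ViewState` shape and the failing `contentFlow` are assumptions, the helper is inlined into `viewState` for brevity, and `firstOrNull()` is used in place of `take(1).collect { ... }` to wait for one retry signal.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Assumed shape of the view state; the content type is a guess.
data class ViewState(val content: String?, val loading: Boolean, val showError: Boolean)

// Hypothetical source: the first collection fails, later ones succeed.
var attempts = 0
fun contentFlow(): Flow<String> = flow {
    if (++attempts == 1) throw IOException("simulated failure")
    emit("content")
}

fun viewState(repeat: Flow<Unit>): Flow<ViewState> = flow {
    var lastState = ViewState(content = null, loading = true, showError = false)
    emit(lastState)
    while (true) {
        try {
            contentFlow().collect {
                lastState = lastState.copy(content = it, loading = false)
                emit(lastState)
            }
            break // the source completed normally, so stop retrying
        } catch (e: IOException) {
            lastState = lastState.copy(loading = false, showError = true)
            emit(lastState)
            // Suspend until the retry signal fires; stop if it completes without emitting.
            repeat.firstOrNull() ?: break
            lastState = lastState.copy(loading = true, showError = false)
            emit(lastState)
        }
    }
}

fun main() = runBlocking {
    // Emits four states: loading, error, loading again, and finally the content.
    viewState(flowOf(Unit)).toList().forEach { println(it) }
}
```

Since each retry is just another pass through the loop, the call stack does not grow with the number of retries.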

Paul Woitaschek

07/17/2019, 3:22 PM