Are there any articles available on Shared Flow an...
# coroutines
p
Are there any articles available on Shared Flow and error handling? I have a common use case that I want to share a network call but want new consumers no trigger the upstream flow when subscribed after an error.
n
Can you provide more details about the use case?
Generally speaking I find it good practice not to send errors/exceptions down a stream and leave handling the errors to the “trigger” (for ex with a simple get/fetch suspend function) Not sure if it’s applicable to your usecase though
w
The docs on SharedFlow are pretty good, and the tl dr is that when you have a shared flow, you can only deal with regular emissions and send them to subscribers. Uncaught exceptions are afair passed to the exception handler directly
If you want to pass exceptions to consumers, you need to catch them and materialize into an emission
👍 1
p
Right now I have a generic database implementation: Basically it works as follows: • flow on the database a) value doesn't exist yet -> make network call, insert it in the database, database emits agin b) value exists -> emit it Right now a rx replay(1).refcount() is applied so multiple subscribers all get the same result and no unnecessary network calls are made.
Copy code
private fun <T> Flow<T>.replayOneRefCount(): Flow<T> {
  return map { Optional.ofNullable(it) }
    .asFlowable()
    .replay(1)
    .refCount()
    .asFlow()
    .map { it.orElse(null) }
}
I'm not sure how I can replace this with the SharedFlow as it's error handling is different
w
I’m not sure how this replay is related to error handling 🤔 Do you want to also let the subscribers know if the network call failed?
p
Yes
Well the rx error propagates to the consumers
n
.catch{ emit(null) }
or you could replace null, with your custom error signal
☝️ 1
w
With SharedFlow, you can’t return
Flow<T>
anymore, if you want an information about the error as well. You have to wrap it in something so that you can emit
Value(T)
or
Error(Throwable)
And in the place where in RxJava you’d just let the error pass through, you need to do what Nagy mentioned — catch and wrap it in a regular emission, not an exception signal
n
Also I’m curious how Rx’s
refCount
functionality could be translated to coroutines. I’m not sure if you can “stop” a hot observable if there are no subscribers 🤔
Looks like SharingStarted.WhileSubscribed can help you with that
☝️ 1
w
That’s what it’s for, the
share()
operator is emulated by
Copy code
.shareIn(
    scope = scope,
    started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0),
    replay = 1
)
👍 1
Or I’m wrong,
share()
is
publish().refCount()
so the
replay
parameter is
0
. For
replay(1).refcount()
, which is an equivalent of shared BehaviorSubject it would be
replay = 1
p
Thousand thanks @wasyl 💓
I tried that already but the key is the
replayExpirationMillis
So the equivalent of:
Copy code
private fun <T> Flow<T>.replayOneRefCount(): Flow<T> {
  return map { Optional.ofNullable(it) }
    .asFlowable()
    .replay(1)
    .refCount()
    .asFlow()
    .map { it.orElse(null) }
}
Is:
Copy code
private fun <T> Flow<T>.replayOneRefCount(scope: CoroutineScope): Flow<T> {
  return map { Result.success(it) }
    .catch {
      emit(Result.failure(it))
    }
    .shareIn(scope, SharingStarted.WhileSubscribed(replayExpirationMillis = 0), replay = 1)
    .map {
      it.getOrThrow()
    }
}
w
Ye! Though if I understand correctly, the
map { it.getOrThrow() }
should be somewhere in downstream handling code 🤔 Otherwise by default you’re cancelling the
scope
for every flow in it
p
I don't think so
The map is technically already downstream
w
Sorry, you’re right of course! And thanks, because you solved my recent problem exactly 😄