Good day. I'm wondering if is it expected behavior...
# coroutines
e
Good day. I'm wondering if is it expected behavior that a combination of
shareIn
,
SharingStarted.WhileSubscribed(0, 0)
and
.first()
may lead to the lock? For example, I have a class which supposes to run one updater at a time. Two clients request the same resource at once and I want to be sure that only one request will be executed. But sometimes when the second client tries to start the updater but updater already invokes emit for the first client and the flow isn't finished yet, the second client will wait first() infinitely. Here is an example with tests.
s
Without a replay cache, it’s pretty likely that only the first subscriber will ever see the value. Shared flows don’t terminate, so any subscribers that arrive after the first item was emitted will just wait forever. So yes, this is expected behaviour. I think you will see the behaviour you want if you set the replay cache size to 1. edit: I changed my mind
Have you considered other approaches besides a flow? Since you only have a single value,
async
with
CoroutineStart.LAZY
seems like a simpler way to get the same thing.
e
shareIn
approach allows also to cancel coroutine if there are no consumers.
s
I’m doubting my original answer now. 🤔
Since the
SharingStarted
strategy is supposed to stop the sharing as soon as the subscribers disappear, there should be only two possible outcomes: 1. Subscriber arrives while
updater.invoke()
is in progress, in which case it will see the result 2. Subscriber arrives after
updater.invoke()
is completed, in which case the sharing was already terminated and should be started again. It does seem like what you’re describing is an unexpected behaviour that doesn’t fit either of those two assumptions.
e
Here is a snapshot from my example. There is one more case when subscriber arrives after
updater.invoke()
but before flow was finished.
Copy code
private val value = flow<Result> {
    emit(updater.invoke())
    // subscriber arrives here before flow was finished
}.shareIn(
    scope = updateScope,
    started = SharingStarted.WhileSubscribed(
        0,
        0,
    ),
)
In this case
shareIn
logic assumes that flow is in progress, doesn't trigger one more, but there will no be any new events from flow.
I have a workaround based on the same logic as
shareIn
in a separate branch, but it looks too complex, but it works ^^
s
Does adding a replay cache of size 1 work? The replay cache will still be discarded when the sharing stops. So it should just ensure that the value remains available if there’s a delay after it’s emitted and before the sharing actually stops.
e
Hmm. I missed part that “The replay cache will still be discarded when the sharing stops”. In my example tests it works. Give me some time to check on more complex code.
s
The bit about the replay cache being discarded is due to the parameters passed into the sharing strategy
One of them is a timeout that indicates how long to keep the replay cache after sharing stops
e
Got it. Thanks for the advice.