I have the following code to expose SharedPreferen...
# coroutines
u
I have the following code to expose SharedPreferences changes as kotlin flow. As I want to use the flow to initialize UI it should start by emitting the current value of the SharedPreference immediately.
Copy code
fun <V : Any> SharedPreferences.asFlow(
    key: String,
    defaultValue: V?,
    retrieve: SharedPreferences.(String, V?) -> V?,
): Flow<Optional<V>> {
    return callbackFlow {
        val prefsListener = SharedPreferences
            .OnSharedPreferenceChangeListener { sharedPreferences, k ->
                if (k == key) {
                    val value = sharedPreferences.retrieve(key, defaultValue)
                    trySendBlocking(value.asOptional())
                }
            }
        registerOnSharedPreferenceChangeListener(prefsListener)
        awaitClose {
            unregisterOnSharedPreferenceChangeListener(prefsListener)
        }
    }
        .onStart {
            emit(Optional.ofNullable(retrieve(key, defaultValue)))
        }
        .distinctUntilChanged()
As it looks, this code is racy, if shared preferences are changed after
onStart
emits, but collecting of the callbackFlow has not yet started. Anyone has any hints on an elegant solution to this? Like a way to start collecting the callback flow immediately, but still injecting a first element?
t
Instead of doing the emit in onStart you can call the emit of the value before
awaitClose
well,
send
technically
u
Wouldn't that open a window for the initial value to be emitted after the first emission of a real change?
t
I highly highly doubt it. If you are concerned this is the safest implementation I could think of at the moment. I think it is prob overkill and there might be a better solution
Copy code
fun <V : Any> SharedPreferences.asFlow(
    key: String,
    defaultValue: V?,
    retrieve: SharedPreferences.(String, V?) -> V?,
): Flow<V?> {
    return callbackFlow {
            coroutineScope {
                val firstValueMutex = Mutex(true)
                val prefsListener =
                    SharedPreferences.OnSharedPreferenceChangeListener { sharedPreferences, k ->
                        if (k == key) {
                            val value = sharedPreferences.retrieve(key, defaultValue)
                            launch(start = CoroutineStart.UNDISPATCHED) { firstValueMutex.withLock { send(value) } }
                        }
                    }
                registerOnSharedPreferenceChangeListener(prefsListener)
                send(retrieve(key, defaultValue))
                firstValueMutex.unlock()
                awaitClose { unregisterOnSharedPreferenceChangeListener(prefsListener) }
            }
        }
        .distinctUntilChanged()
}
u
That would work. I was hoping to avoid this 😞
j
@uli but if the initial value is emitted after a real change occurs, wouldn't it still be correct then? It would be retrieved from the prefs and be up to date right?
true story 1
u
I started thinking about that. I am not convinced yet, but I'll keep thinking 🙂
I think we have no ordering guarantees here. So if we have multiple, consecutive changes, emitting from different contexts might always come in random order
I do know that the listener fires on the main thread. Maybe the solution is as easy as scheduling the whole setup code to the main thread a well
m
Just use a channel 😛
j
@myanmarking
callbackFlow
literally gives a channel to send elements to. Doesn't mean there can't be a race between different parallel senders to the channel
m
yes, but you do not control when the collection starts. Doesn’t this approach instead solve the problem? https://github.com/cashapp/copper/blob/trunk/copper-flow/src/main/java/app/cash/copper/flow/FlowContentResolver.kt
j
No that's just an old way of writing the same thing. It suffers from the same problem: if you send the initial value before registering the listener, you might miss a change. Maybe in the use case of your link it's ok, but for shared preferences it might not be. The problem would not exist if registering the listener would automatically trigger at least one initial change. @uli do you confirm that's not the case?
m
since you have full control of the channel, and can send whatever you want, when you want, chose the capacity you want. For me it is a better fit. I would use that
and btw, that is not an old way. When this code was written, there was already a channelFlow and callbackFlow. They just chose that way, not clear why
j
Maybe that's just me but I fail to understand how you would solve the race issue here. The point is to make sure of 2 things: 1. you don't lose the first update events after sending the initial value (so you can't retrieve the initial value before registering the callback) 2. you don't send an outdated initial value after the first update event(s) I guess as mentioned above, if the shared preferences are only handled on the main thread, maybe #1 can be prevented by just doing things on the main thread
m
capacity 1, drop oldest, send in the initial value first. Wouldn’t that work ?
j
When do you register the callback? After getting the initial value?
m
after sending, yes. Maybe i’m missing something, The way i see it, it would work
j
Then you break #1. You have retrieved the initial value, then someone could fire a change, then you register the callback - you missed the change
Also capacity 1 + drop oldest means you conflate events, so you might drop updates. This might be ok for shared preferences (not sure), but it's not ok in general
I guess there is no solution unless you can have some backpressure on the other places where sharedprefs can be updated. Maybe the shared prefs listener enforces that (through the main thread), and a mutex or a second channel setup as rendez-vous would be fine.
m
I don’t understand the issue, as i dont understand what you mean by “You have retrieved the initial value, then someone could fire a change, then you register the callback - you missed the change”
j
Other code from other threads could modify the preference value you're trying to listen to. If this happens between the moment you read the initial value and the moment you start listening for changes, you won't get the change. That is of course unless registering the listener automatically triggers a first event with the current value, but if that were the case we wouldn't be having this conversation I believe.
m
then just register the listener first, then check with if-else if there is any value emitted, if not, send the initial value
best you can do i guess
j
Then the
if-else
has the race
m
atomicBoolean, hasSentFirstElement. solved
j
That's basically the mutex solution from above, and like the mutex solution, I guess it can only work if the listener itself enforces backpressure on pref value changes
m
i mean, if you send the initial value, register the listener, you will get the most updated value. It will always have the possibility of dropping a value between. There is nothing you can do about that, unless fixed by them idk ?
u
@Joffrey I agree, a channel probably can not solve the race. I went with the main thread approach now, which probably works the same way a mutex would but is a lot more readable
👍 1
Just to follow up, inspired by this thread: https://kotlinlang.slack.com/archives/C1CFAFJSK/p1656341425076459?thread_ts=1656339419.820709&amp;cid=C1CFAFJSK Looks like
onSubscription
instead of onStart might do the trick. The documentation looks promising. I’ll think about it.
143 Views