#coroutines

Erik

08/04/2020, 10:04 AM
2. How can I implement a thread-safe `MutableStateFlow.value` read-and-write operation? I mean: first read the current `value` (this is thread-unsafe, because I don't know if another thread will change the value just after I read it), then use it to set it again. I can probably use a `Channel` for this, but how do I do this? And there are probably alternatives?

Horv

08/04/2020, 10:31 AM
I'm by no means an expert on the subject, just starting out on Android and messing with coroutines, channels, flow etc. myself. But I think you could use `ConflatedBroadcastChannel` as in the example here: https://codelabs.developers.google.com/codelabs/advanced-kotlin-coroutines/#11
> `ConflatedBroadcastChannel` is often a good way to insert events into a flow. It provides a concurrency primitive (or low-level tool) for passing values between several coroutines.
> By conflating the events, we keep track of only the most recent event. This is often the correct thing to do, since UI events may come in faster than processing, and we usually don't care about intermediate values.
> If you do need to pass all events between coroutines and don't want conflation, consider using a `Channel` which offers the semantics of a `BlockingQueue` using suspend functions. The `channelFlow` builder can be used to make channel backed flows.
So I guess from the quote I posted, a channel might be a better idea if you need read+write as an atomic operation.
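One way the channel idea could look, as a rough sketch rather than anything posted in this thread: callers send transformation functions into a `Channel`, and a single consumer coroutine applies them one at a time, so each read-then-write of the state flow runs without interleaving. The class `SerializedState` and its members are hypothetical names chosen for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch

// Hypothetical helper: the class and member names are illustrative only.
class SerializedState<T>(initial: T, scope: CoroutineScope) {
    private val state = MutableStateFlow(initial)

    // Read-only view for observers; only the consumer coroutine below writes.
    val flow: StateFlow<T> get() = state

    // Each element is a transformation to apply to the current value.
    private val updates = Channel<(T) -> T>(Channel.UNLIMITED)

    init {
        // A single consumer applies the transformations sequentially, in arrival order.
        scope.launch {
            for (transform in updates) {
                state.value = transform(state.value)
            }
        }
    }

    // Enqueues a transformation; it is applied later by the consumer coroutine.
    suspend fun update(transform: (T) -> T) = updates.send(transform)

    // Closing the channel lets the consumer finish once the queued updates are applied.
    fun close() = updates.close()
}
```

Note that `update` returns before the transformation has been applied, and that the consumer coroutine keeps its scope alive until `close()` is called or the scope is cancelled.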

Erik

08/04/2020, 3:32 PM
I have now used a `private val mutex = Mutex()` to lock access to my mutable state flow's read-write operation, more or less like so:
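```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class Foo {
    private val mutex = Mutex()
    private val flo = MutableStateFlow<Bar?>(null)

    // Holding the mutex makes the read-compute-write sequence atomic
    // with respect to other callers of baz().
    suspend fun baz() = mutex.withLock {
        val bar = flo.value
        val newBar = qux(bar)
        flo.value = newBar
    }
}
```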
Instead of a `Mutex`, I think that a private `CoroutineContext` based on a single thread (e.g. `Executors.newSingleThreadExecutor().asCoroutineDispatcher()`) would also work, when used in `withContext(coroutineContext)` wrapping the read-write operation. The downside is that the context uses a resource that one must `close()`, but there can be a performance gain depending on the number of callers of `baz()`.
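A minimal sketch of that single-thread alternative, assuming a simple integer counter in place of the `Bar`/`qux` placeholders; the `Counter` class and its members are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.withContext
import java.io.Closeable
import java.util.concurrent.Executors

// Hypothetical example class; not code from the thread.
class Counter : Closeable {
    // Dispatcher backed by exactly one thread; it owns that thread until closed.
    private val singleThread = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    private val count = MutableStateFlow(0)

    val current: Int get() = count.value

    suspend fun increment() = withContext(singleThread) {
        // There is no suspension point between the read and the write,
        // so two blocks cannot interleave on the one thread.
        count.value = count.value + 1
    }

    // Shuts down the underlying executor thread.
    override fun close() = singleThread.close()
}
```

One caveat with this approach: confinement only protects the sequence if nothing suspends between the read and the write. If the computation in the middle is itself a suspending call, another caller's block can run on the same thread in the meantime, whereas a `Mutex` stays held across suspension.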

Zach Klippenstein (he/him) [MOD]

08/04/2020, 4:39 PM
Also I think it's getting a CAS operator when the new shared flow stuff lands.

Erik

08/04/2020, 4:40 PM
CAS?

Zach Klippenstein (he/him) [MOD]

08/04/2020, 4:42 PM
Compare-and-set

Erik

08/04/2020, 4:43 PM
Ah, yes, I found that one in the open PR in the repo
But it's not a solution for my use case, because the CAS operator only sets the new value when the current value equals the expected value. But I don't know what the current value should be. I just need the current value, process it, store it, without others writing to it in the meantime.
So unfortunately I need a bit more heavy-weight locking
Luckily, I know there will be few callers of the operator, so they will seldom suspend to obtain the lock. I'm not sure if the mutex mechanics incur too much cost for this scenario, but I think it's cheap enough. The other way around, with many concurrent invokers, the mutex would be used a lot and that might perform badly.
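For comparison, a compare-and-set can also be used without knowing the expected value up front: read the current value, compute the new one, and retry if another writer got in between. The sketch below assumes a kotlinx.coroutines version in which `MutableStateFlow.compareAndSet` is available; the extension name `updateWithCas` is hypothetical.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow

// Hypothetical helper name; retries until the compare-and-set succeeds.
inline fun <T> MutableStateFlow<T>.updateWithCas(transform: (T) -> T): T {
    while (true) {
        val current = value            // snapshot of the current value
        val next = transform(current)  // may run more than once under contention
        // Succeeds only if nobody changed the value since the snapshot was taken.
        if (compareAndSet(current, next)) return next
    }
}
```

Because `transform` can be invoked several times when writers collide, it should be cheap and free of side effects; if it is not, a lock such as the `Mutex` above remains the safer choice. Later kotlinx.coroutines releases include an `update` extension on `MutableStateFlow` with this same retry shape.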