# coroutines
z
Is this always thread safe because the code here is always executed in the same coroutine? Meaning, we don’t need to use an `AtomicRef` on `numberOfTimesUserHasChanged` because it’s guaranteed to be executed in the same coroutine? My concern is that `currentUserIdFlow` could switch threads/coroutines internally, but I think that would not be able to affect the coroutine in which `numberOfTimesUserHasChanged` is read and written from?
f
Is `numberOfTimesUserHasChanged` always going to be changed through Flow onEach?
z
There’s no race here because the flow operators will run sequentially, even if they happen to jump between threads. This is a weird way to write this logic though, and you could technically introduce an operator (e.g. `flowOn`) in between `onEach` and `collect` that could introduce parallelism and thus a race. Since `++` is an expression that returns a value, you could turn this into:
currentUserIdFlow.map { number++ }
  .collect(::updateUi)
Although that’s still a little gross, because it’s using side effects. A more pure implementation would be:
currentUserIdFlow
  .scan(0) { count, _ -> count + 1 }
  .drop(1)
  .collect(::updateUi)
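A minimal runnable sketch of the `scan` version, for anyone who wants to try it. `sampleUserIds` and `emissionCount` are hypothetical names made up for this illustration (a stand-in for `currentUserIdFlow`), and it assumes a recent kotlinx.coroutines:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for currentUserIdFlow from the thread.
val sampleUserIds: Flow<String> = flowOf("alice", "bob", "carol")

// Running count of upstream emissions, kept inside the flow instead of in an outside variable.
fun <T> Flow<T>.emissionCount(): Flow<Int> =
    scan(0) { count, _ -> count + 1 } // emits the seed 0 first, then 1, 2, 3, ...
        .drop(1)                      // skip the seed so the first value is 1

fun main() = runBlocking {
    println(sampleUserIds.emissionCount().toList()) // prints [1, 2, 3]
}
```

Because the count lives in the `scan` accumulator, there is no shared mutable variable left to worry about, whichever operators get added later.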
z
This is a weird way to write this logic though
yeah it’s just an example, not something I’m actually using.
you could technically introduce an operator (e.g. flowOn) in between onEach and collect that could introduce parallelism and thus a race.
definitely
Is numberOfTimesUserHasChanged always going to be changed through Flow onEach?
yeah, it would always be changed/accessed from the same coroutine
I think it’s really cool that you can write simple logic like that with Flow because of how coroutines work
just wanted to make sure this was safe before I go tell my team it’s cool to use
z
You might need to make the variable `@Volatile`, but I can’t recall if that’s a thing for local vars vs fields.
z
I’m under the impression `@Volatile` doesn’t need to be used for properties that are written/read from the same coroutine
f
@Zach Klippenstein (he/him) [MOD] I think even if `flowOn` is used, there will be no race since the value is only changed in intermediate operators (`onEach`) but not terminal operators
It will be executed sequentially
z
oh, `flowOn` would change the dispatcher not the coroutine
f
Yeah
z
I guess what I would need to watch for are any operators where the lambda passed to them is executed in a different coroutine than the `.collect()` one
f
Yeah, they (the intermediate operators) will be executed on a different coroutine than the collect, but the intermediate operators themselves will run sequentially on the same coroutine
So as long as you don't modify that value in collect there should not be any race I believe
z
I mean let’s say I do
I’m trying to identify which code is safe to do this in
like if I add a `.flatMap { .. }` do I break it?
f
No it does not break since it is an intermediate operator
z
why does the intermediate operator part matter?
it just matters which coroutine the lambdas are executing in
f
The thing is if you use flowOn and try to change the value from terminal operators (in your case collect) it might break
z
I guess I should figure out how to log this info to find out exactly
I don’t ever use `flowOn` because I don’t see the point
my main concern would be `.flatMapConcat`/`flatMapLatest`/`flatMapMerge`
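One way to log this, as a rough sketch: record the coroutine `Job` and thread seen inside `onEach` and inside `collect`, then compare them. `sameCoroutine` is a hypothetical helper written for this illustration. It assumes a recent kotlinx.coroutines, and that `flowOn` with a different dispatcher collects the upstream in its own coroutine, which is how I read the `flowOn` documentation.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Returns true when the onEach lambda and the collect lambda ran under the same coroutine Job.
fun sameCoroutine(useFlowOn: Boolean): Boolean = runBlocking {
    var upstreamJob: Job? = null
    var collectorJob: Job? = null

    val upstream = flowOf(1, 2, 3).onEach {
        upstreamJob = currentCoroutineContext()[Job]
        println("onEach  on ${Thread.currentThread().name}, job=$upstreamJob")
    }
    // Optionally move everything above this point onto another dispatcher.
    val flow = if (useFlowOn) upstream.flowOn(Dispatchers.Default) else upstream

    flow.collect {
        collectorJob = currentCoroutineContext()[Job]
        println("collect on ${Thread.currentThread().name}, job=$collectorJob")
    }
    upstreamJob === collectorJob
}

fun main() {
    println("without flowOn, same coroutine: ${sameCoroutine(useFlowOn = false)}")
    println("with flowOn, same coroutine:    ${sameCoroutine(useFlowOn = true)}")
}
```

The same logging could be dropped into the lambda of a `flatMapMerge` or `flatMapLatest` to see which coroutine the inner flows are collected in.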
f
Then it is safe since all the work will be sequentially executed
👍 1
z
I don’t ever use flowOn because I don’t see the point
I execute in the default dispatcher by default, otherwise I use `withContext(..)` in any suspending functions that require the non-default dispatcher
Then it is safe since all the work will be sequentially executed
even if you are sequential across different coroutines, the code isn’t thread safe. For example if you have two suspending functions that utilize a mutex and update that shared state, I think you still need `@Volatile` or a lock object on that shared state IIRC
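For the mutex case, here is a small sketch to experiment with. My reading of the kotlinx.coroutines `Mutex` documentation is that an unlock happens-before a later successful lock (similar to `synchronized`), so a plain `var` that is only touched inside `withLock` should not need `@Volatile`. Treat that as an assumption to verify rather than a guarantee. `countWithMutex` is a hypothetical name for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Several coroutines on a multi-threaded dispatcher increment one plain var,
// with every access guarded by the same Mutex.
fun countWithMutex(coroutines: Int = 4, incrementsEach: Int = 1_000): Int {
    val mutex = Mutex()
    var counter = 0 // plain var: no @Volatile, no atomics
    runBlocking {
        repeat(coroutines) {
            launch(Dispatchers.Default) {
                repeat(incrementsEach) {
                    mutex.withLock { counter++ }
                }
            }
        }
    } // runBlocking returns only after all child coroutines have completed
    return counter
}

fun main() {
    println(countWithMutex()) // expected total: coroutines * incrementsEach
}
```

Removing the `withLock` call from the loop is a quick way to see lost updates appear.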
f
In Android I use `lifecycleScope` and by default it is using the main dispatcher. I don't use default or io with that scope since I update UI. There I use `flowOn`
z
I do most of my work outside the UI layer
but anyway that’s a separate discussion
f
https://kotlinlang.slack.com/archives/C1CFAFJSK/p1588791231445200?thread_ts=1588787738.438900&cid=C1CFAFJSK This is why I asked if that value is only changed in that flow. If you have another Coroutine inside your scope and try to change it there will be a problem
z
My understanding is that `Flow<T>` runs in a single coroutine by default?
f
Yeah
l
Please someone correct me, but from my understanding that can cause a race condition: yes, the coroutine is the same, but where is the guarantee that the thread will be the same once it suspends and resumes? Once it suspends and then continues, the thread can be different.
z
the thread only matters if you are worried about race conditions across coroutines
different threads across the same coroutine == no race condition
same thread across different coroutines == no race condition
l
fun main() {

  val latch  = CountDownLatch(1)
  GlobalScope.launch(Dispatchers.IO) {
    val threadId = Thread.currentThread().id
    println("${Thread.currentThread()}")
    repeat(1000) {
      if(Thread.currentThread().id != threadId) {
        println("WRONG THREAD")
        throw RuntimeException("wrong thread")
      }
      delay(1)
    }
    latch.countDown()
  }
  latch.await(2, TimeUnit.SECONDS)

}
in this example you can see that just by having a delay, it will suspend, and when it comes back you will be in a different thread real quick
that flow you have will be like a delay()
z
the thread the coroutine executes in does not matter for the thread safety of the state defined within that coroutine
l
it will synchronize the variables for me?
z
my understanding of that is yes
l
that link doesn't explain the synchronizing variables between suspensions
but I guess the example somewhat matches what you say
it doesn't synchronize it, it is just that usually you cannot access the variable concurrently
but if you pass out of the function updateUi(numberOfTimesUserHasChanged)
then maybe you can
so the race can happen like this:
fun main() {

  val latch  = CountDownLatch(2)
  runBlocking {
    var myInt = 1
    GlobalScope.launch(Dispatchers.IO) {
      repeat(1000) {
        myInt += 1
        delay(1)
        myInt -= 1
        if (myInt != 1) {
          println("ERROR")
          throw RuntimeException("wrong count")
        }
      }
      latch.countDown()
    }

    GlobalScope.launch(Dispatchers.IO) {

      repeat(100000) {
        myInt += 1
        delay(1)
        myInt -= 1
        if (myInt != 1) {
          println("ERROR")
          throw RuntimeException("wrong count")
        }
      }
      latch.countDown()
    }
  }

  latch.await(2, TimeUnit.SECONDS)

}
z
you’re launching two coroutines on two different threads and updating shared state `myInt` between the two
of course there’s a race condition
do it with a single coroutine
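A sketch of what the single-coroutine variant might look like: the same increment, `delay`, decrement loop, but with only one coroutine touching `myInt`. `singleCoroutineRun` is a hypothetical name for this illustration. It returns how many iterations found `myInt` at an unexpected value, plus the names of the threads the coroutine resumed on.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// One coroutine on Dispatchers.IO mutating its own local state across suspension points.
fun singleCoroutineRun(iterations: Int = 200): Pair<Int, Set<String>> =
    runBlocking(Dispatchers.IO) {
        var myInt = 1
        var mismatches = 0
        val threads = mutableSetOf<String>()
        repeat(iterations) {
            myInt += 1
            delay(1) // suspension point: the coroutine may resume on another IO thread
            myInt -= 1
            if (myInt != 1) mismatches++
            threads += Thread.currentThread().name
        }
        mismatches to threads
    }

fun main() {
    val (mismatches, threads) = singleCoroutineRun()
    println("mismatches=$mismatches, threads seen=${threads.size}")
}
```

The thread set usually has more than one entry on a multi-core machine, yet the mismatch count stays at zero, since nothing else ever reads or writes `myInt`.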
e
It is safe. Slightly longer and a bit more technical explanation is here: https://proandroiddev.com/what-is-concurrent-access-to-mutable-state-f386e5cb8292
👍 3