https://kotlinlang.org logo
#coroutines
Title
# coroutines
z

zak.taccardi

05/06/2020, 5:55 PM
Is this always thread safe because the code here is always executed in the same coroutine? Meaning, we don’t need to use an
AtomicRef
on
numberOfTimesUserHasChanged
because it’s guaranteed to be executed in the same coroutine? My concern is that
currentUserIdFlow
could switch threads/coroutines internally, but I think that would not be able to affect the coroutine in which
numberOfTimesUserHasChanged
is read and written from?
f

fatih

05/06/2020, 6:03 PM
Is
numberOfTimesUserHasChanged
always going to be changed through Flow onEach?
z

Zach Klippenstein (he/him) [MOD]

05/06/2020, 6:13 PM
There’s no race here because the flow operators will run sequentially, even if they happen to jump between threads. This is a weird way to write this logic though, and you could technically introduce an operator (e.g.
flowOn
) in between
onEach
and
collect
that could introduce parallelism and thus a race. Since
++
is an expression that returns a value, you could turn this into:
Copy code
currentUserIdFlow.map { number++ }
  .collect(::updateUi)
Although that’s still a little gross, because it’s using side effects. A more pure implementation would be:
Copy code
currentUserIdFlow
  .scan(0) { count, _ -> count + 1 }
  .drop(1)
  .collect(::updateUi)
1
z

zak.taccardi

05/06/2020, 6:16 PM
This is a weird way to write this logic though
yeah it’s just an example, not something I’m actually using.
you could technically introduce an operator (e.g. flowOn) in between onEach and collect that could introduce parallelism and thus a race.
definitely
Is numberOfTimesUserHasChanged always going to be changed through Flow onEach?
yeah, it would always be changed/accessed from the same coroutine
I think it’s really cool that you can write simple logic like that with Flow because of how coroutines work
just wanted to make sure this was safe before I go tell my team it’s cool to use
z

Zach Klippenstein (he/him) [MOD]

05/06/2020, 6:37 PM
You might need to make the variable
@Volatile
, but I can’t recall if that’s a thing for local vars vs fields.
z

zak.taccardi

05/06/2020, 6:38 PM
I’m under the impression
@Volatile
doesn’t need to be used for properties that are written/read from the same coroutine
f

fatih

05/06/2020, 6:39 PM
@Zach Klippenstein (he/him) [MOD] I think Even if
flowOn
is used, there will be no race since the value is only changed in intermediate operators (onEach) but not terminal operators
It will be executed sequentially
z

zak.taccardi

05/06/2020, 6:40 PM
oh,
flowOn
would change the dispatcher not the coroutine
f

fatih

05/06/2020, 6:40 PM
Yeah
z

zak.taccardi

05/06/2020, 6:42 PM
I guess what I would need to watch for are any operators where the lambda passed to them are executed in a different coroutine than the
.collect()
one
f

fatih

05/06/2020, 6:44 PM
Yeah they will be (intermediate operators) executed on a different coroutine than the collect but that intermediate operators will run sequentially on the same coroutine
So as long as you don't modify that value in collect there should not be any race I believe
z

zak.taccardi

05/06/2020, 6:45 PM
I mean let’s say I do
I’m trying to identify which code is safe to do this in
like if I add a
.flatMap { .. }
do I break it?
f

fatih

05/06/2020, 6:48 PM
No it does not break since it is an intermediate operator
z

zak.taccardi

05/06/2020, 6:48 PM
why does the intermediate operator part matter?
it just matters which coroutine the lambdas are executing in
f

fatih

05/06/2020, 6:50 PM
The thing is if you use flowOn and try to change the value from terminal operators (in your case collect) it might break
z

zak.taccardi

05/06/2020, 6:50 PM
I guess I should figure out how to log this info to find out exactly
I don’t ever use
flowOn
because I don’t see the point
my main concern would be `.flatMapConcat`/`flatMapLatest`/`flatMapMerge`
f

fatih

05/06/2020, 6:51 PM
Then it is safe since all the work will be sequentially executed
👍 1
z

zak.taccardi

05/06/2020, 6:52 PM
I don’t ever use flowOn because I don’t see the point
I execute in the default dispatcher by default, otherwise I use
withContext(..)
in any suspending functions that require the non-default dispatcher
Then it is safe since all the work will be sequentially executed
even if you are sequential across different coroutines, the code isn’t thread safe. For example if you have two suspending functions that utilize a mutex and update that shared state, I think you still need
@Volatile
or a lock object on that shared state IIRC
f

fatih

05/06/2020, 6:55 PM
In Android I use lifecyleScope and by default it is using main dispatcher. I don't use default or io with that scope since I update UI. There I use flowOn
z

zak.taccardi

05/06/2020, 6:55 PM
I do most of my work outside the UI layer
but anyway that’s a separate discussion
f

fatih

05/06/2020, 6:58 PM
https://kotlinlang.slack.com/archives/C1CFAFJSK/p1588791231445200?thread_ts=1588787738.438900&cid=C1CFAFJSK This is why I asked if that value is only changed in that flow. If you have another Coroutine inside your scope and try to change it there will be a problem
z

zak.taccardi

05/06/2020, 6:59 PM
My understanding is that
Flow<T>
runs in a single coroutine by default?
f

fatih

05/06/2020, 7:01 PM
Yeah
l

Luis Munoz

05/06/2020, 9:16 PM
Please someone correct me but from my understanding that can cause a race condition because yes the coroutine is the same but where is the guarantee that the thread will be the same once it suspends and resumes. Once it suspends and then continues the thread can be difference.
z

zak.taccardi

05/06/2020, 9:17 PM
the thread only matters if you are worried about race conditions across coroutines
different threads across the same coroutine == no race condition
same thread across different coroutines == no race condition
l

Luis Munoz

05/06/2020, 9:29 PM
Copy code
fun main() {

  val latch  = CountDownLatch(1)
  GlobalScope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
    val threadId = Thread.currentThread().id
    println("${Thread.currentThread()}")
    repeat(1000) {
      if(Thread.currentThread().id != threadId) {
        println("WRONG THREAD")
        throw RuntimeException("wrong thread")
      }
      delay(1)
    }
    latch.countDown()
  }
  latch.await(2, TimeUnit.SECONDS)

}
this example you can see just by having a delay, it will suspend and when it comes back you will be in a different real quick
that flow you have will be like a delay()
z

zak.taccardi

05/06/2020, 9:30 PM
the thread the coroutine executes in does not matter for the thread safety of the state defined within that coroutine
l

Luis Munoz

05/06/2020, 9:31 PM
it will synchronize the variables for me?
z

zak.taccardi

05/06/2020, 9:31 PM
my understanding of that is yes
l

Luis Munoz

05/06/2020, 9:35 PM
that link doesn't explain the synchronizing variables between suspensions
but I guess the example somewhat matches what you say
it doesn't synchronize it, it is just that usually you cannot access the variable concurrently
but if you pass out of the function updateUi(numberOfTimesUserHasChanged)
then maybe you can
so the race can happen like this:
Copy code
fun main() {

  val latch  = CountDownLatch(2)
  runBlocking {
    var myInt = 1
    GlobalScope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
      repeat(1000) {
        myInt += 1
        delay(1)
        myInt -= 1
        if (myInt != 1) {
          println("ERROR")
          throw RuntimeException("wrong count")
        }
      }
      latch.countDown()
    }

    GlobalScope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {

      repeat(100000) {
        myInt += 1
        delay(1)
        myInt -= 1
        if (myInt != 1) {
          println("ERROR")
          throw RuntimeException("wrong count")
        }
      }
      latch.countDown()
    }
  }

  latch.await(2, TimeUnit.SECONDS)

}
z

zak.taccardi

05/06/2020, 9:55 PM
you’re launching two coroutines on two different threads and updating shared state
myInt
between the two
of course there’s a race condition
do it with a single coroutine
e

elizarov

05/07/2020, 7:13 AM
It is safe. Slightly longer and a bit more technical explanation is here: https://proandroiddev.com/what-is-concurrent-access-to-mutable-state-f386e5cb8292
👍 3
3 Views