Ahaisting
06/24/2021, 8:03 PM
combine behavior that I don’t expect, hoping someone can help me shed some light. (putting code snippet in thread)
Ahaisting
06/24/2021, 8:03 PM
val flow1 = MutableStateFlow("Hello")
val flow2 = MutableStateFlow("Hola")
combine(
flow1,
flow2
) { english, spanish ->
Timber.e("testing: $english $spanish")
Pair(english, spanish)
}.launchIn(viewModelScope)
flow2.value = "Como estas?"
flow1.value = "How are you?"
I would expect to get this output:
Hello Hola
Hello Como estas?
How are you? Como estas?
but instead I actually get:
Hello Hola
How are you? Hola
How are you? Como estas?
Ahaisting
06/24/2021, 8:18 PM
fun main() {
val flow1 = MutableStateFlow("Hello")
val flow2 = MutableStateFlow("Hola")
combine(
flow1,
flow2
) { english, spanish ->
println("testing: $english $spanish")
Pair(english, spanish)
}.launchIn(GlobalScope)
Thread.sleep(1000)
flow2.value = "Como estas?"
Thread.sleep(1000)
flow1.value = "How are you?"
Thread.sleep(1000)
}
Ahaisting
06/24/2021, 8:18 PM
viewModelScope may be causing the behavior somehow, hopefully we aren’t too far into 😶 territory here 😅
Justin
06/24/2021, 10:24 PM
GlobalScope, by default, uses Dispatchers.Default, which can run more than one coroutine in parallel - someone please correct me if I’m mistaken here
In contrast, viewModelScope effectively uses Dispatchers.Main which is on the main thread (i.e. one thread) in the lifecycle-viewmodel-ktx library, so it never actually runs code in parallel (though it’s extremely fast at switching between suspending coroutines)
Also, when you use Thread.sleep(), you’re essentially blocking the current thread. If you’re in a coroutine you’d want to use delay() whenever you need to delay something
so what you’re seeing appears to be a concurrency issue, i.e. a race condition
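As a rough illustration of the delay() point, here is a minimal sketch that runs the same two flows inside runBlocking and pauses with delay() between updates. The helper name collectSpacedUpdates and the 100 ms pauses are made up for this example, and it runs on a plain JVM event loop rather than on viewModelScope, so it only shows what happens when each update has time to propagate before the next one.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns every pair that combine emitted, in order.
fun collectSpacedUpdates(): List<Pair<String, String>> = runBlocking {
    val flow1 = MutableStateFlow("Hello")
    val flow2 = MutableStateFlow("Hola")
    val seen = mutableListOf<Pair<String, String>>()

    // Collect the combined flow in a child coroutine of runBlocking.
    val job = combine(flow1, flow2) { english, spanish -> Pair(english, spanish) }
        .onEach { seen += it }
        .launchIn(this)

    // delay() suspends without blocking the thread, so the collector can run.
    delay(100)
    flow2.value = "Como estas?"
    delay(100)
    flow1.value = "How are you?"
    delay(100)

    job.cancel()
    seen
}

fun main() {
    // Prints the three combinations in the order the updates were made.
    collectSpacedUpdates().forEach { (english, spanish) ->
        println("testing: $english $spanish")
    }
}
```

Without the pauses the collectors may not get a chance to run between the two assignments, which is the situation the rest of this thread digs into.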
I think, assuming a MutableStateFlow is definitely what you want to use, you’d want to StateFlow.collect the combinedFlow by doing something like:
val flow1 = MutableStateFlow("Hello")
val flow2 = MutableStateFlow("Hola")
val combinedFlow = combine(
flow1,
flow2
) { english: String, spanish: String ->
Pair(english, spanish)
}
viewModelScope.launch {
combinedFlow.collect { (english, spanish) ->
Timber.i("testing: $english, $spanish")
}
}
delay(1_000)
flow2.value = "Como estas?"
flow1.value = "How are you?"
Take a look at https://developer.android.com/kotlin/flow/stateflow-and-sharedflow if you haven’t already
Brian Yencho
06/25/2021, 1:27 PM
GlobalScope and Thread.sleep in the above example was just to show the issue does not occur for other dispatchers. It seems limited to Dispatchers.Main.immediate.
Brian Yencho
06/25/2021, 1:29 PM
viewModelScope, there is no Thread.sleep / delay needed. That again was to just demonstrate using a different dispatcher.
Brian Yencho
06/25/2021, 1:32 PM
delay, and run the whole thing on the main thread of an Android application, we still got the incorrect output of
testing: Hello, Hola
testing: How are you?, Hola
testing: How are you?, Como estas?
Brian Yencho
06/25/2021, 1:32 PM
Dispatchers.Main.immediate and how it’s getting used by combine.
Brian Yencho
06/25/2021, 1:41 PM
combine call:
val flow1 = MutableStateFlow("Hello")
val flow2 = MutableStateFlow("Hola")
flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(viewModelScope)
flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(viewModelScope)
val combinedFlow = combine(
flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") }
) { english: String, spanish: String ->
Pair(english, spanish)
}
viewModelScope.launch {
combinedFlow.collect { (english, spanish) ->
Timber.i("testing: $english, $spanish")
}
}
Timber.i("Updating flow2")
flow2.value = "Como estas?"
Timber.i("Updating flow1")
flow1.value = "How are you?"
This results in
I: Flow 1 emit (outside combine) : Hello
I: Flow 2 emit (outside combine) : Hola
I: Flow 1 emit (inside combine) : Hello
I: Flow 2 emit (inside combine) : Hola
I: testing: Hello, Hola
I: Updating flow2
I: Flow 2 emit (outside combine) : Como estas?
I: Updating flow1
I: Flow 1 emit (outside combine) : How are you?
I: Flow 1 emit (inside combine) : How are you?
I: testing: How are you?, Hola
I: Flow 2 emit (inside combine) : Como estas?
I: testing: How are you?, Como estas?
So the flows collected outside the combine call emit in the order you’d expect, but the ones inside seem to only go flow1 to flow2
Justin
06/25/2021, 2:08 PM
Dispatchers.Main would help, e.g.:
viewModelScope.launch {
withContext(Dispatchers.Main) {
combinedFlow.collect { (english, spanish) ->
Timber.i("testing: $english, $spanish")
}
}
}
It doesn’t have the optimization that MainCoroutineDispatcher.immediate has of avoiding an additional dispatch, but if it yields the expected behavior then maybe that’s a good workaround for now. In the meantime I recommend filing a bug, though I’m not entirely sure whether you’d want to file it with Google or with whoever maintains kotlinx-coroutines-android, which is where immediate is
Brian Yencho
06/25/2021, 2:22 PM
Brian Yencho
06/25/2021, 2:23 PM
Dispatchers.Main.immediate works.
Justin
06/25/2021, 2:27 PM
kotlinx.coroutines.android.HandlerContext is where immediate is implemented for Android, which is in kotlinx-coroutines-android …is that library maintained by JetBrains or the Android team? I know they collaborate on different things but I’m not sure
Brian Yencho
06/25/2021, 2:28 PM
Justin
06/25/2021, 2:29 PM
Brian Yencho
06/25/2021, 2:29 PM
Brian Yencho
06/25/2021, 2:57 PM
val scope = CoroutineScope(Dispatchers.IO)
val flow1 = MutableStateFlow("1 initial")
val flow2 = MutableStateFlow("2 initial")
val flow3 = MutableStateFlow("3 initial")
val flow4 = MutableStateFlow("4 initial")
flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(scope)
flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(scope)
flow3.onEach { Timber.i("Flow 3 emit (outside combine) : $it") }.launchIn(scope)
flow4.onEach { Timber.i("Flow 4 emit (outside combine) : $it") }.launchIn(scope)
combine(
flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") },
flow3.onEach { Timber.i("Flow 3 emit (inside combine) : $it") },
flow4.onEach { Timber.i("Flow 4 emit (inside combine) : $it") }
) { one, two, three, four ->
Timber.i("Values: $one, $two, $three, $four")
}
.launchIn(scope)
Timber.i("Updating flow2")
flow2.value = "2 later"
Timber.i("Updating flow4")
flow4.value = "4 later"
Timber.i("Updating flow1")
flow1.value = "1 later"
Timber.i("Updating flow3")
flow3.value = "3 later"
That produces:
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Updating flow2
I: Updating flow4
I: Updating flow1
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 2 emit (outside combine) : 2 later
I: Flow 4 emit (outside combine) : 4 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 1 emit (outside combine) : 1 later
I: Flow 1 emit (inside combine) : 1 later
I: Flow 3 emit (inside combine) : 3 later
I: Flow 4 emit (inside combine) : 4 later
I: Flow 2 emit (inside combine) : 2 later
I: Values: 1 later, 2 later, 3 later, 4 later
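One likely contributor to the log above, where only the "later" values reach combine, is that StateFlow is conflated: a collector that has not resumed yet only sees the most recent value. The sketch below is a simplified, single-flow illustration of that on a plain JVM event loop. The helper name observeRapidUpdates and the value "1 latest" are made up for the example, and it does not reproduce the Dispatchers.IO threading from the snippet.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns the values a collector actually observed.
fun observeRapidUpdates(): List<String> = runBlocking {
    val flow = MutableStateFlow("1 initial")
    val seen = mutableListOf<String>()

    val job = flow.onEach { seen += it }.launchIn(this)

    // Let the collector start and observe the initial value.
    yield()

    // Two updates back to back; the collector cannot run in between
    // because this coroutine does not suspend here.
    flow.value = "1 later"
    flow.value = "1 latest"

    // Give the collector a turn; it reads only the current value.
    yield()

    job.cancel()
    seen
}

fun main() {
    println(observeRapidUpdates()) // [1 initial, 1 latest]
}
```

The intermediate "1 later" is never delivered, so a combine built on top of such flows can only work with whatever values its inner collectors happen to see when they resume.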
So even though it produces the final correct combination, the emissions that the combine call appears to receive are not at all related to the order in which they happened.
Brian Yencho
06/25/2021, 2:57 PM
Brian Yencho
06/25/2021, 2:58 PM
val scope = CoroutineScope(Dispatchers.Main.immediate)
the “inside combine” emissions follow the order the Flows are passed to the function:
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Flow 1 emit (inside combine) : 1 initial
I: Flow 2 emit (inside combine) : 2 initial
I: Flow 3 emit (inside combine) : 3 initial
I: Flow 4 emit (inside combine) : 4 initial
I: Values: 1 initial, 2 initial, 3 initial, 4 initial
I: Updating flow2
I: Flow 2 emit (outside combine) : 2 later
I: Updating flow4
I: Flow 4 emit (outside combine) : 4 later
I: Updating flow1
I: Flow 1 emit (outside combine) : 1 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 1 emit (inside combine) : 1 later
I: Values: 1 later, 2 initial, 3 initial, 4 initial
I: Flow 2 emit (inside combine) : 2 later
I: Values: 1 later, 2 later, 3 initial, 4 initial
I: Flow 3 emit (inside combine) : 3 later
I: Values: 1 later, 2 later, 3 later, 4 initial
I: Flow 4 emit (inside combine) : 4 later
I: Values: 1 later, 2 later, 3 later, 4 later
Brian Yencho
06/25/2021, 2:59 PM
Brian Yencho
06/25/2021, 3:00 PM
flow.value calls are sequential on a single thread.
Brian Yencho
06/25/2021, 3:04 PM
fun main() {
val processor1 = BehaviorProcessor.createDefault("1 initial")
val processor2 = BehaviorProcessor.createDefault("2 initial")
val processor3 = BehaviorProcessor.createDefault("3 initial")
val processor4 = BehaviorProcessor.createDefault("4 initial")
processor1
.observeOn(Schedulers.io())
.subscribe { System.out.println("Processor 1 emit (outside combineLatest)") }
processor2
.observeOn(Schedulers.io())
.subscribe { System.out.println("Processor 2 emit (outside combineLatest)") }
processor3
.observeOn(Schedulers.io())
.subscribe { System.out.println("Processor 3 emit (outside combineLatest)") }
processor4
.observeOn(Schedulers.io())
.subscribe { System.out.println("Processor 4 emit (outside combineLatest)") }
Flowable
.combineLatest(
processor1.doOnEach { System.out.println("Processor 1 emit (inside combineLatest)") },
processor2.doOnEach { System.out.println("Processor 2 emit (inside combineLatest)") },
processor3.doOnEach { System.out.println("Processor 3 emit (inside combineLatest)") },
processor4.doOnEach { System.out.println("Processor 4 emit (inside combineLatest)") },
{ one, two, three, four ->
System.out.println("Values: $one, $two, $three, $four")
}
)
.observeOn(Schedulers.io())
.subscribe { }
System.out.println("Updated Processor 2")
processor2.onNext("2 later")
System.out.println("Updated Processor 4")
processor4.onNext("4 later")
System.out.println("Updated Processor 1")
processor1.onNext("1 later")
System.out.println("Updated Processor 3")
processor3.onNext("3 later")
}
That produces the full sequence of expected updates in the correct order:
Processor 1 emit (outside combineLatest)
Processor 2 emit (outside combineLatest)
Processor 3 emit (outside combineLatest)
Processor 4 emit (outside combineLatest)
Processor 1 emit (inside combineLatest)
Processor 2 emit (inside combineLatest)
Processor 3 emit (inside combineLatest)
Processor 4 emit (inside combineLatest)
Values: 1 initial, 2 initial, 3 initial, 4 initial
Updated Processor 2
Processor 2 emit (inside combineLatest)
Processor 2 emit (outside combineLatest)
Values: 1 initial, 2 later, 3 initial, 4 initial
Updated Processor 4
Processor 4 emit (inside combineLatest)
Processor 4 emit (outside combineLatest)
Values: 1 initial, 2 later, 3 initial, 4 later
Updated Processor 1
Processor 1 emit (inside combineLatest)
Processor 1 emit (outside combineLatest)
Values: 1 later, 2 later, 3 initial, 4 later
Updated Processor 3
Processor 3 emit (inside combineLatest)
Processor 3 emit (outside combineLatest)
Values: 1 later, 2 later, 3 later, 4 later
Justin
06/25/2021, 3:15 PM
Brian Yencho
06/25/2021, 3:17 PM
Justin
06/25/2021, 3:22 PM
Justin
06/25/2021, 3:37 PM
Brian Yencho
06/25/2021, 3:39 PM
val scope = CoroutineScope(Dispatchers.Unconfined)
val flow1 = MutableStateFlow("1 initial")
val flow2 = MutableStateFlow("2 initial")
val flow3 = MutableStateFlow("3 initial")
val flow4 = MutableStateFlow("4 initial")
val flow5 = MutableStateFlow("5 initial")
flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(scope)
flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(scope)
flow3.onEach { Timber.i("Flow 3 emit (outside combine) : $it") }.launchIn(scope)
flow4.onEach { Timber.i("Flow 4 emit (outside combine) : $it") }.launchIn(scope)
flow5.onEach { Timber.i("Flow 5 emit : $it") }.launchIn(viewModelScope)
combine(
flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") },
flow3.onEach { Timber.i("Flow 3 emit (inside combine) : $it") },
flow4.onEach { Timber.i("Flow 4 emit (inside combine) : $it") }
) { one, two, three, four ->
flow5.value = "Values: $one, $two, $three, $four"
}
.launchIn(scope)
Timber.i("Updating flow2")
flow2.value = "2 later"
Timber.i("Updating flow4")
flow4.value = "4 later"
Timber.i("Updating flow1")
flow1.value = "1 later"
Timber.i("Updating flow3")
flow3.value = "3 later"
Brian Yencho
06/25/2021, 3:39 PM
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Flow 5 emit : 5 initial
I: Flow 1 emit (inside combine) : 1 initial
I: Flow 2 emit (inside combine) : 2 initial
I: Flow 3 emit (inside combine) : 3 initial
I: Flow 4 emit (inside combine) : 4 initial
I: Flow 5 emit : Values: 1 initial, 2 initial, 3 initial, 4 initial
I: Updating flow2
I: Flow 2 emit (outside combine) : 2 later
I: Flow 2 emit (inside combine) : 2 later
I: Flow 5 emit : Values: 1 initial, 2 later, 3 initial, 4 initial
I: Updating flow4
I: Flow 4 emit (outside combine) : 4 later
I: Flow 4 emit (inside combine) : 4 later
I: Flow 5 emit : Values: 1 initial, 2 later, 3 initial, 4 later
I: Updating flow1
I: Flow 1 emit (outside combine) : 1 later
I: Flow 1 emit (inside combine) : 1 later
I: Flow 5 emit : Values: 1 later, 2 later, 3 initial, 4 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 3 emit (inside combine) : 3 later
I: Flow 5 emit : Values: 1 later, 2 later, 3 later, 4 later
Justin
06/25/2021, 3:39 PM
Brian Yencho
06/25/2021, 3:39 PM
Unconfined to ensure we get values as soon as they are pushed. And then that can push to another Flow that can be collected in viewModelScope so it can update the UI etc.
Brian Yencho
06/25/2021, 3:40 PM
Justin
06/25/2021, 3:44 PM
Flow.stateIn(coroutineScope) and Flow.shareIn(coroutineScope, SharingStarted.Eagerly) or whichever SharingStarted may be appropriate in your use case
Brian Yencho
06/25/2021, 3:44 PM
Brian Yencho
06/25/2021, 3:44 PM
Justin
06/25/2021, 3:45 PM
Ahaisting
06/25/2021, 3:45 PM
Justin
06/25/2021, 3:52 PM