Ahaisting
06/24/2021, 8:03 PMcombine
behavior that I don’t expect, hoping someone can help me shed some light. (putting code snippet in thread)val flow1 = MutableStateFlow("Hello")
val flow2 = MutableStateFlow("Hola")
combine(
flow1,
flow2
) { english, spanish ->
Timber.e("testing: $english $spanish")
Pair(english, spanish)
}.launchIn(viewModelScope)
flow2.value = "Como estas?"
flow1.value = "How are you?"
I would expect to get this output:
Hello Hola
Hello Como estas?
How Are you? Como estas?
but instead I actually get:
Hello Hola
How are you? Hola
How are you? Como estas?
fun main() {
val flow1 = MutableStateFlow("Hello")
val flow2 = MutableStateFlow("Hola")
combine(
flow1,
flow2
) { english, spanish ->
println("testing: $english $spanish")
Pair(english, spanish)
}.launchIn(GlobalScope)
Thread.sleep(1000)
flow2.value = "Como estas?"
Thread.sleep(1000)
flow1.value = "How are you?"
Thread.sleep(1000)
}
viewModelScope
may be causing the behavior somehow, hopefully we aren’t too far into :not-kotlin: territory here 😅Justin
06/24/2021, 10:24 PMGlobalScope
, by default, use Dispatchers.Default
which can run more than one coroutine in parallel - someone please correct me if I’m mistaken here
In contrast, viewModelScope
effectively uses Dispatchers.Main
which is on the main thread (i.e. one thread) in the lifecycle-viewmodel-ktx
library, so it never actually runs code in parallel (though it’s extremely fast at switching between suspending coroutines)
Also, when you use Thread.sleep()
, you’re essentially blocking the current thread. If you’re in a coroutine you’d want to use delay()
whenever you need to delay something
so what you’re seeing appears to be a concurrency issue, i.e. a race condition
I think, assuming a MutableStateFlow
is definitely what you want to use, you’d want to StateFlow.collect
the combinedFlow
by doing something like:
val flow1 = MutableStateFlow("Hello")
val flow2 = MutableStateFlow("Hola")
val combinedFlow = combine(
flow1,
flow2
) { english: String, spanish: String ->
Pair(english, spanish)
}
viewModelScope.launch {
combinedFlow.collect { (english, spanish) ->
Timber.i("testing: $english, $spanish")
}
}
delay(1_000)
flow2.value = "Como estas?"
flow1.value = "How are you?"
Take a look at https://developer.android.com/kotlin/flow/stateflow-and-sharedflow if you haven’t alreadyBrian Yencho
06/25/2021, 1:27 PMGlobalScope
and Thread.sleep
in the above example was just to those the issue does not occur for other dispatchers. It seems limited to Dispatchers.Main.immediate
.viewModelScope
, there is not Thread.sleep
/ delay
needed. That again was to just demonstrate using a different dispatcher.delay
, and run the whole thing on the main thread of an Android application, we still got the incorrect output of
testing: Hello, Hola
testing: How are you?, Hola
testing: How are you?, Como estas?
Dispatchers.Main.immediate
and how its getting used by combine
.combine
call:
val flow1 = MutableStateFlow("Hello")
val flow2 = MutableStateFlow("Hola")
flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(viewModelScope)
flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(viewModelScope)
val combinedFlow = combine(
flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") }
) { english: String, spanish: String ->
Pair(english, spanish)
}
viewModelScope.launch {
combinedFlow.collect { (english, spanish) ->
Timber.i("testing: $english, $spanish")
}
}
Timber.i("Updating flow2")
flow2.value = "Como estas?"
Timber.i("Updating flow1")
flow1.value = "How are you?"
This results in
I: Flow 1 emit (outside combine) : Hello
I: Flow 2 emit (outside combine) : Hola
I: Flow 1 emit (inside combine) : Hello
I: Flow 2 emit (inside combine) : Hola
I: testing: Hello, Hola
I: Updating flow2
I: Flow 2 emit (outside combine) : Como estas?
I: Updating flow1
I: Flow 1 emit (outside combine) : How are you?
I: Flow 1 emit (inside combine) : How are you?
I: testing: How are you?, Hola
I: Flow 2 emit (inside combine) : Como estas?
I: testing: How are you?, Como estas?
So the flows collected outside the combine
call emit in the order you’d expect, but the ones inside seem to only go flow1
to flow2
Justin
06/25/2021, 2:08 PMDispatchers.Main
would help, e.g.:
viewModelScope.launch {
withContext(Dispatchers.Main) {
combinedFlow.collect { (english, spanish) ->
Timber.i("testing: $english, $spanish")
}
}
}
It doesn’t have the optimization that MainCoroutineDispatcher.immediate
has of avoiding an additional dispatch, but if it yields expected behavior then maybe that’s a good workaround for now. In the meantime I recommend filing a bug, though I’m not entirely sure if you’d want to file a bug with google or with immediate
is in kotlinx-coroutines-android
Brian Yencho
06/25/2021, 2:22 PMDispatchers.Main.immediate
works.Justin
06/25/2021, 2:27 PMkotlinx.coroutines.android.HandlerContext
is the where immediate
is implemented for Android, which is in kotlinx-coroutines-android
…is that library maintained by jetbrains or the android team? I know they collaborate on different things but I’m not sureBrian Yencho
06/25/2021, 2:28 PMJustin
06/25/2021, 2:29 PMBrian Yencho
06/25/2021, 2:29 PMval scope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>)
val flow1 = MutableStateFlow("1 initial")
val flow2 = MutableStateFlow("2 initial")
val flow3 = MutableStateFlow("3 initial")
val flow4 = MutableStateFlow("4 initial")
flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(scope)
flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(scope)
flow3.onEach { Timber.i("Flow 3 emit (outside combine) : $it") }.launchIn(scope)
flow4.onEach { Timber.i("Flow 4 emit (outside combine) : $it") }.launchIn(scope)
combine(
flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") },
flow3.onEach { Timber.i("Flow 3 emit (inside combine) : $it") },
flow4.onEach { Timber.i("Flow 4 emit (inside combine) : $it") }
) { one, two, three, four ->
Timber.i("Values: $one, $two, $three, $four")
}
.launchIn(scope)
Timber.i("Updating flow2")
flow2.value = "2 later"
Timber.i("Updating flow4")
flow4.value = "4 later"
Timber.i("Updating flow1")
flow1.value = "1 later"
Timber.i("Updating flow3")
flow3.value = "3 later"
That produces:
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Updating flow2
I: Updating flow4
I: Updating flow1
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 2 emit (outside combine) : 2 later
I: Flow 4 emit (outside combine) : 4 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 1 emit (outside combine) : 1 later
I: Flow 1 emit (inside combine) : 1 later
I: Flow 3 emit (inside combine) : 3 later
I: Flow 4 emit (inside combine) : 4 later
I: Flow 2 emit (inside combine) : 2 later
I: Values: 1 later, 2 later, 3 later, 4 later
So even though it produces the final correct combination, the emissions that the combine
call looks like its getting are not at all related to the order in which they happened.val scope = CoroutineScope(Dispatchers.Main.immediate)
the “inside combine” emissions follow the order the Flows are passed to the function:
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Flow 1 emit (inside combine) : 1 initial
I: Flow 2 emit (inside combine) : 2 initial
I: Flow 3 emit (inside combine) : 3 initial
I: Flow 4 emit (inside combine) : 4 initial
I: Values: 1 initial, 2 initial, 3 initial, 4 initial
I: Updating flow2
I: Flow 2 emit (outside combine) : 2 later
I: Updating flow4
I: Flow 4 emit (outside combine) : 4 later
I: Updating flow1
I: Flow 1 emit (outside combine) : 1 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 1 emit (inside combine) : 1 later
I: Values: 1 later, 2 initial, 3 initial, 4 initial
I: Flow 2 emit (inside combine) : 2 later
I: Values: 1 later, 2 later, 3 initial, 4 initial
I: Flow 3 emit (inside combine) : 3 later
I: Values: 1 later, 2 later, 3 later, 4 initial
I: Flow 4 emit (inside combine) : 4 later
I: Values: 1 later, 2 later, 3 later, 4 later
flow.value
calls are sequential on a single thread.fun main() {
val processor1 = BehaviorProcessor.createDefault("1 initial")
val processor2 = BehaviorProcessor.createDefault("2 initial")
val processor3 = BehaviorProcessor.createDefault("3 initial")
val processor4 = BehaviorProcessor.createDefault("4 initial")
processor1
.observeOn(<http://Schedulers.io|Schedulers.io>())
.subscribe { System.out.println("Processor 1 emit (outside combineLatest)") }
processor2
.observeOn(<http://Schedulers.io|Schedulers.io>())
.subscribe { System.out.println("Processor 2 emit (outside combineLatest)") }
processor3
.observeOn(<http://Schedulers.io|Schedulers.io>())
.subscribe { System.out.println("Processor 3 emit (outside combineLatest)") }
processor4
.observeOn(<http://Schedulers.io|Schedulers.io>())
.subscribe { System.out.println("Processor 4 emit (outside combineLatest)") }
Flowable
.combineLatest(
processor1.doOnEach { System.out.println("Processor 1 emit (inside combineLatest)") },
processor2.doOnEach { System.out.println("Processor 2 emit (inside combineLatest)") },
processor3.doOnEach { System.out.println("Processor 3 emit (inside combineLatest)") },
processor4.doOnEach { System.out.println("Processor 4 emit (inside combineLatest)") },
{ one, two, three, four ->
System.out.println("Values: $one, $two, $three, $four")
}
)
.observeOn(<http://Schedulers.io|Schedulers.io>())
.subscribe { }
System.out.println("Updated Processor 2")
processor2.onNext("2 later")
System.out.println("Updated Processor 4")
processor4.onNext("4 later")
System.out.println("Updated Processor 1")
processor1.onNext("1 later")
System.out.println("Updated Processor 3")
processor3.onNext("3 later")
}
That produces the full sequence of expected updates in the correct order:
Processor 1 emit (outside combineLatest)
Processor 2 emit (outside combineLatest)
Processor 3 emit (outside combineLatest)
Processor 4 emit (outside combineLatest)
Processor 1 emit (inside combineLatest)
Processor 2 emit (inside combineLatest)
Processor 3 emit (inside combineLatest)
Processor 4 emit (inside combineLatest)
Values: 1 initial, 2 initial, 3 initial, 4 initial
Updated Processor 2
Processor 2 emit (inside combineLatest)
Processor 2 emit (outside combineLatest)
Values: 1 initial, 2 later, 3 initial, 4 initial
Updated Processor 4
Processor 4 emit (inside combineLatest)
Processor 4 emit (outside combineLatest)
Values: 1 initial, 2 later, 3 initial, 4 later
Updated Processor 1
Processor 1 emit (inside combineLatest)
Processor 1 emit (outside combineLatest)
Values: 1 later, 2 later, 3 initial, 4 later
Updated Processor 3
Processor 3 emit (inside combineLatest)
Processor 3 emit (outside combineLatest)
Values: 1 later, 2 later, 3 later, 4 later
Justin
06/25/2021, 3:15 PMBrian Yencho
06/25/2021, 3:17 PMJustin
06/25/2021, 3:22 PMBrian Yencho
06/25/2021, 3:39 PMval scope = CoroutineScope(Dispatchers.Unconfined)
val flow1 = MutableStateFlow("1 initial")
val flow2 = MutableStateFlow("2 initial")
val flow3 = MutableStateFlow("3 initial")
val flow4 = MutableStateFlow("4 initial")
val flow5 = MutableStateFlow("5 initial")
flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(scope)
flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(scope)
flow3.onEach { Timber.i("Flow 3 emit (outside combine) : $it") }.launchIn(scope)
flow4.onEach { Timber.i("Flow 4 emit (outside combine) : $it") }.launchIn(scope)
flow5.onEach { Timber.i("Flow 5 emit : $it") }.launchIn(viewModelScope)
combine(
flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") },
flow3.onEach { Timber.i("Flow 3 emit (inside combine) : $it") },
flow4.onEach { Timber.i("Flow 4 emit (inside combine) : $it") }
) { one, two, three, four ->
flow5.value = "Values: $one, $two, $three, $four"
}
.launchIn(scope)
Timber.i("Updating flow2")
flow2.value = "2 later"
Timber.i("Updating flow4")
flow4.value = "4 later"
Timber.i("Updating flow1")
flow1.value = "1 later"
Timber.i("Updating flow3")
flow3.value = "3 later"
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Flow 5 emit : 5 initial
I: Flow 1 emit (inside combine) : 1 initial
I: Flow 2 emit (inside combine) : 2 initial
I: Flow 3 emit (inside combine) : 3 initial
I: Flow 4 emit (inside combine) : 4 initial
I: Flow 5 emit : Values: 1 initial, 2 initial, 3 initial, 4 initial
I: Updating flow2
I: Flow 2 emit (outside combine) : 2 later
I: Flow 2 emit (inside combine) : 2 later
I: Flow 5 emit : Values: 1 initial, 2 later, 3 initial, 4 initial
I: Updating flow4
I: Flow 4 emit (outside combine) : 4 later
I: Flow 4 emit (inside combine) : 4 later
I: Flow 5 emit : Values: 1 initial, 2 later, 3 initial, 4 later
I: Updating flow1
I: Flow 1 emit (outside combine) : 1 later
I: Flow 1 emit (inside combine) : 1 later
I: Flow 5 emit : Values: 1 later, 2 later, 3 initial, 4 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 3 emit (inside combine) : 3 later
I: Flow 5 emit : Values: 1 later, 2 later, 3 later, 4 later
Justin
06/25/2021, 3:39 PMBrian Yencho
06/25/2021, 3:39 PMUnconfined
to ensure we get values as soon as they are pushed. And then that can push to another Flow that can be collected in viewModelScope
so it can update the UI etc.Justin
06/25/2021, 3:44 PMFlow.stateIn(coroutineScope)
and Flow.shareIn(coroutineScope, SharingStarted.Eagerly)
or whichever SharingStarted
may be appropriate in your use caseBrian Yencho
06/25/2021, 3:44 PMJustin
06/25/2021, 3:45 PMAhaisting
06/25/2021, 3:45 PMJustin
06/25/2021, 3:52 PM