Hey y’all! I’m seeing some `combine` behavior that...
# coroutines
a
Hey y’all! I’m seeing some
combine
behavior that I don’t expect, hoping someone can help me shed some light. (putting code snippet in thread)
given this snippet:
Copy code
val flow1 = MutableStateFlow("Hello")
        val flow2 = MutableStateFlow("Hola")
        combine(
            flow1,
            flow2
        ) { english, spanish ->
            Timber.e("testing: $english $spanish")
            Pair(english, spanish)
        }.launchIn(viewModelScope)
        flow2.value = "Como estas?"
        flow1.value = "How are you?"
I would expect to get this output:
Copy code
Hello Hola
Hello Como estas?
How Are you? Como estas?
but instead I actually get:
Copy code
Hello Hola
How are you? Hola
How are you? Como estas?
another couple pieces of info… this works as expected when I run this in a pure kotlin function, like:
Copy code
fun main() {
    val flow1 = MutableStateFlow("Hello")
    val flow2 = MutableStateFlow("Hola")
    combine(
        flow1,
        flow2
    ) { english, spanish ->
        println("testing: $english $spanish")
        Pair(english, spanish)
    }.launchIn(GlobalScope)
    Thread.sleep(1000)
    flow2.value = "Como estas?"
    Thread.sleep(1000)
    flow1.value = "How are you?"
    Thread.sleep(1000)
}
this makes me suspect that
viewModelScope
may be causing the behavior somehow, hopefully we aren’t too far into 😶 territory here 😅
j
Not sure what you’re trying to do but I can tell you that coroutines launched on
GlobalScope
, by default, use
Dispatchers.Default
which can run more than one coroutine in parallel - someone please correct me if I’m mistaken here In contrast,
viewModelScope
effectively uses
Dispatchers.Main
which is on the main thread (i.e. one thread) in the
lifecycle-viewmodel-ktx
library, so it never actually runs code in parallel (though it’s extremely fast at switching between suspending coroutines) Also, when you use
Thread.sleep()
, you’re essentially blocking the current thread. If you’re in a coroutine you’d want to use
delay()
whenever you need to delay something so what you’re seeing appears to be a concurrency issue, i.e. a race condition I think, assuming a
MutableStateFlow
is definitely what you want to use, you’d want to
StateFlow.collect
the
combinedFlow
by doing something like:
Copy code
val flow1 = MutableStateFlow("Hello")
            val flow2 = MutableStateFlow("Hola")
            val combinedFlow = combine(
                flow1,
                flow2
            ) { english: String, spanish: String ->
                Pair(english, spanish)
            }
            viewModelScope.launch {
                combinedFlow.collect { (english, spanish) ->
                    Timber.i("testing: $english, $spanish")
                }
            }
            delay(1_000)
            flow2.value = "Como estas?"
            flow1.value = "How are you?"
Take a look at https://developer.android.com/kotlin/flow/stateflow-and-sharedflow if you haven’t already
b
@Justin So the use of
GlobalScope
and
Thread.sleep
in the above example was just to those the issue does not occur for other dispatchers. It seems limited to
Dispatchers.Main.immediate
.
Also note that in the original example with
viewModelScope
, there is not
Thread.sleep
/
delay
needed. That again was to just demonstrate using a different dispatcher.
So if we take your suggestion from above, remove the
delay
, and run the whole thing on the main thread of an Android application, we still got the incorrect output of
Copy code
testing: Hello, Hola
testing: How are you?, Hola
testing: How are you?, Como estas?
So there’s definitely something wonky going on with
Dispatchers.Main.immediate
and how its getting used by
combine
.
This can be seen much more clearly if we print the emissions inside and outside of the
combine
call:
Copy code
val flow1 = MutableStateFlow("Hello")
        val flow2 = MutableStateFlow("Hola")

        flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(viewModelScope)
        flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(viewModelScope)

        val combinedFlow = combine(
            flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
            flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") }
        ) { english: String, spanish: String ->
            Pair(english, spanish)
        }
        viewModelScope.launch {
            combinedFlow.collect { (english, spanish) ->
                Timber.i("testing: $english, $spanish")
            }
        }
        Timber.i("Updating flow2")
        flow2.value = "Como estas?"

        Timber.i("Updating flow1")
        flow1.value = "How are you?"
This results in
Copy code
I: Flow 1 emit (outside combine) : Hello
I: Flow 2 emit (outside combine) : Hola
I: Flow 1 emit (inside combine) : Hello
I: Flow 2 emit (inside combine) : Hola
I: testing: Hello, Hola
I: Updating flow2
I: Flow 2 emit (outside combine) : Como estas?
I: Updating flow1
I: Flow 1 emit (outside combine) : How are you?
I: Flow 1 emit (inside combine) : How are you?
I: testing: How are you?, Hola
I: Flow 2 emit (inside combine) : Como estas?
I: testing: How are you?, Como estas?
So the flows collected outside the
combine
call emit in the order you’d expect, but the ones inside seem to only go
flow1
to
flow2
j
@Brian Yencho thanks for all the detail. I wonder if using
Dispatchers.Main
would help, e.g.:
Copy code
viewModelScope.launch {
    withContext(Dispatchers.Main) {
        combinedFlow.collect { (english, spanish) ->
            Timber.i("testing: $english, $spanish")
        }
    }
}
It doesn’t have the optimization that
MainCoroutineDispatcher.immediate
has of avoiding an additional dispatch, but if it yields expected behavior then maybe that’s a good workaround for now. In the meantime I recommend filing a bug, though I’m not entirely sure if you’d want to file a bug with google or with kotlin jetbrains…seems like it should be kotlin jetbrains because the implementation of
immediate
is in
kotlinx-coroutines-android
b
Yeah that’s one workaround we’ve looked into, because that does “work”. But only because it ignores any of the initial values until the current Looper run is over.
And yeah I think we are going to first try filing a bug with the Android team because this seems very particular to how
Dispatchers.Main.immediate
works.
j
kotlinx.coroutines.android.HandlerContext
is the where
immediate
is implemented for Android, which is in
kotlinx-coroutines-android
…is that library maintained by jetbrains or the android team? I know they collaborate on different things but I’m not sure
b
Yeah I’m not too sure
j
I’ll see if I can find out from someone on the android team
b
Cool, thanks 👍
So another interesting fact is that if we switch to a different Dispatcher can check the emissions of each Flow, the order is also not quite what you’d expect:
Copy code
val scope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>)
        val flow1 = MutableStateFlow("1 initial")
        val flow2 = MutableStateFlow("2 initial")
        val flow3 = MutableStateFlow("3 initial")
        val flow4 = MutableStateFlow("4 initial")
        flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(scope)
        flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(scope)
        flow3.onEach { Timber.i("Flow 3 emit (outside combine) : $it") }.launchIn(scope)
        flow4.onEach { Timber.i("Flow 4 emit (outside combine) : $it") }.launchIn(scope)
        combine(
            flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
            flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") },
            flow3.onEach { Timber.i("Flow 3 emit (inside combine) : $it") },
            flow4.onEach { Timber.i("Flow 4 emit (inside combine) : $it") }
        ) { one, two, three, four ->
            Timber.i("Values: $one, $two, $three, $four")
        }
            .launchIn(scope)
        Timber.i("Updating flow2")
        flow2.value = "2 later"
        Timber.i("Updating flow4")
        flow4.value = "4 later"
        Timber.i("Updating flow1")
        flow1.value = "1 later"
        Timber.i("Updating flow3")
        flow3.value = "3 later"
That produces:
Copy code
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Updating flow2
I: Updating flow4
I: Updating flow1
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 2 emit (outside combine) : 2 later
I: Flow 4 emit (outside combine) : 4 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 1 emit (outside combine) : 1 later
I: Flow 1 emit (inside combine) : 1 later
I: Flow 3 emit (inside combine) : 3 later
I: Flow 4 emit (inside combine) : 4 later
I: Flow 2 emit (inside combine) : 2 later
I: Values: 1 later, 2 later, 3 later, 4 later
So even though it produces the final correct combination, the emissions that the
combine
call looks like its getting are not at all related to the order in which they happened.
1, 3, 4, 2 when you’d expect 2, 4, 1, 3
And when using
Copy code
val scope = CoroutineScope(Dispatchers.Main.immediate)
the “inside combine” emissions follow the order the Flows are passed to the function:
Copy code
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Flow 1 emit (inside combine) : 1 initial
I: Flow 2 emit (inside combine) : 2 initial
I: Flow 3 emit (inside combine) : 3 initial
I: Flow 4 emit (inside combine) : 4 initial
I: Values: 1 initial, 2 initial, 3 initial, 4 initial
I: Updating flow2
I: Flow 2 emit (outside combine) : 2 later
I: Updating flow4
I: Flow 4 emit (outside combine) : 4 later
I: Updating flow1
I: Flow 1 emit (outside combine) : 1 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 1 emit (inside combine) : 1 later
I: Values: 1 later, 2 initial, 3 initial, 4 initial
I: Flow 2 emit (inside combine) : 2 later
I: Values: 1 later, 2 later, 3 initial, 4 initial
I: Flow 3 emit (inside combine) : 3 later
I: Values: 1 later, 2 later, 3 later, 4 initial
I: Flow 4 emit (inside combine) : 4 later
I: Values: 1 later, 2 later, 3 later, 4 later
1, 2, 3, 4
Neither of those results is what I’d expect, given that the
flow.value
calls are sequential on a single thread.
For what its worth, when comparing to RxJava, it works exactly as expected:
Copy code
fun main() {
    val processor1 = BehaviorProcessor.createDefault("1 initial")
    val processor2 = BehaviorProcessor.createDefault("2 initial")
    val processor3 = BehaviorProcessor.createDefault("3 initial")
    val processor4 = BehaviorProcessor.createDefault("4 initial")
    processor1
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { System.out.println("Processor 1 emit (outside combineLatest)") }
    processor2
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { System.out.println("Processor 2 emit (outside combineLatest)") }
    processor3
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { System.out.println("Processor 3 emit (outside combineLatest)") }
    processor4
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { System.out.println("Processor 4 emit (outside combineLatest)") }
    Flowable
        .combineLatest(
            processor1.doOnEach { System.out.println("Processor 1 emit (inside combineLatest)") },
            processor2.doOnEach { System.out.println("Processor 2 emit (inside combineLatest)") },
            processor3.doOnEach { System.out.println("Processor 3 emit (inside combineLatest)") },
            processor4.doOnEach { System.out.println("Processor 4 emit (inside combineLatest)") },
            { one, two, three, four ->
                System.out.println("Values: $one, $two, $three, $four")
            }
        )
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { }
    System.out.println("Updated Processor 2")
    processor2.onNext("2 later")
    System.out.println("Updated Processor 4")
    processor4.onNext("4 later")
    System.out.println("Updated Processor 1")
    processor1.onNext("1 later")
    System.out.println("Updated Processor 3")
    processor3.onNext("3 later")
}
That produces the full sequence of expected updates in the correct order:
Copy code
Processor 1 emit (outside combineLatest)
Processor 2 emit (outside combineLatest)
Processor 3 emit (outside combineLatest)
Processor 4 emit (outside combineLatest)
Processor 1 emit (inside combineLatest)
Processor 2 emit (inside combineLatest)
Processor 3 emit (inside combineLatest)
Processor 4 emit (inside combineLatest)
Values: 1 initial, 2 initial, 3 initial, 4 initial
Updated Processor 2
Processor 2 emit (inside combineLatest)
Processor 2 emit (outside combineLatest)
Values: 1 initial, 2 later, 3 initial, 4 initial
Updated Processor 4
Processor 4 emit (inside combineLatest)
Processor 4 emit (outside combineLatest)
Values: 1 initial, 2 later, 3 initial, 4 later
Updated Processor 1
Processor 1 emit (inside combineLatest)
Processor 1 emit (outside combineLatest)
Values: 1 later, 2 later, 3 initial, 4 later
Updated Processor 3
Processor 3 emit (inside combineLatest)
Processor 3 emit (outside combineLatest)
Values: 1 later, 2 later, 3 later, 4 later
b
Excellent, thanks!
lmk if that helped any
b
It did for sure. It made me realize we could do this:
Copy code
val scope = CoroutineScope(Dispatchers.Unconfined)
        val flow1 = MutableStateFlow("1 initial")
        val flow2 = MutableStateFlow("2 initial")
        val flow3 = MutableStateFlow("3 initial")
        val flow4 = MutableStateFlow("4 initial")
        val flow5 = MutableStateFlow("5 initial")
        flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(scope)
        flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(scope)
        flow3.onEach { Timber.i("Flow 3 emit (outside combine) : $it") }.launchIn(scope)
        flow4.onEach { Timber.i("Flow 4 emit (outside combine) : $it") }.launchIn(scope)
        flow5.onEach { Timber.i("Flow 5 emit : $it") }.launchIn(viewModelScope)
        combine(
            flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
            flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") },
            flow3.onEach { Timber.i("Flow 3 emit (inside combine) : $it") },
            flow4.onEach { Timber.i("Flow 4 emit (inside combine) : $it") }
        ) { one, two, three, four ->
            flow5.value = "Values: $one, $two, $three, $four"
        }
            .launchIn(scope)
        Timber.i("Updating flow2")
        flow2.value = "2 later"
        Timber.i("Updating flow4")
        flow4.value = "4 later"
        Timber.i("Updating flow1")
        flow1.value = "1 later"
        Timber.i("Updating flow3")
        flow3.value = "3 later"
That results in
Copy code
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Flow 5 emit : 5 initial
I: Flow 1 emit (inside combine) : 1 initial
I: Flow 2 emit (inside combine) : 2 initial
I: Flow 3 emit (inside combine) : 3 initial
I: Flow 4 emit (inside combine) : 4 initial
I: Flow 5 emit : Values: 1 initial, 2 initial, 3 initial, 4 initial
I: Updating flow2
I: Flow 2 emit (outside combine) : 2 later
I: Flow 2 emit (inside combine) : 2 later
I: Flow 5 emit : Values: 1 initial, 2 later, 3 initial, 4 initial
I: Updating flow4
I: Flow 4 emit (outside combine) : 4 later
I: Flow 4 emit (inside combine) : 4 later
I: Flow 5 emit : Values: 1 initial, 2 later, 3 initial, 4 later
I: Updating flow1
I: Flow 1 emit (outside combine) : 1 later
I: Flow 1 emit (inside combine) : 1 later
I: Flow 5 emit : Values: 1 later, 2 later, 3 initial, 4 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 3 emit (inside combine) : 3 later
I: Flow 5 emit : Values: 1 later, 2 later, 3 later, 4 later
🙌 1
j
ooohhhh very nice
b
So we’d combine with
Unconfined
to ensure we get values as soon as they are pushed. And then that can push to another Flow that can be collected in
viewModelScope
so it can update the UI etc.
There are probably some details here to work out but I think the rough idea is there.
j
yeah that’s awesome. Also, when I was looking into this, I came across
Flow.stateIn(coroutineScope)
and
Flow.shareIn(coroutineScope, SharingStarted.Eagerly)
or whichever
SharingStarted
may be appropriate in your use case
b
Sure 👍
Thanks for your help
j
no problem! happy to help
a
thanks to both of you for the responses…. I was really losing my mind yesterday figuring out what was going on!! I appreciate it.
😄 1
j
yeah I know how that is lol, happy to help