https://kotlinlang.org logo
Title
a

Ahaisting

06/24/2021, 8:03 PM
Hey y’all! I’m seeing some
combine
behavior that I don’t expect, hoping someone can help me shed some light. (putting code snippet in thread)
given this snippet:
val flow1 = MutableStateFlow("Hello")
        val flow2 = MutableStateFlow("Hola")
        combine(
            flow1,
            flow2
        ) { english, spanish ->
            Timber.e("testing: $english $spanish")
            Pair(english, spanish)
        }.launchIn(viewModelScope)
        flow2.value = "Como estas?"
        flow1.value = "How are you?"
I would expect to get this output:
Hello Hola
Hello Como estas?
How Are you? Como estas?
but instead I actually get:
Hello Hola
How are you? Hola
How are you? Como estas?
another couple pieces of info… this works as expected when I run this in a pure kotlin function, like:
fun main() {
    val flow1 = MutableStateFlow("Hello")
    val flow2 = MutableStateFlow("Hola")
    combine(
        flow1,
        flow2
    ) { english, spanish ->
        println("testing: $english $spanish")
        Pair(english, spanish)
    }.launchIn(GlobalScope)
    Thread.sleep(1000)
    flow2.value = "Como estas?"
    Thread.sleep(1000)
    flow1.value = "How are you?"
    Thread.sleep(1000)
}
this makes me suspect that
viewModelScope
may be causing the behavior somehow, hopefully we aren’t too far into :not-kotlin: territory here 😅
j

Justin

06/24/2021, 10:24 PM
Not sure what you’re trying to do but I can tell you that coroutines launched on
GlobalScope
, by default, use
Dispatchers.Default
which can run more than one coroutine in parallel - someone please correct me if I’m mistaken here In contrast,
viewModelScope
effectively uses
Dispatchers.Main
which is on the main thread (i.e. one thread) in the
lifecycle-viewmodel-ktx
library, so it never actually runs code in parallel (though it’s extremely fast at switching between suspending coroutines) Also, when you use
Thread.sleep()
, you’re essentially blocking the current thread. If you’re in a coroutine you’d want to use
delay()
whenever you need to delay something so what you’re seeing appears to be a concurrency issue, i.e. a race condition I think, assuming a
MutableStateFlow
is definitely what you want to use, you’d want to
StateFlow.collect
the
combinedFlow
by doing something like:
val flow1 = MutableStateFlow("Hello")
            val flow2 = MutableStateFlow("Hola")
            val combinedFlow = combine(
                flow1,
                flow2
            ) { english: String, spanish: String ->
                Pair(english, spanish)
            }
            viewModelScope.launch {
                combinedFlow.collect { (english, spanish) ->
                    Timber.i("testing: $english, $spanish")
                }
            }
            delay(1_000)
            flow2.value = "Como estas?"
            flow1.value = "How are you?"
Take a look at https://developer.android.com/kotlin/flow/stateflow-and-sharedflow if you haven’t already
b

Brian Yencho

06/25/2021, 1:27 PM
@Justin So the use of
GlobalScope
and
Thread.sleep
in the above example was just to those the issue does not occur for other dispatchers. It seems limited to
Dispatchers.Main.immediate
.
Also note that in the original example with
viewModelScope
, there is not
Thread.sleep
/
delay
needed. That again was to just demonstrate using a different dispatcher.
So if we take your suggestion from above, remove the
delay
, and run the whole thing on the main thread of an Android application, we still got the incorrect output of
testing: Hello, Hola
testing: How are you?, Hola
testing: How are you?, Como estas?
So there’s definitely something wonky going on with
Dispatchers.Main.immediate
and how its getting used by
combine
.
This can be seen much more clearly if we print the emissions inside and outside of the
combine
call:
val flow1 = MutableStateFlow("Hello")
        val flow2 = MutableStateFlow("Hola")

        flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(viewModelScope)
        flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(viewModelScope)

        val combinedFlow = combine(
            flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
            flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") }
        ) { english: String, spanish: String ->
            Pair(english, spanish)
        }
        viewModelScope.launch {
            combinedFlow.collect { (english, spanish) ->
                Timber.i("testing: $english, $spanish")
            }
        }
        Timber.i("Updating flow2")
        flow2.value = "Como estas?"

        Timber.i("Updating flow1")
        flow1.value = "How are you?"
This results in
I: Flow 1 emit (outside combine) : Hello
I: Flow 2 emit (outside combine) : Hola
I: Flow 1 emit (inside combine) : Hello
I: Flow 2 emit (inside combine) : Hola
I: testing: Hello, Hola
I: Updating flow2
I: Flow 2 emit (outside combine) : Como estas?
I: Updating flow1
I: Flow 1 emit (outside combine) : How are you?
I: Flow 1 emit (inside combine) : How are you?
I: testing: How are you?, Hola
I: Flow 2 emit (inside combine) : Como estas?
I: testing: How are you?, Como estas?
So the flows collected outside the
combine
call emit in the order you’d expect, but the ones inside seem to only go
flow1
to
flow2
j

Justin

06/25/2021, 2:08 PM
@Brian Yencho thanks for all the detail. I wonder if using
Dispatchers.Main
would help, e.g.:
viewModelScope.launch {
    withContext(Dispatchers.Main) {
        combinedFlow.collect { (english, spanish) ->
            Timber.i("testing: $english, $spanish")
        }
    }
}
It doesn’t have the optimization that
MainCoroutineDispatcher.immediate
has of avoiding an additional dispatch, but if it yields expected behavior then maybe that’s a good workaround for now. In the meantime I recommend filing a bug, though I’m not entirely sure if you’d want to file a bug with google or with kotlin jetbrains…seems like it should be kotlin jetbrains because the implementation of
immediate
is in
kotlinx-coroutines-android
b

Brian Yencho

06/25/2021, 2:22 PM
Yeah that’s one workaround we’ve looked into, because that does “work”. But only because it ignores any of the initial values until the current Looper run is over.
And yeah I think we are going to first try filing a bug with the Android team because this seems very particular to how
Dispatchers.Main.immediate
works.
j

Justin

06/25/2021, 2:27 PM
kotlinx.coroutines.android.HandlerContext
is the where
immediate
is implemented for Android, which is in
kotlinx-coroutines-android
…is that library maintained by jetbrains or the android team? I know they collaborate on different things but I’m not sure
b

Brian Yencho

06/25/2021, 2:28 PM
Yeah I’m not too sure
j

Justin

06/25/2021, 2:29 PM
I’ll see if I can find out from someone on the android team
b

Brian Yencho

06/25/2021, 2:29 PM
Cool, thanks 👍
So another interesting fact is that if we switch to a different Dispatcher can check the emissions of each Flow, the order is also not quite what you’d expect:
val scope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>)
        val flow1 = MutableStateFlow("1 initial")
        val flow2 = MutableStateFlow("2 initial")
        val flow3 = MutableStateFlow("3 initial")
        val flow4 = MutableStateFlow("4 initial")
        flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(scope)
        flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(scope)
        flow3.onEach { Timber.i("Flow 3 emit (outside combine) : $it") }.launchIn(scope)
        flow4.onEach { Timber.i("Flow 4 emit (outside combine) : $it") }.launchIn(scope)
        combine(
            flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
            flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") },
            flow3.onEach { Timber.i("Flow 3 emit (inside combine) : $it") },
            flow4.onEach { Timber.i("Flow 4 emit (inside combine) : $it") }
        ) { one, two, three, four ->
            Timber.i("Values: $one, $two, $three, $four")
        }
            .launchIn(scope)
        Timber.i("Updating flow2")
        flow2.value = "2 later"
        Timber.i("Updating flow4")
        flow4.value = "4 later"
        Timber.i("Updating flow1")
        flow1.value = "1 later"
        Timber.i("Updating flow3")
        flow3.value = "3 later"
That produces:
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Updating flow2
I: Updating flow4
I: Updating flow1
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 2 emit (outside combine) : 2 later
I: Flow 4 emit (outside combine) : 4 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 1 emit (outside combine) : 1 later
I: Flow 1 emit (inside combine) : 1 later
I: Flow 3 emit (inside combine) : 3 later
I: Flow 4 emit (inside combine) : 4 later
I: Flow 2 emit (inside combine) : 2 later
I: Values: 1 later, 2 later, 3 later, 4 later
So even though it produces the final correct combination, the emissions that the
combine
call looks like its getting are not at all related to the order in which they happened.
1, 3, 4, 2 when you’d expect 2, 4, 1, 3
And when using
val scope = CoroutineScope(Dispatchers.Main.immediate)
the “inside combine” emissions follow the order the Flows are passed to the function:
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Flow 1 emit (inside combine) : 1 initial
I: Flow 2 emit (inside combine) : 2 initial
I: Flow 3 emit (inside combine) : 3 initial
I: Flow 4 emit (inside combine) : 4 initial
I: Values: 1 initial, 2 initial, 3 initial, 4 initial
I: Updating flow2
I: Flow 2 emit (outside combine) : 2 later
I: Updating flow4
I: Flow 4 emit (outside combine) : 4 later
I: Updating flow1
I: Flow 1 emit (outside combine) : 1 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 1 emit (inside combine) : 1 later
I: Values: 1 later, 2 initial, 3 initial, 4 initial
I: Flow 2 emit (inside combine) : 2 later
I: Values: 1 later, 2 later, 3 initial, 4 initial
I: Flow 3 emit (inside combine) : 3 later
I: Values: 1 later, 2 later, 3 later, 4 initial
I: Flow 4 emit (inside combine) : 4 later
I: Values: 1 later, 2 later, 3 later, 4 later
1, 2, 3, 4
Neither of those results is what I’d expect, given that the
flow.value
calls are sequential on a single thread.
For what its worth, when comparing to RxJava, it works exactly as expected:
fun main() {
    val processor1 = BehaviorProcessor.createDefault("1 initial")
    val processor2 = BehaviorProcessor.createDefault("2 initial")
    val processor3 = BehaviorProcessor.createDefault("3 initial")
    val processor4 = BehaviorProcessor.createDefault("4 initial")
    processor1
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { System.out.println("Processor 1 emit (outside combineLatest)") }
    processor2
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { System.out.println("Processor 2 emit (outside combineLatest)") }
    processor3
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { System.out.println("Processor 3 emit (outside combineLatest)") }
    processor4
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { System.out.println("Processor 4 emit (outside combineLatest)") }
    Flowable
        .combineLatest(
            processor1.doOnEach { System.out.println("Processor 1 emit (inside combineLatest)") },
            processor2.doOnEach { System.out.println("Processor 2 emit (inside combineLatest)") },
            processor3.doOnEach { System.out.println("Processor 3 emit (inside combineLatest)") },
            processor4.doOnEach { System.out.println("Processor 4 emit (inside combineLatest)") },
            { one, two, three, four ->
                System.out.println("Values: $one, $two, $three, $four")
            }
        )
        .observeOn(<http://Schedulers.io|Schedulers.io>())
        .subscribe { }
    System.out.println("Updated Processor 2")
    processor2.onNext("2 later")
    System.out.println("Updated Processor 4")
    processor4.onNext("4 later")
    System.out.println("Updated Processor 1")
    processor1.onNext("1 later")
    System.out.println("Updated Processor 3")
    processor3.onNext("3 later")
}
That produces the full sequence of expected updates in the correct order:
Processor 1 emit (outside combineLatest)
Processor 2 emit (outside combineLatest)
Processor 3 emit (outside combineLatest)
Processor 4 emit (outside combineLatest)
Processor 1 emit (inside combineLatest)
Processor 2 emit (inside combineLatest)
Processor 3 emit (inside combineLatest)
Processor 4 emit (inside combineLatest)
Values: 1 initial, 2 initial, 3 initial, 4 initial
Updated Processor 2
Processor 2 emit (inside combineLatest)
Processor 2 emit (outside combineLatest)
Values: 1 initial, 2 later, 3 initial, 4 initial
Updated Processor 4
Processor 4 emit (inside combineLatest)
Processor 4 emit (outside combineLatest)
Values: 1 initial, 2 later, 3 initial, 4 later
Updated Processor 1
Processor 1 emit (inside combineLatest)
Processor 1 emit (outside combineLatest)
Values: 1 later, 2 later, 3 initial, 4 later
Updated Processor 3
Processor 3 emit (inside combineLatest)
Processor 3 emit (outside combineLatest)
Values: 1 later, 2 later, 3 later, 4 later
j

Justin

06/25/2021, 3:15 PM
b

Brian Yencho

06/25/2021, 3:17 PM
Excellent, thanks!
lmk if that helped any
b

Brian Yencho

06/25/2021, 3:39 PM
It did for sure. It made me realize we could do this:
val scope = CoroutineScope(Dispatchers.Unconfined)
        val flow1 = MutableStateFlow("1 initial")
        val flow2 = MutableStateFlow("2 initial")
        val flow3 = MutableStateFlow("3 initial")
        val flow4 = MutableStateFlow("4 initial")
        val flow5 = MutableStateFlow("5 initial")
        flow1.onEach { Timber.i("Flow 1 emit (outside combine) : $it") }.launchIn(scope)
        flow2.onEach { Timber.i("Flow 2 emit (outside combine) : $it") }.launchIn(scope)
        flow3.onEach { Timber.i("Flow 3 emit (outside combine) : $it") }.launchIn(scope)
        flow4.onEach { Timber.i("Flow 4 emit (outside combine) : $it") }.launchIn(scope)
        flow5.onEach { Timber.i("Flow 5 emit : $it") }.launchIn(viewModelScope)
        combine(
            flow1.onEach { Timber.i("Flow 1 emit (inside combine) : $it") },
            flow2.onEach { Timber.i("Flow 2 emit (inside combine) : $it") },
            flow3.onEach { Timber.i("Flow 3 emit (inside combine) : $it") },
            flow4.onEach { Timber.i("Flow 4 emit (inside combine) : $it") }
        ) { one, two, three, four ->
            flow5.value = "Values: $one, $two, $three, $four"
        }
            .launchIn(scope)
        Timber.i("Updating flow2")
        flow2.value = "2 later"
        Timber.i("Updating flow4")
        flow4.value = "4 later"
        Timber.i("Updating flow1")
        flow1.value = "1 later"
        Timber.i("Updating flow3")
        flow3.value = "3 later"
That results in
I: Flow 1 emit (outside combine) : 1 initial
I: Flow 2 emit (outside combine) : 2 initial
I: Flow 3 emit (outside combine) : 3 initial
I: Flow 4 emit (outside combine) : 4 initial
I: Flow 5 emit : 5 initial
I: Flow 1 emit (inside combine) : 1 initial
I: Flow 2 emit (inside combine) : 2 initial
I: Flow 3 emit (inside combine) : 3 initial
I: Flow 4 emit (inside combine) : 4 initial
I: Flow 5 emit : Values: 1 initial, 2 initial, 3 initial, 4 initial
I: Updating flow2
I: Flow 2 emit (outside combine) : 2 later
I: Flow 2 emit (inside combine) : 2 later
I: Flow 5 emit : Values: 1 initial, 2 later, 3 initial, 4 initial
I: Updating flow4
I: Flow 4 emit (outside combine) : 4 later
I: Flow 4 emit (inside combine) : 4 later
I: Flow 5 emit : Values: 1 initial, 2 later, 3 initial, 4 later
I: Updating flow1
I: Flow 1 emit (outside combine) : 1 later
I: Flow 1 emit (inside combine) : 1 later
I: Flow 5 emit : Values: 1 later, 2 later, 3 initial, 4 later
I: Updating flow3
I: Flow 3 emit (outside combine) : 3 later
I: Flow 3 emit (inside combine) : 3 later
I: Flow 5 emit : Values: 1 later, 2 later, 3 later, 4 later
🙌 1
j

Justin

06/25/2021, 3:39 PM
ooohhhh very nice
b

Brian Yencho

06/25/2021, 3:39 PM
So we’d combine with
Unconfined
to ensure we get values as soon as they are pushed. And then that can push to another Flow that can be collected in
viewModelScope
so it can update the UI etc.
There are probably some details here to work out but I think the rough idea is there.
j

Justin

06/25/2021, 3:44 PM
yeah that’s awesome. Also, when I was looking into this, I came across
Flow.stateIn(coroutineScope)
and
Flow.shareIn(coroutineScope, SharingStarted.Eagerly)
or whichever
SharingStarted
may be appropriate in your use case
b

Brian Yencho

06/25/2021, 3:44 PM
Sure 👍
Thanks for your help
j

Justin

06/25/2021, 3:45 PM
no problem! happy to help
a

Ahaisting

06/25/2021, 3:45 PM
thanks to both of you for the responses…. I was really losing my mind yesterday figuring out what was going on!! I appreciate it.
😄 1
j

Justin

06/25/2021, 3:52 PM
yeah I know how that is lol, happy to help