# coroutines
t
For `Flow`, what's the "idiomatic" way to perform `combine` but "suspend" instead of "conflate"? The existing `combine` function deals with backpressure by simply skipping items, behaving as if it went through a conflated channel. What's the best I could do to make it NOT skip items?
s
Can you say more about why `combine` isn't working for you? On its own, I don't think it should do any skipping. As far as I know, `a.combine(b) { … }` will simply emit whenever `a` or `b` emits. If you're seeing it conflate things, I wonder if there's something else causing that in your code?
t
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val f1 = MutableSharedFlow<Int>()
    val f2 = MutableStateFlow("!")

    val j = launch {
        combine(f1, f2) { a, b -> a to b }
            .collect { (a, b) ->
                println("$a$b")
                delay(100)
            }
    }

    delay(1000)
    repeat(10) {
        println("f1: $it")
        f1.emit(it)
    }

    delay(1000)
    j.cancel()
}
```
Output:
```
f1: 0
f1: 1
0!
f1: 2
f1: 3
f1: 4
2!
f1: 5
f1: 6
4!
f1: 7
f1: 8
6!
f1: 9
8!
9!
```
In this example, `f1` emits elements instantaneously, while the collection is delayed by 100 ms. Some elements that we might expect (e.g. "1!", "3!") are "skipped", so it feels like the data went through a conflated channel/buffer.
In the `combine` documentation there's a line:
> Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.
So I suppose this "conflation" behaviour is implied... 🤔 I have to admit I got tripped up by this, though 🥴
s
Interesting, I think I agree, the lack of backpressure feels a bit unintuitive in that scenario. The docs conveniently avoid the problem by using an example where the upstream is slower than the collector 🤨
d
We can clarify the docs, but yes, the reason there is no backpressure is that the use case for `combine` is to obtain up-to-date views of several data channels. There's also the `zip` operation that combines each emission in one data channel with the corresponding emission in another data channel. Does `zip` fit your use case, or do you need some elements to be repeated (if one data channel is faster than the other), but no elements to be skipped?
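Here is a minimal sketch of what `zip` does, using made-up illustrative flows. It pairs the n-th element of one flow with the n-th element of the other and completes when the shorter flow completes. Nothing is repeated, and leftover elements of the longer flow are dropped.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Illustrative inputs: the second flow has one extra element.
val numbers = flowOf(1, 2, 3)
val letters = flowOf("a", "b", "c", "d")

// zip pairs elements by position; the unmatched "d" is never used,
// because the zipped flow completes as soon as `numbers` completes.
fun zipped(): List<String> = runBlocking {
    numbers.zip(letters) { n, l -> "$n$l" }.toList()
}

fun main() {
    println(zipped()) // prints [1a, 2b, 3c]
}
```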
t
`zip` does not fit my case, unfortunately.
Yes, exactly! Some elements repeated, but no combination of elements skipped.
d
Interesting! Could you please explain the business logic behind this requirement?
t
I work on Android. In my project we have a screen that displays a list of items. Each item consists of an ID, a name and a numeric value. The numeric value updates constantly and is provided by a streaming API over TCP. The names only have to be obtained once, through a separate HTTP API.
The numeric value API is wrapped into a `Flow` which emits pairs of ID and numeric value. ... I could resolve this issue by preprocessing the numeric value flow with `runningFold` or `runningReduce`, but I'm avoiding that.
d
Thanks! How does `combine` help in this scenario, though?
t
Well, it looks something like this in the view model:
```kotlin
val displayedDataFlow = combine(idToNumberResultFlow, namesResultFlow) { idToNumResult, namesResult ->
    /* cache "ID to number" in a map if possible */
    ids.map {
        /* transform to displayed data ... */
    }
}
```
d
Are names just completely unrelated to the ID here? If they are related, then how are `idToNumberFlow` and `namesFlow` synchronized so that a name matches the corresponding ID?
t
Ah, sorry, I've edited my snippet for clarity. We can obtain a name from an ID, and if it's not found, we display a placeholder instead. The name and number data flows are wrapped with a custom `Result` type, something like `Flow<Result<T>>`.
d
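If I understood your use case correctly, it's unclear why `combine` would be useful. I'd implement it like this:
```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical result type; the name is a Deferred that completes once it has been fetched
data class IdNumberAndName(val id: Int, val number: Int, val name: Deferred<String>)

val nameGettingScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

// Hypothetical cache of already-fetched names; concurrent because lookups run on Dispatchers.IO
val cachedNames = ConcurrentHashMap<Int, String>()

suspend fun obtainName(id: Int): String {
    TODO("network call, etc.")
}

fun getName(id: Int): Deferred<String> = nameGettingScope.async {
    cachedNames[id] ?: obtainName(id).also { cachedNames[id] = it }
}

// Hypothetical wrapper; idToNumberFlow is assumed to emit (id, number) pairs
fun withNames(idToNumberFlow: Flow<Pair<Int, Int>>): Flow<IdNumberAndName> =
    idToNumberFlow.map {
        val (id, number) = it
        val name = getName(id)
        IdNumberAndName(id, number, name)
    }
```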
t
That's nice and clean! Well, I opted to use `combine` because I have two `Flow`s of data and what I'm trying to do is combine them both, so it's the most straightforward option.
d
What I don't get is where the second `Flow` of data comes from and how it relates to the first `Flow`, that's all. If anything, `zip` seems more useful, as it would at least ensure that the elements are paired correctly.
r
I'm now struggling to fully understand the exact behaviour of `combine`, because I feel like "most recently emitted values" doesn't tell the full story. If I change the example to just have `f1 = flowOf(0,1,2,3,4,5,6,7,8,9)`, I'd expect it to only collect 9 (or maybe 0 and 9) if it's simply conflating, but it actually collects exactly the same values as the `SharedFlow` version: `0,2,4,6,8,9` 😕 And changing `collect` to `collectLatest` gives the opposite values: `0,1,3,5,7,9` 😕 😕
d
`combine` does not introduce any multithreading on its own, so the exact result will depend on when and how often each of the flows suspends.
t
It's mostly due to the way the APIs are designed, and the way those APIs were wrapped previously in other parts of the project. 🤔 For example, `idToNumberResultFlow` is obtained through a function like `fun getIdToNumber(param: Param): Flow<Result<Pair<Id, Number>>>`, whereas `namesResultFlow` is obtained through another function like `fun getNames(ids: List<Id>): Flow<Result<Map<Id, String>>>`. Something like that.
What I don't quite understand is why `combine` is designed this way to begin with 🤔 If we had `combineThatDoesntSkipCombinations`, we could simulate `combine` easily through `conflate`: `combine(flow1, flow2){ ... }` could be expressed as `combineThatDoesntSkipCombinations(flow1.conflate(), flow2.conflate()){ ... }`. But we cannot do the opposite: we cannot easily simulate `combineThatDoesntSkipCombinations` through `combine`.
r
> `combine` does not introduce any multithreading on its own, so the exact result will depend on when and how often each of the flows suspends.
I get that the answer will depend on a lot of things, but I'd like to have some intuition for how it decides what to skip; otherwise, we essentially have to treat the whole thing as undefined behaviour.
d
Yes, it's usually not a good idea to rely on exactly when a suspension will happen; it's better to treat it as an implementation detail. I wouldn't call it "undefined," though, but rather "unspecified." We could change the implementation of `combine` tomorrow, leading to a different set of places where suspensions occur, without considering it a breaking change.
t
@Dmitry Khalanskiy [JB] another reason why `combine` is used is that the name API and the number API are requested in parallel, and either one could return first. We want either of them to trigger an update of the displayed data, and `combine` does this very well.
d
In that case, the name update flow should also know the id, because otherwise it's impossible to know whose name it is. Then `merge` is the operator you're looking for. Example: https://pl.kotl.in/UzBcQpqzS
t
The `merge` solution is nice as well, but it requires the transformation to be done upstream.
It gave me an idea: we could take advantage of the fact that `merge` does not skip emissions to implement `combineThatDoesntSkipCombinations`, here renamed to `combineWithoutConflation`:
```kotlin
import kotlinx.coroutines.flow.*

// Wrappers that record which of the two flows an element came from
private sealed interface Box
@JvmInline
private value class Box1<T>(val value: T) : Box
@JvmInline
private value class Box2<T>(val value: T) : Box

@Suppress("UNCHECKED_CAST")
fun <T1, T2, R> combineWithoutConflation(
    flow1: Flow<T1>,
    flow2: Flow<T2>,
    transform: suspend (T1, T2) -> R,
): Flow<R> = flow {
    // Latest value seen from each flow (null until that flow has emitted)
    var b1: Box1<T1>? = null
    var b2: Box2<T2>? = null
    // merge() forwards every element; emit() suspends instead of dropping anything
    merge(flow1.map { Box1(it) }, flow2.map { Box2(it) }).collect {
        when (it) {
            is Box1<*> -> {
                b1 = it as Box1<T1>
                b2?.also {
                    emit(transform(b1!!.value, it.value))
                }
            }

            is Box2<*> -> {
                b2 = it as Box2<T2>
                b1?.also {
                    emit(transform(it.value, b2!!.value))
                }
            }
        }
    }
}
```
what do you think?
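One way to sanity-check the no-skipping property is the compact, self-contained variant of the same merge-based idea below, with a deterministic demo. It uses the hypothetical name `combineNoSkip` and tags elements with plain classes instead of generic value classes. The demo assumes the second flow gets a 100 ms head start, so that both latest values exist before the fast flow starts emitting. A slow collector then still receives every combination. Plain `combine` is not guaranteed to do that in the same setup.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Tags each element with the flow it came from.
private sealed interface Tagged
private class First(val value: Any?) : Tagged
private class Second(val value: Any?) : Tagged

// merge() never drops elements, and emit() suspends until the downstream is ready,
// so every element of either flow produces a combination once both flows have emitted.
@Suppress("UNCHECKED_CAST")
fun <T1, T2, R> combineNoSkip(
    flow1: Flow<T1>,
    flow2: Flow<T2>,
    transform: suspend (T1, T2) -> R,
): Flow<R> = flow {
    var latest1: First? = null
    var latest2: Second? = null
    merge<Tagged>(flow1.map { First(it) }, flow2.map { Second(it) }).collect { tagged ->
        when (tagged) {
            is First -> latest1 = tagged
            is Second -> latest2 = tagged
        }
        val a = latest1
        val b = latest2
        if (a != null && b != null) emit(transform(a.value as T1, b.value as T2))
    }
}

fun demo(): List<String> = runBlocking {
    val fast = flow<Int> {
        delay(100) // head start for `slow`, an assumption of this demo
        for (i in 0..9) emit(i)
    }
    val slow = flowOf("!")
    combineNoSkip(fast, slow) { a, b -> "$a$b" }
        .onEach { delay(10) } // simulate a slow collector
        .toList()
}

fun main() {
    println(demo())
}
```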
d
I still don't understand the usage scenarios for this function, sorry. Here, you seem to be combining elements that are not necessarily logically related (that is, they don't share the same id), if I understood your use case correctly. If you can provide a complete usage example for this function, it would help illustrate your case. Also, I don't understand what you mean by "transformations upstream" and why they are undesirable.
t
It looks something like this: https://pl.kotl.in/JGD6n0bwF
```kotlin
import kotlin.time.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

//--------------------------------------------------------------------------------------------------------------------//
typealias Id = String
typealias Number = Int
typealias Name = String

sealed interface ApiResult<out T> {
    data object Loading : ApiResult<Nothing>
    class Error(val exception: Exception) : ApiResult<Nothing>
    class Success<T>(val data: T) : ApiResult<T>
}

val <T> ApiResult<T>.successData: T? get() = (this as? ApiResult.Success)?.data

class DisplayedData(val id: Id, var name: Name, var number: Number?)

//--------------------------------------------------------------------------------------------------------------------//
//performs API request etc.
fun getNames(ids: List<Id>): Flow<ApiResult<Map<Id, Name>>> = flow {
    emit(ApiResult.Loading)
    delay(500)
    emit(
        ApiResult.Success(
            mapOf(
                "001" to "Name1",
                "002" to "Name2",
                "003" to "Name3"
            )
        )
    )
}

//performs API request etc.
fun getNumbers(ids: List<Id>): Flow<ApiResult<Pair<Id, Number>>> = flow {
    emit(ApiResult.Loading)
    delay(500)
    repeat(5) {
        emit(ApiResult.Success("002" to 200 + it))
        delay(100)
        emit(ApiResult.Success("001" to 100 + it))
        delay(100)
        emit(ApiResult.Success("003" to 300 + it))
    }
}

fun getDisplayedData(ids: List<Id>): Flow<ApiResult<List<DisplayedData>>> = flow {
    /* some more preprocessing here etc */
    val namesResultFlow = getNames(ids)
    val numbersResultFlow = getNumbers(ids)

    val cachedNumberMap: MutableMap<Id, Number> = mutableMapOf()

    combine(namesResultFlow, numbersResultFlow) { nameResult, numberResult ->
        numberResult.successData?.also { (id, number) ->
            cachedNumberMap[id] = number
        }

        when (nameResult) {
            ApiResult.Loading -> emit(ApiResult.Loading)

            is ApiResult.Error -> emit(ApiResult.Error(Exception("...")))

            is ApiResult.Success<Map<Id, Name>> -> emit(ApiResult.Success(
                //if an ID is not returned by name API, it is not shown in the displayed data list
                ids.mapNotNull { id ->
                    DisplayedData(
                        id = id,
                        name = nameResult.data[id] ?: return@mapNotNull null,
                        number = cachedNumberMap[id]
                    )
                }
            ))
        }
    }.collect()
}.distinctUntilChanged()

//--------------------------------------------------------------------------------------------------------------------//
suspend fun main() {
    getDisplayedData(listOf("001", "002", "003", "004")).collect {
        when (it) {
            is ApiResult.Error ->
                println("ERROR")

            ApiResult.Loading ->
                println("LOADING")

            is ApiResult.Success<List<DisplayedData>> ->
                println(it.data.joinToString(separator = "\n", prefix = "SUCCESS:\n") { with(it) { "  $id($name) - $number" } })
        }
    }
}
```
Output:
```
LOADING
SUCCESS:
  001(Name1) - null
  002(Name2) - null
  003(Name3) - null
SUCCESS:
  001(Name1) - null
  002(Name2) - 200
  003(Name3) - null
SUCCESS:
  001(Name1) - 100
  002(Name2) - 200
  003(Name3) - null
SUCCESS:
  001(Name1) - 100
  002(Name2) - 200
  003(Name3) - 300
SUCCESS:
  001(Name1) - 100
  002(Name2) - 201
  003(Name3) - 300
SUCCESS:
  001(Name1) - 101
  002(Name2) - 201
  003(Name3) - 300
SUCCESS:
  001(Name1) - 101
  002(Name2) - 201
  003(Name3) - 301
SUCCESS:
  001(Name1) - 101
  002(Name2) - 202
  003(Name3) - 301
SUCCESS:
  001(Name1) - 102
  002(Name2) - 202
  003(Name3) - 301
SUCCESS:
  001(Name1) - 102
  002(Name2) - 202
  003(Name3) - 302
SUCCESS:
  001(Name1) - 102
  002(Name2) - 203
  003(Name3) - 302
SUCCESS:
  001(Name1) - 103
  002(Name2) - 203
  003(Name3) - 302
SUCCESS:
  001(Name1) - 103
  002(Name2) - 203
  003(Name3) - 303
SUCCESS:
  001(Name1) - 103
  002(Name2) - 204
  003(Name3) - 303
SUCCESS:
  001(Name1) - 104
  002(Name2) - 204
  003(Name3) - 303
SUCCESS:
  001(Name1) - 104
  002(Name2) - 204
  003(Name3) - 304
```
d
This example demonstrates the issue very well, thanks! I see why `combine` is appealing in this case. Here's how you can make it work:
```kotlin
fun getDisplayedData(ids: List<Id>): Flow<ApiResult<List<DisplayedData>>> = flow {
    /* some more preprocessing here etc */
    val namesResultFlow = getNames(ids)

    val cachedNumberMap: MutableMap<Id, Number> = mutableMapOf()
    val numbersResultFlow = getNumbers(ids).onEach { numberResult ->
        numberResult.successData?.also { (id, number) ->
            cachedNumberMap[id] = number
        }
    }

    combine(namesResultFlow, numbersResultFlow) { nameResult, _ ->
        when (nameResult) {
            ApiResult.Loading -> emit(ApiResult.Loading)

            is ApiResult.Error -> emit(ApiResult.Error(Exception("...")))

            is ApiResult.Success<Map<Id, Name>> -> emit(ApiResult.Success(
                //if an ID is not returned by name API, it is not shown in the displayed data list
                ids.mapNotNull { id ->
                    DisplayedData(
                        id = id,
                        name = nameResult.data[id] ?: return@mapNotNull null,
                        number = cachedNumberMap[id]
                    )
                }
            ))
        }
    }.collect()
}.distinctUntilChanged()
```
Semantically, updating a `MutableMap` is a side effect, and side effects should be performed in `onEach`. Even though some elements of `numbersResultFlow` may be skipped by `combine`, the map-updating side effect will still be performed for every one of them. Here's the same code, but with some small simplifications here and there:
```kotlin
fun getDisplayedData(ids: List<Id>): Flow<ApiResult<List<DisplayedData>>> = flow {
    /* some more preprocessing here etc */
    val namesResultFlow = getNames(ids)

    val cachedNumberMap: MutableMap<Id, Number> = mutableMapOf()
    val numbersResultFlow = getNumbers(ids)
        .mapNotNull { it.successData }
        .onEach { (id, number) -> cachedNumberMap[id] = number }

    combine(namesResultFlow, numbersResultFlow) { nameResult, _ ->
        when (nameResult) {
            ApiResult.Loading -> ApiResult.Loading

            is ApiResult.Error -> ApiResult.Error(Exception("..."))

            is ApiResult.Success<Map<Id, Name>> -> ApiResult.Success(
                //if an ID is not returned by name API, it is not shown in the displayed data list
                ids.mapNotNull { id ->
                    DisplayedData(
                        id = id,
                        name = nameResult.data[id] ?: return@mapNotNull null,
                        number = cachedNumberMap[id]
                    )
                }
            )
        }
    }.collect(this)
}.distinctUntilChanged()
```
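Here is a small, self-contained check of the `onEach` argument. The ids, numbers and the single-element `trigger` flow are made up for illustration. However many combinations `combine` ends up passing to a slow collector, the upstream `onEach` runs once per element. The cache therefore always ends up holding the latest number for each id.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun sideEffectDemo(): Pair<Int, Map<String, Int>> = runBlocking {
    val cache = mutableMapOf<String, Int>()
    var sideEffects = 0

    // The side effect lives upstream of combine, so it sees every element.
    val numbers = flowOf("001" to 100, "002" to 200, "001" to 101, "002" to 201)
        .onEach { (id, number) ->
            cache[id] = number
            sideEffects++
        }
    val trigger = flowOf(Unit) // stands in for the names flow

    combine(trigger, numbers) { _, _ -> cache.toMap() }
        .collect { delay(50) } // slow collector: some combinations may be skipped

    sideEffects to cache.toMap()
}

fun main() {
    println(sideEffectDemo())
}
```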
Another option is to go full functional-programming style and avoid mutable state altogether. Then the "obtain the latest view" functionality of `combine` is beneficial:
```kotlin
fun getDisplayedData(ids: List<Id>): Flow<ApiResult<List<DisplayedData>>> {
    /* some more preprocessing here etc */
    val namesResultFlow = getNames(ids)
    val numbersResultFlow = getNumbers(ids)
        .mapNotNull { it.successData }
        .scan(emptyMap<Id, Number>()) { acc, (id, number) -> acc + (id to number) }

    return combine(namesResultFlow, numbersResultFlow) { nameResult, numberMap ->
        when (nameResult) {
            ApiResult.Loading -> ApiResult.Loading

            is ApiResult.Error -> ApiResult.Error(Exception("..."))

            is ApiResult.Success<Map<Id, Name>> -> ApiResult.Success(
                //if an ID is not returned by name API, it is not shown in the displayed data list
                ids.mapNotNull { id ->
                    DisplayedData(
                        id = id,
                        name = nameResult.data[id] ?: return@mapNotNull null,
                        number = numberMap[id]
                    )
                }
            )
        }
    }.distinctUntilChanged()
}
```
I'd recommend using `kotlinx.collections.immutable` and a persistent map when going down this road, though, as the immutable map provided out of the box doesn't have the best performance characteristics in this scenario.
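The snippet below isolates the `scan` step from the version above, using made-up ids and numbers. Every `(id, number)` update produces a new snapshot map. `scan` emits the initial empty map first, and each later snapshot keeps the latest number per id.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Folds (id, number) updates into successive immutable snapshots.
fun snapshots(): List<Map<String, Int>> = runBlocking {
    flowOf("002" to 200, "001" to 100, "002" to 201)
        .scan(emptyMap<String, Int>()) { acc, (id, number) -> acc + (id to number) }
        .toList()
}

fun main() {
    snapshots().forEach(::println)
}
```

Here the stdlib `Map.plus` builds a new map containing all existing entries on every update, so each step costs time proportional to the map's size. With `kotlinx.collections.immutable`, one could start from `persistentMapOf<String, Int>()` and use `acc.put(id, number)`. That returns a new `PersistentMap` sharing structure with the previous one instead of copying it.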
t
Yeah, in retrospect `scan`/`runningFold`/`runningReduce` seem to be the cleanest way of dealing with it. Thanks for the advice!
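For reference, here is a tiny comparison of the three operators on a made-up flow of numbers. `scan` and `runningFold` behave the same: both emit the initial value first, then each accumulated result. `runningReduce` takes no initial value and starts from the first element.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun runningSums(): Triple<List<Int>, List<Int>, List<Int>> = runBlocking {
    val numbers = flowOf(1, 2, 3)
    Triple(
        numbers.scan(0) { acc, n -> acc + n }.toList(),        // [0, 1, 3, 6]
        numbers.runningFold(0) { acc, n -> acc + n }.toList(), // same as scan
        numbers.runningReduce { acc, n -> acc + n }.toList(),  // [1, 3, 6]
    )
}

fun main() {
    println(runningSums())
}
```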