Thomas
02/25/2025, 11:03 AM
`Flow`, what's the "idiomatic" way to perform `combine` but "suspend" instead of "conflate"?
The existing `combine` function deals with back pressure by simply skipping items, behaving as if it went through a conflated channel.
What's the best I could do to make it NOT skip items?

Sam
02/25/2025, 11:05 AM
`combine` isn't working for you? On its own, I don't think it should do any skipping. As far as I know, `a.combine(b) { … }` will simply emit whenever `a` or `b` emits. If you're seeing it conflate things, I wonder if there's something else causing that in your code?

Thomas
02/25/2025, 11:06 AM
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val f1 = MutableSharedFlow<Int>()
    val f2 = MutableStateFlow("!")
    val j = launch {
        combine(f1, f2) { a, b -> a to b }
            .collect { (a, b) ->
                println("$a$b")
                delay(100)
            }
    }
    delay(1000)
    repeat(10) {
        println("f1: $it")
        f1.emit(it)
    }
    delay(1000)
    j.cancel()
}
```
Outputs:
```
f1: 0
f1: 1
0!
f1: 2
f1: 3
f1: 4
2!
f1: 5
f1: 6
4!
f1: 7
f1: 8
6!
f1: 9
8!
9!
```
(playground link)

Thomas
02/25/2025, 11:07 AM
`f1` emits elements instantaneously, while the collection is delayed by 100 ms.
Some elements that we might expect (e.g. "1!", "3!") are "skipped", so it feels like the data went through a conflated channel/buffer.

Thomas
02/25/2025, 11:09 AM

Sam
02/25/2025, 11:21 AM

Dmitry Khalanskiy [JB]
02/25/2025, 11:28 AM
`combine` is to obtain up-to-date views of several data channels.
There's also the `zip` operation that combines each emission in one data channel with the corresponding emission in another data channel.
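Here is a minimal sketch of how `zip` behaves, using two made-up finite flows where one has an extra element. `zip` pairs the n-th emission of one flow with the n-th emission of the other. It completes as soon as either flow completes, so nothing is conflated, but an unmatched trailing element is never emitted.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Pairs elements strictly by position: 1 with "a", 2 with "b", and so on.
fun zipped(): List<String> = runBlocking {
    val numbers = flowOf(1, 2, 3)
    val letters = flowOf("a", "b", "c", "d") // "d" has no partner and is dropped
    numbers.zip(letters) { n, l -> "$n$l" }.toList()
}

fun main() {
    println(zipped()) // [1a, 2b, 3c]
}
```

Unlike `combine`, `zip` never reuses an element of either flow. It therefore only fits when both flows produce their values in lockstep.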
Does `zip` fit your use case, or do you need some elements to be repeated (if one data channel is faster than the other), but no elements to be skipped?

Thomas
02/25/2025, 11:28 AM

Thomas
02/25/2025, 11:30 AM

Dmitry Khalanskiy [JB]
02/25/2025, 11:31 AM

Thomas
02/25/2025, 11:36 AM

Thomas
02/25/2025, 11:43 AM
`Flow` which emits pairs of ID and numeric value.
... I could resolve this issue by preprocessing the numeric value flow through the use of `runningFold` or `runningReduce`, but I'm avoiding that.

Dmitry Khalanskiy [JB]
02/25/2025, 11:49 AM
`combine` help in this scenario, though?

Thomas
02/25/2025, 12:06 PM
```kotlin
val displayedDataFlow = combine(idToNumberResultFlow, namesResultFlow) { idToNumResult, namesResult ->
    /* cache "ID to number" in a map if possible */
    ids.map {
        /* transform to displayed data ... */
    }
}
```
Dmitry Khalanskiy [JB]
02/25/2025, 12:08 PM
`idToNumberFlow` and `namesFlow` synchronized so that a name matches the corresponding ID?

Thomas
02/25/2025, 12:12 PM
`Result` type, something like `Flow<Result<T>>`
Dmitry Khalanskiy [JB]
02/25/2025, 12:17 PM
`combine` would be useful. I'd implement it like this:
```kotlin
val nameGettingScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

suspend fun obtainName(id: Int): String {
    // network call, etc.
}

fun getName(id: Int) = nameGettingScope.async {
    cachedNames[id] ?: obtainName(id).also { cachedNames[id] = it }
}

idToNumberFlow.map {
    val (id, number) = it
    val name = getName(id)
    IdNumberAndName(id, number, name)
}
```
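Here is a runnable sketch of the same idea. It rests on a few assumptions. `obtainName` is a hypothetical stand-in for the network call. `IdNumberAndName` is assumed to hold the name as a `Deferred<String>`, since `getName` returns one. `NameLookup` is a hypothetical wrapper class. One variation is swapped in on purpose: the cache stores the `Deferred` itself in a `ConcurrentHashMap` through `computeIfAbsent`. With that change, concurrent lookups of the same ID share one request and the map is safe to touch from `Dispatchers.IO` threads. Caching the finished `String`, as in the snippet, can start a second request for an ID whose first lookup has not completed yet.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Assumed shape: the name is fetched in the background and awaited by the consumer.
class IdNumberAndName(val id: Int, val number: Int, val name: Deferred<String>)

// Hypothetical wrapper around the name cache.
class NameLookup(private val scope: CoroutineScope) {
    // Counts simulated network calls so the caching is observable.
    val networkCalls = AtomicInteger(0)

    // One Deferred per ID; computeIfAbsent makes concurrent lookups share it.
    private val cachedNames = ConcurrentHashMap<Int, Deferred<String>>()

    // Hypothetical stand-in for the real network request.
    private suspend fun obtainName(id: Int): String {
        networkCalls.incrementAndGet()
        delay(10) // simulated latency
        return "Name$id"
    }

    fun getName(id: Int): Deferred<String> =
        cachedNames.computeIfAbsent(id) { scope.async { obtainName(id) } }
}

// Attaches a (possibly still loading) name to every (id, number) pair.
fun Flow<Pair<Int, Int>>.withNames(lookup: NameLookup): Flow<IdNumberAndName> =
    map { (id, number) -> IdNumberAndName(id, number, lookup.getName(id)) }

fun main() = runBlocking {
    val nameGettingScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    val lookup = NameLookup(nameGettingScope)
    flowOf(1 to 100, 2 to 200, 1 to 101)
        .withNames(lookup)
        .collect { println("${it.id}(${it.name.await()}) - ${it.number}") }
    println("network calls: ${lookup.networkCalls.get()}")
    nameGettingScope.cancel()
}
```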
Thomas
02/25/2025, 12:25 PM
`combine` because I see that I have two `Flow`s of data and what I'm trying to do is to combine them both; it's the most straightforward.

Dmitry Khalanskiy [JB]
02/25/2025, 12:27 PM
`Flow` of data comes from and how it relates to the first `Flow`, that's all. If anything, `zip` seems more useful, as it would at least ensure that the elements are paired correctly.

Robert Williams
02/25/2025, 12:29 PM
`f1 = flowOf(0,1,2,3,4,5,6,7,8,9)`
I'd expect it to only collect 9 (or maybe 0 and 9) if it's simply conflating, but it actually collects exactly the same values as the `SharedFlow` version: 0, 2, 4, 6, 8, 9 😕
And changing `collect` to `collectLatest` gives the opposite values: 0, 1, 3, 5, 7, 9 😕 😕

Dmitry Khalanskiy [JB]
02/25/2025, 12:32 PM
`combine` does not introduce any multithreading on its own, so the exact result will depend on when and how often each of the flows suspends.

Thomas
02/25/2025, 12:33 PM
`idToNumberResultFlow` is obtained through some function like `fun getIdToNumber(param: Param): Flow<Result<Pair<Id, Number>>>`, whereas `namesResultFlow` is obtained through some other function like `fun getNames(ids: List<Id>): Flow<Result<Map<Id, String>>>`.
Something like that

Thomas
02/25/2025, 12:53 PM
`combine` is designed this way to begin with 🤔
If we had `combineThatDoesntSkipCombinations`, we could easily simulate `combine` through `conflate`: `combineThatDoesntSkipCombinations(flow1, flow2) { ... }` can be turned into `combine(flow1, flow2) { ... }` by calling it like
`combineThatDoesntSkipCombinations(flow1.conflate(), flow2.conflate()) { ... }`
But we cannot do the opposite: we cannot easily simulate `combineThatDoesntSkipCombinations` through `combine`.
Robert Williams
02/25/2025, 1:34 PM
> `combine` does not introduce any multithreading on its own, so the exact result will depend on when and how often each of the flows suspends.
I get that the answer will depend on a lot of things, but I'd like to have some intuition on how it decides what to skip; otherwise we essentially have to treat the whole thing as undefined behaviour.
Dmitry Khalanskiy [JB]
02/25/2025, 1:50 PM
`combine` tomorrow, leading to a different set of places where suspensions occur, without considering it a breaking change.

Thomas
02/26/2025, 2:18 AM
`combine` is used is because the name API and number API are requested in parallel and either one could return first. We want either one of them to trigger the update of the displayed data, and `combine` does this very well.

Dmitry Khalanskiy [JB]
02/26/2025, 6:34 AM
`merge` is the operator you're looking for. Example: https://pl.kotl.in/UzBcQpqzS

Thomas
02/26/2025, 8:27 AM
`merge` solution is nice as well, but it requires the transformation to be done upstream.

Thomas
02/26/2025, 8:29 AM
`merge` does not skip emissions to implement `combineThatDoesntSkipCombinations`, here renamed to `combineWithoutConflation`:
```kotlin
import kotlinx.coroutines.flow.*

// Tags each value with the upstream flow it came from.
private sealed interface Box

@JvmInline
private value class Box1<T>(val value: T) : Box

@JvmInline
private value class Box2<T>(val value: T) : Box

@Suppress("UNCHECKED_CAST")
fun <T1, T2, R> combineWithoutConflation(
    flow1: Flow<T1>,
    flow2: Flow<T2>,
    transform: suspend (T1, T2) -> R,
): Flow<R> = flow {
    // Latest value seen from each side; null until that side has emitted.
    var b1: Box1<T1>? = null
    var b2: Box2<T2>? = null
    // merge suspends instead of dropping when its buffer is full,
    // so every upstream value reaches this collector.
    merge(flow1.map { Box1(it) }, flow2.map { Box2(it) }).collect {
        when (it) {
            is Box1<*> -> {
                b1 = it as Box1<T1>
                b2?.also {
                    emit(transform(b1!!.value, it.value))
                }
            }
            is Box2<*> -> {
                b2 = it as Box2<T2>
                b1?.also {
                    emit(transform(it.value, b2!!.value))
                }
            }
        }
    }
}
```
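Here is a self-contained check of the same approach. It restates `combineWithoutConflation` with a slightly different tagging trick, an assumption of this sketch rather than Thomas's code. Each upstream value becomes a small update function that the collector applies, which avoids the unchecked casts. The made-up `numbers` flow waits briefly so that `"!"` is known before the first number arrives. From then on, every number yields exactly one output, even though the downstream collector is slow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Latest value of one side; a null reference means "nothing received yet".
private class Latest<T>(val value: T)

fun <T1, T2, R> combineWithoutConflation(
    flow1: Flow<T1>,
    flow2: Flow<T2>,
    transform: suspend (T1, T2) -> R,
): Flow<R> = flow {
    var latest1: Latest<T1>? = null
    var latest2: Latest<T2>? = null
    // Each upstream value becomes an update that runs in this collector, so the
    // two vars are only touched by one coroutine. merge suspends rather than
    // dropping when its buffer is full, so no update is lost.
    merge(
        flow1.map { v -> { latest1 = Latest(v) } },
        flow2.map { v -> { latest2 = Latest(v) } },
    ).collect { update ->
        update()
        val a = latest1
        val b = latest2
        if (a != null && b != null) emit(transform(a.value, b.value))
    }
}

fun demo(): List<String> = runBlocking {
    val numbers = flow {
        delay(10) // let "!" arrive first
        repeat(10) { emit(it) }
    }
    val suffix = flowOf("!")
    combineWithoutConflation(numbers, suffix) { a, b -> "$a$b" }
        .onEach { delay(5) } // slow collector, like delay(100) in the first example
        .toList()
}

fun main() {
    println(demo())
}
```

With the built-in `combine`, the same setup may produce only some of these strings. Which ones are produced is an implementation detail, as Dmitry points out in the thread.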
Thomas
02/26/2025, 8:30 AM

Thomas
02/26/2025, 8:30 AM

Dmitry Khalanskiy [JB]
02/26/2025, 9:22 AM

Thomas
02/26/2025, 11:02 AM

Thomas
02/26/2025, 11:02 AM
```kotlin
import kotlin.time.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

//--------------------------------------------------------------------------------------------------------------------//
typealias Id = String
typealias Number = Int
typealias Name = String

sealed interface ApiResult<out T> {
    data object Loading : ApiResult<Nothing>
    class Error(val exception: Exception) : ApiResult<Nothing>
    class Success<T>(val data: T) : ApiResult<T>
}

val <T> ApiResult<T>.successData: T? get() = (this as? ApiResult.Success)?.data

class DisplayedData(val id: Id, var name: Name, var number: Number?)

//--------------------------------------------------------------------------------------------------------------------//
//performs API request etc.
fun getNames(ids: List<Id>): Flow<ApiResult<Map<Id, Name>>> = flow {
    emit(ApiResult.Loading)
    delay(500)
    emit(
        ApiResult.Success(
            mapOf(
                "001" to "Name1",
                "002" to "Name2",
                "003" to "Name3"
            )
        )
    )
}

//performs API request etc.
fun getNumbers(ids: List<Id>): Flow<ApiResult<Pair<Id, Number>>> = flow {
    emit(ApiResult.Loading)
    delay(500)
    repeat(5) {
        emit(ApiResult.Success("002" to 200 + it))
        delay(100)
        emit(ApiResult.Success("001" to 100 + it))
        delay(100)
        emit(ApiResult.Success("003" to 300 + it))
    }
}

fun getDisplayedData(ids: List<Id>): Flow<ApiResult<List<DisplayedData>>> = flow {
    /* some more preprocessing here etc */
    val namesResultFlow = getNames(ids)
    val numbersResultFlow = getNumbers(ids)
    val cachedNumberMap: MutableMap<Id, Number> = mutableMapOf()
    combine(namesResultFlow, numbersResultFlow) { nameResult, numberResult ->
        numberResult.successData?.also { (id, number) ->
            cachedNumberMap[id] = number
        }
        when (nameResult) {
            ApiResult.Loading -> emit(ApiResult.Loading)
            is ApiResult.Error -> emit(ApiResult.Error(Exception("...")))
            is ApiResult.Success<Map<Id, Name>> -> emit(ApiResult.Success(
                //if an ID is not returned by name API, it is not shown in the displayed data list
                ids.mapNotNull { id ->
                    DisplayedData(
                        id = id,
                        name = nameResult.data[id] ?: return@mapNotNull null,
                        number = cachedNumberMap[id]
                    )
                }
            ))
        }
    }.collect()
}.distinctUntilChanged()

//--------------------------------------------------------------------------------------------------------------------//
suspend fun main() {
    getDisplayedData(listOf("001", "002", "003", "004")).collect {
        when (it) {
            is ApiResult.Error ->
                println("ERROR")
            ApiResult.Loading ->
                println("LOADING")
            is ApiResult.Success<List<DisplayedData>> ->
                println(it.data.joinToString(separator = "\n", prefix = "SUCCESS:\n") { with(it) { " $id($name) - $number" } })
        }
    }
}
```
Thomas
02/26/2025, 11:02 AM
```
LOADING
SUCCESS:
001(Name1) - null
002(Name2) - null
003(Name3) - null
SUCCESS:
001(Name1) - null
002(Name2) - 200
003(Name3) - null
SUCCESS:
001(Name1) - 100
002(Name2) - 200
003(Name3) - null
SUCCESS:
001(Name1) - 100
002(Name2) - 200
003(Name3) - 300
SUCCESS:
001(Name1) - 100
002(Name2) - 201
003(Name3) - 300
SUCCESS:
001(Name1) - 101
002(Name2) - 201
003(Name3) - 300
SUCCESS:
001(Name1) - 101
002(Name2) - 201
003(Name3) - 301
SUCCESS:
001(Name1) - 101
002(Name2) - 202
003(Name3) - 301
SUCCESS:
001(Name1) - 102
002(Name2) - 202
003(Name3) - 301
SUCCESS:
001(Name1) - 102
002(Name2) - 202
003(Name3) - 302
SUCCESS:
001(Name1) - 102
002(Name2) - 203
003(Name3) - 302
SUCCESS:
001(Name1) - 103
002(Name2) - 203
003(Name3) - 302
SUCCESS:
001(Name1) - 103
002(Name2) - 203
003(Name3) - 303
SUCCESS:
001(Name1) - 103
002(Name2) - 204
003(Name3) - 303
SUCCESS:
001(Name1) - 104
002(Name2) - 204
003(Name3) - 303
SUCCESS:
001(Name1) - 104
002(Name2) - 204
003(Name3) - 304
```
Dmitry Khalanskiy [JB]
02/26/2025, 11:44 AM
`combine` is appealing in this case.
Here's how you can make it work:
```kotlin
fun getDisplayedData(ids: List<Id>): Flow<ApiResult<List<DisplayedData>>> = flow {
    /* some more preprocessing here etc */
    val namesResultFlow = getNames(ids)
    val cachedNumberMap: MutableMap<Id, Number> = mutableMapOf()
    val numbersResultFlow = getNumbers(ids).onEach { numberResult ->
        numberResult.successData?.also { (id, number) ->
            cachedNumberMap[id] = number
        }
    }
    combine(namesResultFlow, numbersResultFlow) { nameResult, _ ->
        when (nameResult) {
            ApiResult.Loading -> emit(ApiResult.Loading)
            is ApiResult.Error -> emit(ApiResult.Error(Exception("...")))
            is ApiResult.Success<Map<Id, Name>> -> emit(ApiResult.Success(
                //if an ID is not returned by name API, it is not shown in the displayed data list
                ids.mapNotNull { id ->
                    DisplayedData(
                        id = id,
                        name = nameResult.data[id] ?: return@mapNotNull null,
                        number = cachedNumberMap[id]
                    )
                }
            ))
        }
    }.collect()
}.distinctUntilChanged()
```
Semantically, updating a `MutableMap` is a side effect, and side effects should be performed in `onEach`. Even though some elements of `numbersResultFlow` may be lost, the map-updating side effect will still be performed.
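Here is a small check of that claim, using made-up IDs and numbers. The `onEach` side effect runs once for every element of the numbers flow, even if `combine` collapses some elements under a slow collector. The last value `combine` emits also sees the fully updated map. The helper name `sideEffectDemo` is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns how many times the side effect ran, plus the last value emitted by combine.
fun sideEffectDemo(): Pair<Int, Map<String, Int>> = runBlocking {
    var sideEffects = 0
    val cachedNumberMap = mutableMapOf<String, Int>()
    val numbers = (1..15).asFlow()
        .map { n -> "id${n % 3}" to n }
        .onEach { (id, number) ->
            cachedNumberMap[id] = number // the side effect, performed for every element
            sideEffects++
        }
    val names = flowOf("names loaded")
    val emitted = combine(names, numbers) { _, _ -> cachedNumberMap.toMap() }
        .onEach { delay(1) } // slow collector: combine may now skip intermediate pairs
        .toList()
    sideEffects to emitted.last()
}

fun main() {
    val result = sideEffectDemo()
    println("side effects: ${result.first}, last snapshot: ${result.second}")
}
```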
Here's the same code, but with some small simplifications here and there:
```kotlin
fun getDisplayedData(ids: List<Id>): Flow<ApiResult<List<DisplayedData>>> = flow {
    /* some more preprocessing here etc */
    val namesResultFlow = getNames(ids)
    val cachedNumberMap: MutableMap<Id, Number> = mutableMapOf()
    val numbersResultFlow = getNumbers(ids)
        .mapNotNull { it.successData }
        .onEach { (id, number) -> cachedNumberMap[id] = number }
    combine(namesResultFlow, numbersResultFlow) { nameResult, _ ->
        when (nameResult) {
            ApiResult.Loading -> ApiResult.Loading
            is ApiResult.Error -> ApiResult.Error(Exception("..."))
            is ApiResult.Success<Map<Id, Name>> -> ApiResult.Success(
                //if an ID is not returned by name API, it is not shown in the displayed data list
                ids.mapNotNull { id ->
                    DisplayedData(
                        id = id,
                        name = nameResult.data[id] ?: return@mapNotNull null,
                        number = cachedNumberMap[id]
                    )
                }
            )
        }
    }.collect(this)
}.distinctUntilChanged()
```
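For reference, `combine(...).collect(this)` inside a `flow { }` builder forwards every element to the outer flow's collector. The standard `emitAll` extension expresses the same thing. Here is a minimal comparison, with a trivial mapped flow (the hypothetical `source()`) standing in for the `combine` call:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Stand-in for the combine(...) flow.
fun source(): Flow<Int> = flowOf(1, 2, 3).map { it * 10 }

// Forwards each element by collecting into the builder's own FlowCollector.
fun viaCollect(): Flow<Int> = flow { source().collect(this) }

// The same forwarding, spelled with emitAll.
fun viaEmitAll(): Flow<Int> = flow { emitAll(source()) }

fun main() = runBlocking {
    println(viaCollect().toList()) // [10, 20, 30]
    println(viaEmitAll().toList()) // [10, 20, 30]
}
```

When nothing else happens inside the builder, the `flow { }` wrapper can be dropped and the `combine(...)` flow returned directly.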
Dmitry Khalanskiy [JB]
02/26/2025, 11:50 AM
`combine` is beneficial:
```kotlin
fun getDisplayedData(ids: List<Id>): Flow<ApiResult<List<DisplayedData>>> {
    /* some more preprocessing here etc */
    val namesResultFlow = getNames(ids)
    val numbersResultFlow = getNumbers(ids)
        .mapNotNull { it.successData }
        .scan(emptyMap<Id, Number>()) { acc, (id, number) -> acc + (id to number) }
    return combine(namesResultFlow, numbersResultFlow) { nameResult, numberMap ->
        when (nameResult) {
            ApiResult.Loading -> ApiResult.Loading
            is ApiResult.Error -> ApiResult.Error(Exception("..."))
            is ApiResult.Success<Map<Id, Name>> -> ApiResult.Success(
                //if an ID is not returned by name API, it is not shown in the displayed data list
                ids.mapNotNull { id ->
                    DisplayedData(
                        id = id,
                        name = nameResult.data[id] ?: return@mapNotNull null,
                        number = numberMap[id]
                    )
                }
            )
        }
    }.distinctUntilChanged()
}
```
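Here is the accumulation step on its own, as a minimal sketch with made-up IDs and a hypothetical helper name `snapshots`. Every element `scan` emits is a complete snapshot. If `combine` skips some of them, the next snapshot it does see still contains every update so far. Because `scan` emits its initial empty map right away, `combine` also does not have to wait for the first number before it can show names.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Folds (id, number) updates into successive map snapshots, starting from an empty map.
fun snapshots(updates: Flow<Pair<String, Int>>): Flow<Map<String, Int>> =
    updates.scan(emptyMap<String, Int>()) { acc, (id, number) -> acc + (id to number) }

fun main() = runBlocking {
    val updates = flowOf("002" to 200, "001" to 100, "002" to 201)
    snapshots(updates).collect { println(it) }
}
```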
I'd recommend using `kotlinx.collections.immutable` and a persistent map when going down this road, though, as the immutable map provided out of the box doesn't have the best performance characteristics in this scenario.

Thomas
02/26/2025, 12:02 PM
`scan` / `runningFold` / `runningReduce` seem to be the cleanest way of dealing with it.
Thanks for the advice!
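For reference, `scan` and `runningFold` are the same operation, and both start by emitting the initial value. `runningReduce` has no initial value and starts from the first element. Here is a tiny comparison on a running sum, with a hypothetical helper `runningSums`:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun runningSums(): Triple<List<Int>, List<Int>, List<Int>> = runBlocking {
    val xs = flowOf(1, 2, 3)
    Triple(
        xs.scan(0) { acc, x -> acc + x }.toList(),        // [0, 1, 3, 6]
        xs.runningFold(0) { acc, x -> acc + x }.toList(), // [0, 1, 3, 6]
        xs.runningReduce { acc, x -> acc + x }.toList(),  // [1, 3, 6]
    )
}

fun main() {
    println(runningSums())
}
```

In the `combine` setup from this thread, the difference matters. With `runningReduce`, the numbers side emits nothing until the first number arrives, so `combine` would hold back the names until then. `scan(emptyMap())` lets the names show immediately.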