Hey yall. I'm having a combining flow issue. I hav...
# coroutines
l
Hey yall. I'm having a combining flow issue. I have two flows of different types that depend on an id. I need to combine them in such a way that if flow B receives an emission, the entire flow needs to emit again the current state. I tried
combine
combineTransform
zip
and
merge
with no success. The first emission occurs correctly. but once only B emits it doesn't get propagated downstream. Any ideas on what I could try?
Copy code
private val _state = MutableStateFlow<State>(State.Loading)

fun loadData(collectionId: String) {
    viewModelScope.launch {
        val collectionFlow = repository.getCollection(collectionId)
        val itemsFlow = repository.getCollectionItems(collectionId)
        
        combine(collectionFlow, itemsFlow, ::Pair).flatMapConcat { (collection, items) ->
            val map = ...// maps the data
            modelUseCase(map) // sends to another flow
         }.collect{ model ->
            if (model.items.isEmpty()) {
                _state.value = State.Empty
            } else {
                _state.value = State.Content(model)
            }
        }
    }    
}
k
Copy code
fun flowFromId(id: String): Flow<Pair<A, B>> = combine(
  flowAFromId(id),
  flowBFromId(id),
  ::Pair,
)

fun flowAFromId(id: String): Flow<A> = flow { /* omitted */ }
fun flowBFromId(id: String): Flow<B> = flow { /* omitted */ }
Without seeing your code it’s impossible to know what’s going wrong, but the above sample should work to emit on every constituent flow emission
l
Will try
I tried but the emission is not being propagated 😞
@kevin.cianfarini code above
collectionFlow emits but combine doesn't receive anything
k
To debug you should simplify the above code to see if anything is happening at all. You’ve got quite a bit of moving parts in there. I’d suggest:
Copy code
val collectionFlow = repository.getCollection(collectionId)
        val itemsFlow = repository.getCollectionItems(collectionId)
        
        combine(collectionFlow, itemsFlow, ::Pair).collect { (collection, items) ->
          error("Got an item!")
        }
If that produces an error then you know the combine is working.
I am suspicious of your flatmapConcat there being the bug
I doubt that it’s combine that isn’t emitting.
l
I added onEach for each flow. Both are emitting fine
GPT says
The
combine
operator of Kotlin Flow waits until all flows emit at least one value before it starts emitting combined values. If any of the flows don't emit a value, the
combine
operator won't emit any value. To solve this problem, you can use the
onEach
operator to apply a side effect and ensure that each flow emits at least one value. For example:
Copy code
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf("A", "B", "C")

    flow1
        .onEach { /* Applying a side effect */ }
        .combine(flow2.onEach { /* Applying a side effect */ }) { i, s -> "$i$s" }
        .collect { println(it) }
}
k
Side effects have nothing to do with how flows work in combine. Do not trust that ChatGPT is correct. Consult the documentation.
l
I have onEach after combine
k
If either of your flows are not emitting an item you can wrap them with something like
.onStart { emit(item) }
to ensure a flow has an item
l
nothing comes through
even though one of the flows emitted
k
Both of the flows need to emit for combine to emit
l
the problem occurs on the second emission
k
if only one is emitting then there’s your problem
l
first time they both load and all works
when one changes
the combine doesn't work
k
You need to provide a better reproducer than what you’ve given. Give me something along the lines of the following:
Copy code
val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
    println(it) // Will print "1a 2a 2b 2c"
}
l
ok
Copy code
fun main() {
    val flow1 = MutableSharedFlow<Int>()
    val flow2 = MutableSharedFlow<String>()

    
    GlobalScope.launch {
        launch {
           flow1.combine(flow2) { i: Int, s: String -> i.toString() + s }
                .collect {
               		println(it)
           		} 
        }
        
        delay(100)
        flow1.emit(1)
        flow2.emit("a")
        delay(100)
        flow1.emit(2)
        delay(100)
        flow2.emit("b")
        
    }
}
this works ☝️
Copy code
val collectionFlow = repository.getCollection(collectionId).onEach {
    Timber.d("loadData ${it.name}")
}
val itemsFlow = repository.getCollectionItems(collectionId).onEach {
    Timber.d("loadData ${it.size}")
}

collectionFlow.combine(itemsFlow) { collection, items ->
    Timber.d("loadData ${items.size}")
    collection to items
}
this doesn't work after the first emission
even though the individual flows emit
k
Do you collect this at all? Right now it looks like it’s not being
collected
.
Copy code
val collectionFlow = repository.getCollection(collectionId).onEach {
    Timber.d("loadData ${it.name}")
}
val itemsFlow = repository.getCollectionItems(collectionId).onEach {
    Timber.d("loadData ${it.size}")
}

collectionFlow.combine(itemsFlow) { collection, items ->
    Timber.d("loadData ${items.size}")
    collection to items
}
l
I do collect it at the end
first collection works
k
What does your log look like
l
Copy code
D  loadData Newer // Flow 1 - Emission 1
D  loadData 2 // Flow 2 - Emission 1
D  loadData 2 // Combine - Emission 1
D  loadData Newest // Flow 1 - Emission 2
I'm collecting the flow inside of a function in my viewmodel
k
Can you change your debug logs just for the sake of clarity to be:
Copy code
val collectionFlow = repository.getCollection(collectionId).onEach {
    Timber.d("getCollectionFlow ${it.name}")
}
val itemsFlow = repository.getCollectionItems(collectionId).onEach {
    Timber.d("getCollectionItems ${it.size}")
}

collectionFlow.combine(itemsFlow) { collection, items ->
    Timber.d("combine ${collection.name}: ${items.size}")
    collection to items
}
and then post the logs
l
I'm trying returning my flow
instead of collecting
Copy code
D  getCollectionFlow Newest
D  getCollectionItems 2
D  combine Newest: 2
D  getCollectionFlow Newer
Copy code
kotlin=1.8.10
kotlinx.coroutines=1.6.4
k
Hmmmmm
Try and see if the flow is completing:
Copy code
collectionFlow.combine(itemsFlow, ::Pair).onCompletion { /* log completion here */ }
I have one more idea after that but let’s see that result first
That
onCompletion
function takes an optional throwable as the cause. Would be good to log that
l
tried
not completing. Meaning no log
Trying coroutines 1.7.0-Beta with compatibility for kotlin 1.8.10
also no change
k
can you
.take(2).toList()
to see what happens?
Otherwise you should inline the body of
combine
to see where it’s failing:
Copy code
fun <T, E, R> Flow<T>.myCombine(other: Flow<E>, tansform: (T, E) -> R): Flow<R> {
  return combineTransform(flow2) { a, b -> 
    println("Got values $a and $b")
    val value = transform(a, b)
    println("emitting $value")
    emit(value)
    println("emitted $value") 
  }
}
l
how could I take after the first emission (where the problem happens?
k
Can you clarify?
l
I don't know how taking 2 would work to catch the issue
k
I was curious to see if it would suspend forever or throw some sort of exception
l
Copy code
D  getCollectionFlow yoyoyoyt
D  getCollectionItems 0
D  Got values AssetCollection(id=VX9MXud3QWUsAFwuAJlx, name=yoyoyoyt, itemCount=78, timestamp=Fri Mar 03 10:10:03 GMT+01:00 2023) and []
D  combine yoyoyoyt: 0
D  emitting (AssetCollection(id=VX9MXud3QWUsAFwuAJlx, name=yoyoyoyt, itemCount=78, timestamp=Fri Mar 03 10:10:03 GMT+01:00 2023), [])
D  getCollectionFlow yoyoyoyx
the kotlin sample works so is definitely something with my flows
k
You should add
onCompletion
to each of your constituent flows to see if either of them are completing.
l
I did
none is
ok
think I found it
k
What went wrong?
l
it's probably the flatmapConcat
that other flow is also combining
something wrong there could throw the whole thing off
combining without it is collecting
k
Ah yep that'll do it
l
Copy code
onCompletion kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled