Leo Delon
05/02/2023, 4:09 PMcombine
combineTransform
zip
and merge
with no success. The first emission occurs correctly. but once only B emits it doesn't get propagated downstream. Any ideas on what I could try?
private val _state = MutableStateFlow<State>(State.Loading)
fun loadData(collectionId: String) {
viewModelScope.launch {
val collectionFlow = repository.getCollection(collectionId)
val itemsFlow = repository.getCollectionItems(collectionId)
combine(collectionFlow, itemsFlow, ::Pair).flatMapConcat { (collection, items) ->
val map = ...// maps the data
modelUseCase(map) // sends to another flow
}.collect{ model ->
if (model.items.isEmpty()) {
_state.value = State.Empty
} else {
_state.value = State.Content(model)
}
}
}
}
kevin.cianfarini
05/02/2023, 4:25 PMfun flowFromId(id: String): Flow<Pair<A, B>> = combine(
flowAFromId(id),
flowBFromId(id),
::Pair,
)
fun flowAFromId(id: String): Flow<A> = flow { /* omitted */ }
fun flowBFromId(id: String): Flow<B> = flow { /* omitted */ }
Leo Delon
05/02/2023, 4:32 PMkevin.cianfarini
05/02/2023, 5:28 PMval collectionFlow = repository.getCollection(collectionId)
val itemsFlow = repository.getCollectionItems(collectionId)
combine(collectionFlow, itemsFlow, ::Pair).collect { (collection, items) ->
error("Got an item!")
}
If that produces an error then you know the combine is working.Leo Delon
05/02/2023, 5:29 PMcombine
operator of Kotlin Flow waits until all flows emit at least one value before it starts emitting combined values. If any of the flows don't emit a value, the combine
operator won't emit any value.
To solve this problem, you can use the onEach
operator to apply a side effect and ensure that each flow emits at least one value. For example:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf("A", "B", "C")
flow1
.onEach { /* Applying a side effect */ }
.combine(flow2.onEach { /* Applying a side effect */ }) { i, s -> "$i$s" }
.collect { println(it) }
}
kevin.cianfarini
05/02/2023, 5:29 PMLeo Delon
05/02/2023, 5:30 PMkevin.cianfarini
05/02/2023, 5:30 PM.onStart { emit(item) }
to ensure a flow has an itemLeo Delon
05/02/2023, 5:30 PMkevin.cianfarini
05/02/2023, 5:30 PMLeo Delon
05/02/2023, 5:31 PMkevin.cianfarini
05/02/2023, 5:31 PMLeo Delon
05/02/2023, 5:31 PMkevin.cianfarini
05/02/2023, 5:32 PMval flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
println(it) // Will print "1a 2a 2b 2c"
}
Leo Delon
05/02/2023, 5:32 PMfun main() {
val flow1 = MutableSharedFlow<Int>()
val flow2 = MutableSharedFlow<String>()
GlobalScope.launch {
launch {
flow1.combine(flow2) { i: Int, s: String -> i.toString() + s }
.collect {
println(it)
}
}
delay(100)
flow1.emit(1)
flow2.emit("a")
delay(100)
flow1.emit(2)
delay(100)
flow2.emit("b")
}
}
val collectionFlow = repository.getCollection(collectionId).onEach {
Timber.d("loadData ${it.name}")
}
val itemsFlow = repository.getCollectionItems(collectionId).onEach {
Timber.d("loadData ${it.size}")
}
collectionFlow.combine(itemsFlow) { collection, items ->
Timber.d("loadData ${items.size}")
collection to items
}
kevin.cianfarini
05/02/2023, 6:03 PMcollected
.
val collectionFlow = repository.getCollection(collectionId).onEach {
Timber.d("loadData ${it.name}")
}
val itemsFlow = repository.getCollectionItems(collectionId).onEach {
Timber.d("loadData ${it.size}")
}
collectionFlow.combine(itemsFlow) { collection, items ->
Timber.d("loadData ${items.size}")
collection to items
}
Leo Delon
05/02/2023, 6:03 PMkevin.cianfarini
05/02/2023, 6:03 PMLeo Delon
05/02/2023, 6:09 PMD loadData Newer // Flow 1 - Emission 1
D loadData 2 // Flow 2 - Emission 1
D loadData 2 // Combine - Emission 1
D loadData Newest // Flow 1 - Emission 2
kevin.cianfarini
05/02/2023, 6:15 PMval collectionFlow = repository.getCollection(collectionId).onEach {
Timber.d("getCollectionFlow ${it.name}")
}
val itemsFlow = repository.getCollectionItems(collectionId).onEach {
Timber.d("getCollectionItems ${it.size}")
}
collectionFlow.combine(itemsFlow) { collection, items ->
Timber.d("combine ${collection.name}: ${items.size}")
collection to items
}
Leo Delon
05/02/2023, 6:16 PMD getCollectionFlow Newest
D getCollectionItems 2
D combine Newest: 2
D getCollectionFlow Newer
kotlin=1.8.10
kotlinx.coroutines=1.6.4
kevin.cianfarini
05/02/2023, 6:31 PMcollectionFlow.combine(itemsFlow, ::Pair).onCompletion { /* log completion here */ }
onCompletion
function takes an optional throwable as the cause. Would be good to log thatLeo Delon
05/02/2023, 6:38 PMkevin.cianfarini
05/02/2023, 6:51 PM.take(2).toList()
to see what happens?combine
to see where it’s failing:
fun <T, E, R> Flow<T>.myCombine(other: Flow<E>, tansform: (T, E) -> R): Flow<R> {
return combineTransform(flow2) { a, b ->
println("Got values $a and $b")
val value = transform(a, b)
println("emitting $value")
emit(value)
println("emitted $value")
}
}
Leo Delon
05/02/2023, 6:57 PMkevin.cianfarini
05/02/2023, 6:58 PMLeo Delon
05/02/2023, 7:01 PMkevin.cianfarini
05/02/2023, 7:02 PMLeo Delon
05/02/2023, 7:03 PMD getCollectionFlow yoyoyoyt
D getCollectionItems 0
D Got values AssetCollection(id=VX9MXud3QWUsAFwuAJlx, name=yoyoyoyt, itemCount=78, timestamp=Fri Mar 03 10:10:03 GMT+01:00 2023) and []
D combine yoyoyoyt: 0
D emitting (AssetCollection(id=VX9MXud3QWUsAFwuAJlx, name=yoyoyoyt, itemCount=78, timestamp=Fri Mar 03 10:10:03 GMT+01:00 2023), [])
D getCollectionFlow yoyoyoyx
kevin.cianfarini
05/02/2023, 7:13 PMonCompletion
to each of your constituent flows to see if either of them are completing.Leo Delon
05/02/2023, 7:15 PMkevin.cianfarini
05/02/2023, 7:23 PMLeo Delon
05/02/2023, 7:25 PMkevin.cianfarini
05/02/2023, 7:26 PMLeo Delon
05/02/2023, 7:38 PMonCompletion kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled