Anshulupadhyay03
02/15/2022, 3:18 PMsuspend fun flowsWithCombine() {
val myFlow = flowOf("some value")
val lettersFlow = flowOf("A", "D1", "C", "E", "F", "G", "H").onEach { delay(500) }
myFlow.map { value->
val newValue = lettersFlow.first()
println("before $newValue")
if(newValue == "D1") {
return@map "D1"
}else {
delay(1000)
val newValue = lettersFlow.first() // here i still get A only though i expect newValue as D1
println("after $newValue")
if(newValue == "D1") return@map "D1" else return@map value
}
}.collect {
println(it) // This should print D1
}
}
Sam
02/15/2022, 3:19 PM.first()
on lettersFlow, you're collecting the flow again from the startSam
02/15/2022, 3:21 PMcombine
operator? https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.htmlAnshulupadhyay03
02/15/2022, 3:24 PMSam
02/15/2022, 3:28 PMtakeWhile
or transformWhile
to make lettersFlow
stop once it has emitted D1
Anshulupadhyay03
02/15/2022, 3:56 PMsuspend fun flowsWithCombine1() {
var isDone = false
val myFlow = flowOf("value").onEach { delay(1000) }
val lettersFlow = flowOf("A", "B", "C", "D1", "E", "F", "G", "H").onEach { delay(100) }
myFlow.combine(lettersFlow){value , letter ->
println("one = $value and two = $letter")
if ("D1" == letter){
isDone = true
return@combine letter
}else {
return@combine value
}
}.takeWhile{
isDone
}.collect {
println(it)
}
}
Sam
02/15/2022, 3:59 PMsuspend fun flowsWithCombine1() {
val myFlow = flowOf("value").onEach { delay(1000) }
val lettersFlow = flowOf("A", "B", "C", "D1", "E", "F", "G", "H")
.onEach { delay(100) }
.transformWhile { emit(it); it != "D1" }
myFlow.combine(lettersFlow) {value , letter ->
println("one = $value and two = $letter")
}.collect()
}
Anshulupadhyay03
02/15/2022, 4:08 PMSam
02/15/2022, 4:15 PMmyFlow
to emit a value, and then you wait for letters
to emit D1
, but with a timeout.Sam
02/15/2022, 4:22 PMSam
02/15/2022, 4:22 PMlettersFlow
while you're waiting for myFlow
Anshulupadhyay03
02/15/2022, 6:35 PMAnshulupadhyay03
02/15/2022, 6:51 PMNick Allen
02/17/2022, 1:56 AMStateFlow
only cares about the latest. If you want to wait for the actual event though, don't model it as state. SharedFlow
works for events.
Do you actually want to delay and check again? If "D1" arrives, but then "C" comes before you resume, do you want to just return "some value" or do you want to not miss that "D1"?
Do you want to wait for an event, then wait for a delay, and then wait for an event? Or do you want to wait up to 1 sec for a particular event?
I suspect you actually want something more like:
suspend fun flowsWithCombine() = coroutineScope {
val myFlow = flowOf("some value")
val lettersFlow = flowOf("A", "D1", "C", "E", "F", "G", "H")
.onEach { delay(400) } //Make D1 arrive BEFORE 1000
.shareIn(this, SharingStarted.Eagerly) //Guessing you want "hot" Flow
myFlow.map { value ->
withTimeoutOrNull(1000) { //Wait at most 1 sec
lettersFlow.firstOrNull { it == "D1" } //Wait for "D1"
} ?: value //Fallback if we didn't see the event
}.collect {
println(it)
}
}