immathan
07/13/2020, 2:05 PMbackoff
strategy using the flow.
fun main() {
runBlocking {
(0..8).asFlow()
.flatMapConcat { input ->
// To simulate a flow which fetches data based on a time-window which is exponentially increased if there's no data
// available with in that time frame.
flow {
println("Input: $input")
if (input < 5) {
emit(emptyList<String>())
} else { // After emitting this once the flow should complete
emit(listOf("Available"))
}
}.retryWhenThrow(DummyException(), predicate = {
it.isNotEmpty()
})
}.collect {
//println(it)
}
}
}
class DummyException : Exception("Collected size is empty")
private inline fun <T> Flow<T>.retryWhenThrow(
throwable: Throwable,
crossinline predicate: suspend (T) -> Boolean
): Flow<T> {
return flow {
collect { value ->
if (!predicate(value)) {
throw throwable // informing the upstream to keep emitting since the condition is met
}
println("Value: $value")
emit(value)
}
}.catch { e ->
if (e::class != throwable::class) throw e
}
}
If you run this example in kotlin play ground you will notice that the flow keep emitting the value even after my condition is met(Where the list is not empty anymore). After much digging I found that I need something like takeUntil from rxJava. I need some help how I should proceed this.michaelsims
07/13/2020, 2:42 PMimmathan
07/13/2020, 3:27 PMInput: 0
Input: 1
Input: 2
Input: 3
Input: 4
Input: 5
Value: [Available] // upstream should stop emitting here since the flow has data
Input: 6
Value: [Available]
Input: 7
Value: [Available]
Input: 8
Value: [Available]
Expected:
Input: 0
Input: 1
Input: 2
Input: 3
Input: 4
Input: 5
Value: [Available] // something like takeUntil needed here I guess.
(0..8).asFlow()
.flatMapConcat {
getDataForValue(it) // Might return empty values
.retryWhenThrow(DummyException(), predicate = { list ->
list.isNotEmpty()
})
}
michaelsims
07/13/2020, 4:36 PMjulian
07/14/2020, 1:58 AMfun main() {
runBlocking {
val f1 = (0..8).asFlow()
val f2 = f1.flatMapMerge {
flow {
if (it < 5) {
emit(emptyList())
} else {
emit(listOf("Available"))
}
}.filter { it.isNotEmpty() }
}.take(1)
f2.collect { println(it) }
}
}
Prints:
[Available]
I changed flatMapConcat
to flatMapMerge
because you want the first non-empty list, not the first non-empty list in the order in which the lists were requested. In other words, we accept the list of a more recent request rather than wait for the less recent requests to return their result.immathan
07/14/2020, 3:32 AMflatMapMerge
, the upstream flow processes all the values.
runBlocking {
val f1 = (0..8).asFlow()
val f2 = f1.flatMapMerge { input ->
flow {
println("Inner flow: $input ")
if (input < 5) {
emit(emptyList())
} else {
emit(listOf("Available"))
}
}.filter { it.isNotEmpty() }
}.take(1)
f2.collect { println(it) }
}
Prints:
Inner flow: 0
Inner flow: 1
Inner flow: 2
Inner flow: 3
Inner flow: 4
Inner flow: 5
Inner flow: 6
Inner flow: 7
Inner flow: 8
[Available]
julian
07/14/2020, 6:31 PMf1
is upstream from f2
and we're trying to terminate `f1`'s emissions even further downstream than f2
.
Rather than trying to terminate a sequence of chained flows we need the termination logic to apply to an upstream flow of parallel flows.
I have in mind the zip
operator. But using zip is in this case is a bit tricky because we don't want to block the emissions of f1
while waiting for f2
to emit.
We resolve that by making f2
a flow of factories of single-emission flow (!!). I think it'll make more sense when you see the code.
https://pl.kotl.in/VkjkbCk7I
The output is:
0
1
2
3
4
5
[Available]
6
The 6
appearing after [Available]
is fine, because you don't want to block your back-off flow. You just want to terminate it after you get a valid result.
IRL, I imagine it won't be necessary for the factory of flow to have n
as a parameter, since your data API will probably not take any input generated by your back-off flow. As such you can omit that parameter.
I used flattenConcat
but you may prefer flattenMerge
. I believe the second will allow the order in which you get a result to not be completely determined by the order in which the request was made. That is, the requests to your data API will race each other to complete, regardless of the order in which they were started. But if your backoff intervals are long enough, and your data API is fast enough, flattenConcat
is adequate.