Hello :hand: , I have an use-case where I need to ...
# coroutines
i
Hello , I have an use-case where I need to implement a
backoff
strategy using the flow.
Copy code
fun main() {
    runBlocking {
        (0..8).asFlow()
            .flatMapConcat { input ->
                // To simulate a flow which fetches data based on a time-window which is exponentially increased if there's no data
                // available with in that time frame.
                flow {
                    println("Input: $input")
                    if (input < 5) {
                        emit(emptyList<String>())
                    } else { // After emitting this once the flow should complete
                        emit(listOf("Available"))
                    }
                }.retryWhenThrow(DummyException(), predicate = {
                    it.isNotEmpty()
                })
            }.collect {
                //println(it)
            }
    }
}

class DummyException : Exception("Collected size is empty")

private inline fun <T> Flow<T>.retryWhenThrow(
    throwable: Throwable,
    crossinline predicate: suspend (T) -> Boolean
): Flow<T> {
    return flow {
        collect { value ->
            if (!predicate(value)) {
                throw throwable // informing the upstream to keep emitting since the condition is met
            }
            println("Value: $value")
            emit(value)
        }
    }.catch { e ->
        if (e::class != throwable::class) throw e
    }
}
If you run this example in kotlin play ground you will notice that the flow keep emitting the value even after my condition is met(Where the list is not empty anymore). After much digging I found that I need something like takeUntil from rxJava. I need some help how I should proceed this.
m
I’m confused by what you are trying to accomplish… can you post what your expected output of the above should be?
i
Yeah sure @michaelsims. Actual behaviour:
Copy code
Input: 0
Input: 1
Input: 2
Input: 3
Input: 4
Input: 5
Value: [Available] // upstream should stop emitting here since the flow has data
Input: 6
Value: [Available]
Input: 7
Value: [Available]
Input: 8
Value: [Available]
Expected:
Copy code
Input: 0
Input: 1
Input: 2
Input: 3
Input: 4
Input: 5
Value: [Available] // something like takeUntil needed here I guess.
Shorter implementation for the above code
Copy code
(0..8).asFlow()
      .flatMapConcat {
           getDataForValue(it) // Might return empty values
            .retryWhenThrow(DummyException(), predicate = { list ->
                  list.isNotEmpty()
             })
      }
I want similar implementation explained in this article. https://medium.com/over-engineering/rxify-exponential-backoff-on-retry-48bb66912391
m
Thanks for the clarification. I’m still getting up to speed on the Flow APIs myself and this is an interesting learning opportunity, so I’ll probably have some thoughts for you later (if no one else answers in the meantime).
j
@immathan How about this?
Copy code
fun main() {
    runBlocking {
        val f1 = (0..8).asFlow()
        val f2 = f1.flatMapMerge {
            flow {
                if (it < 5) {
                    emit(emptyList())
                } else {
                    emit(listOf("Available"))
                }
            }.filter { it.isNotEmpty() }
        }.take(1)
        f2.collect { println(it) }
    }
}
Prints:
Copy code
[Available]
I changed
flatMapConcat
to
flatMapMerge
because you want the first non-empty list, not the first non-empty list in the order in which the lists were requested. In other words, we accept the list of a more recent request rather than wait for the less recent requests to return their result.
i
@julian this looks promising. I'll try it out and let you know.
👍🏾 1
@julian This will not work even though we collect only one value the
flatMapMerge
, the upstream flow processes all the values.
Copy code
runBlocking {
        val f1 = (0..8).asFlow()
        val f2 = f1.flatMapMerge { input ->
            flow {
                println("Inner flow: $input ")
                if (input < 5) {
                    emit(emptyList())
                } else {
                    emit(listOf("Available"))
                }
            }.filter { it.isNotEmpty() }
        }.take(1)
        f2.collect { println(it) }
    }
Prints:
Copy code
Inner flow: 0 
Inner flow: 1 
Inner flow: 2 
Inner flow: 3 
Inner flow: 4 
Inner flow: 5 
Inner flow: 6 
Inner flow: 7 
Inner flow: 8 
[Available]
j
@immathan Good point!
I will think about this some more!
@immathan Ok, I got something different for you. I think that the big problem with the approaches so far is that
f1
is upstream from
f2
and we're trying to terminate `f1`'s emissions even further downstream than
f2
. Rather than trying to terminate a sequence of chained flows we need the termination logic to apply to an upstream flow of parallel flows. I have in mind the
zip
operator. But using zip is in this case is a bit tricky because we don't want to block the emissions of
f1
while waiting for
f2
to emit. We resolve that by making
f2
a flow of factories of single-emission flow (!!). I think it'll make more sense when you see the code. https://pl.kotl.in/VkjkbCk7I The output is:
Copy code
0
1
2
3
4
5
[Available]
6
The
6
appearing after
[Available]
is fine, because you don't want to block your back-off flow. You just want to terminate it after you get a valid result. IRL, I imagine it won't be necessary for the factory of flow to have
n
as a parameter, since your data API will probably not take any input generated by your back-off flow. As such you can omit that parameter. I used
flattenConcat
but you may prefer
flattenMerge
. I believe the second will allow the order in which you get a result to not be completely determined by the order in which the request was made. That is, the requests to your data API will race each other to complete, regardless of the order in which they were started. But if your backoff intervals are long enough, and your data API is fast enough,
flattenConcat
is adequate.
@immathan Did you check out the solution I offered? https://pl.kotl.in/VkjkbCk7I