pablisco
12/02/2020, 10:18 AM
fun <T> loopFlow(initial: T, f: suspend (T) -> Flow<T>): Flow<T>
The idea being that the initial value is emitted once and then f is applied recursively. And then finish the flow when cancelled or when an empty flow is produced (duh). I’ll post in thread what I’ve tried so far without much luck…
pablisco
12/02/2020, 10:19 AM
fun <T> loopFlow(initial: T, f: suspend (T) -> Flow<T>): Flow<T> = flow {
    emit(initial)
    var current: Flow<T> = f(initial)
    while (current.count() > 0) {
        emitAll(current)
        current = current.flatMapConcat(f)
    }
}
This one feels like it should work, but only the initial item is emitted. I must be running into some deadlock issue with nested suspension.
Giorgos Neokleous
12/02/2020, 11:52 AM
Where does count() come from? I can't see that method in the Flow interface 🤔
pablisco
12/02/2020, 1:30 PM
flow()
pablisco
12/02/2020, 1:31 PM
pablisco
12/02/2020, 1:32 PM
pablisco
12/02/2020, 1:41 PM
fun <T> loopFlow(initial: T, f: suspend (T) -> Flow<T>): Flow<T> = flow {
    emit(initial)
    f(initial).collect { emitAll(loopFlow(it, f)) }
}
But it’s not stack safe 😞
pablisco
12/02/2020, 1:44 PM
pablisco
12/02/2020, 1:53 PM
Giorgos Neokleous
12/02/2020, 2:53 PM
count() collects everything from the flow and suspends while collecting. That means that if you've got an infinite flow then it will never "resume". I am guessing it's the same for emitAll. So if you are consuming/receiving a channel, for example, it will never proceed further. (This is just speculation.)
pablisco
12/02/2020, 3:01 PM
Giorgos Neokleous
12/02/2020, 3:38 PM
pablisco
12/02/2020, 5:15 PM
flatMapConcat does (the main difference is that flatMapMerge doesn’t do the transformations in sequence). The thing with both of these functions is that each is a one-time transformation, but in this case what I’m looking for is to apply the transformation recursively, like:
flow.flatMapMerge { f(it) }.flatMapMerge { f(it) }... // and so on
🙂
pablisco
12/02/2020, 5:17 PM
pablisco
12/02/2020, 5:23 PM
take(1000) you'll find that we get a memory exception. However, if we drop the list and do it like this to remove the memory constraints:
loopFlow2(0 to 1) { (a, b) -> flowOf(b to a + b) }.take(1000).drop(999).collect { println("$it") }
With the original we get a stacktrace error… However, with the second version we get this other error (although the result is printed) 🤔
pablisco
12/02/2020, 5:25 PM
emitAll:
fun <T> loopFlow2(initial: T, f: suspend (T) -> Flow<T>): Flow<T> = flow {
    emit(initial)
    // Recurse on each value produced by f, passing f along unchanged.
    emitAll(f(initial).flatMapMerge { loopFlow2(it, f) })
}
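For comparison, the stack-safety problem of the recursive loopFlow posted earlier in the thread could probably be avoided by replacing the recursion with an explicit, heap-allocated stack. The sketch below is only an illustration, not something anyone in the thread posted: the name loopFlowIterative is hypothetical, and it assumes every flow returned by f is finite, because each one is buffered with toList() before its values are visited. Under that assumption it should emit values in the same depth-first order as the recursive version.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stack-safe variant of loopFlow (not from the thread).
// Pending values are kept in a heap-allocated deque instead of the call stack.
// Assumption: every flow returned by f is finite, since it is buffered via toList().
fun <T> loopFlowIterative(initial: T, f: suspend (T) -> Flow<T>): Flow<T> = flow {
    val pending = ArrayDeque<T>()
    pending.addLast(initial)
    while (pending.isNotEmpty()) {
        val current = pending.removeLast()
        emit(current)
        // Push the children in reverse so the first child is visited next,
        // which matches the depth-first order of the recursive version.
        f(current).toList().asReversed().forEach { pending.addLast(it) }
    }
}

fun main() = runBlocking {
    // Fibonacci pairs, as in the thread; take() cancels the otherwise endless loop.
    val firstTen = loopFlowIterative(0 to 1) { (a, b) -> flowOf(b to a + b) }
        .take(10)
        .toList()
    println(firstTen)
}
```

The trade-off is laziness: the recursive version starts on the first child's subtree before f has produced the second child, whereas this sketch waits for f(current) to complete first. If f can return an infinite flow, this approach would never get past that step.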
pablisco
12/02/2020, 5:27 PM
pablisco
12/02/2020, 5:27 PM
Giorgos Neokleous
12/02/2020, 5:35 PM
louiscad
12/02/2020, 7:10 PM
pablisco
12/02/2020, 7:57 PM
louiscad
12/02/2020, 7:59 PM