Hello everyone! I was wondering if someone has tri...
# coroutines
p
Hello everyone! I was wondering if someone has tried to implement something like this:
Copy code
fun <T> loopFlow(initial: T, f: suspend (T) -> Flow<T>): Flow<T>
The idea being that the
initial
value is emitted once and then
f
is applied recursively. And then finish the flow when cancelled or when an empty flow is produced (duh). I’ll post in thread what I’ve tried so far without much luck…
Copy code
fun <T> loopFlow(initial: T, f: suspend (T) -> Flow<T>): Flow<T> = flow {
    emit(initial)
    var current: Flow<T> = f(initial)
    while (current.count() > 0) {
        emitAll(current)
        current = current.flatMapConcat(f)
    }
}
This one feels like it should work, but only the initial item is emitted. I must be running into some deadlock issue with nested suspension
g
Do you have a test code for this? Also, where does
count()
come from? I can't see that method in the flow interface 🤔
p
count is an extension function of
flow()
I’m not very happy with that implementation, specially because it doesn’t work as I expected
No test code yet sadly… maybe I’ll try creating something shareable. More wondering if someone has had a problem where they needed to make something recursive with flows and wanted to avoid stack hell
This is a potential candidate:
Copy code
fun <T> loopFlow(initial: T, f: suspend (T) -> Flow<T>): Flow<T> = flow {
    emit(initial)
    f(initial).collect { emitAll(loopFlow(it, f)) }
}
But it’s not stack safe 😞
This is a very simplistic example: https://pl.kotl.in/g2gb8HeSn This can be done very easily in other ways, but it was just to demostrate an infinite flow that flatmaps the result of each flatmap 🙂
I think I need to use something like what @elizarov mentions in this article, but I’m not sure how to apply it to flows 🙃 I’ll have to investigate further
g
Wouldn't
count()
collect everything from the flow and suspend while collecting. That means that if you've got an infinite flow then it will never "resume". I am guessing it's the same for
emitAll
. So if you are consuming/receiving a channel for example it will never proceed further. (this is just a speculation)
p
Yeah, I mean that was the example I didn’t like… the second one I posted is a bit better but but still has the problem of not being stack safe
g
flatMapMerge might do the trick? Playground link
p
It’s not far from what
flatMapConcat
does (main difference is that
flatMapMerge
doesn’t do the transformations in sequence). The thing with both of these functions is that they both are a one time transformation but in this case what I’m looking for is for transformation recursively like:
Copy code
flow.flatMapMerge { f(it) }.flatMapMerge { f(it) }... // and so on
🙂
Oh sorry, I just saw your playground link 😅
@Giorgos Neokleous If you try to
take(1000)
you'll find that we get a memory exception. However if we drop the list and do it like this to remove the memory constraints:
Copy code
loopFlow2(0 to 1) { (a, b) -> flowOf(b to a + b) }.take(1000).drop(999).collect { println("$it") }
With the original we get a stacktrace error… However, with the second version we get this other error (although the result is printed) 🤔
btw, we can hide de collection with an
emitAll
:
Copy code
fun <T> loopFlow2(initial: T, f: suspend (T) -> Flow<T>): Flow<T> = flow {
    emit(initial)
    emitAll(f(initial).flatMapMerge { loopFlow2(value, it) })
}
However, I like this, I think it’s going into the right direction 🙂
I’m gonna put a reminder as I’m going on holidays 😅
👍 1
g
Happy Holidays 🙂
l
@pablisco I think writing a proper KDoc for that function will help you (or us) to find how to implement it.
p
I'm not sure what else, other than what I put originally, would I add to a kdoc 😅
l
You would write it starting with "the idea of" ? 🙃