# arrow
t
Hi, does Arrow have some utility to fold a possibly infinite async stream with a termination condition?
b
Are you asking if there is an equivalent of Scala's `fs2` for Arrow?
r
//cc @simon.vergauwen
s
There is currently no such tool in Arrow; it is, however, planned as the next feature in Arrow Fx. Depending on your use case, it could be relatively easy to write using `IO`.
t
`fs2` seems to support what I am looking for, but I have never used Scala. It is quite an easy use case: I get lines from a database in chunks. The chunk-fetching is async. If a value of a line is the same as that of a line fetched before, the two get merged. I want to get N lines after merging, and I do not know in advance how many lines have to be merged. So I want to fold over this async stream of chunks and terminate when my result list reaches a certain size.
s
You can easily build something like that using `Ref` or `Queue` and `IO`. For example, an unbounded `Queue` would allow you to `insert` N lines from the database; with simple recursion you can pass the previous line along as a parameter to compare with the next line. When finished, you can simply `takeAll` from the `Queue`.
`Queue` will be released next week in `0.10.5`. `Ref` allows for something similar, but it acts like an `AtomicReference` rather than a `Queue`, so you'd be required to merge everything into a `List` manually. Assuming you'd do simple `List` concatenation with `List#plus`, that could be relatively simple as well.
I’ll try to write a code example later today. A tool inspired by FS2 is planned on the roadmap for the next big feature in Arrow Fx.
t
Thanks for the explanation! I will take a look at `Queue`.
s
If you’d like to play with `Queue` already, it’s available in `0.10.5-SNAPSHOT`. We’re currently just putting the last finishing touches on it.
@till.rathschlag I took a shot at it but I wasn’t sure how getting the chunks from the db works yet. If you have some code that can mock it, I’d be happy to take another look later 🙂
t
Thanks a lot! I actually need to build a `Map`, but this example is completely sufficient for me. And just so that I understand correctly: `process` is not really 'recursive', right? It terminates immediately and returns either the final value or the recipe you have to execute further to get there. (I am a functional newbie ;))
s
You’re very welcome!
> or the recipe you have to execute further to get there
That’s the recursion part 😉 It’s quite simple with `IO` tho, and `IO` guarantees stack-safety etc., so no need to worry about that!
t
Yes, the stack-safety is what I meant by "not really recursive". So more like "not stack-exploding recursive". :)
s
Exactly 👍
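For readers following along, here is a minimal sketch of the use case described above: folding over asynchronously fetched chunks, merging lines that share a value, and stopping once the result reaches a given size. It is not the `process` example from the thread. It uses plain Kotlin `suspend` functions and a loop instead of Arrow's `IO` or `Queue`, so stack-safety is not a concern here. `Line`, `FetchChunk`, `collectMerged`, and the "sum the values" merge rule are all hypothetical names and assumptions made for illustration.

```kotlin
// Hypothetical row type: lines with the same key are merged by summing their values.
data class Line(val key: String, val value: Int)

// Hypothetical async chunk source: returns the chunk starting at `offset`,
// or an empty list once the data is exhausted.
typealias FetchChunk = suspend (offset: Int) -> List<Line>

// Folds the chunks into a map of merged lines. Fetching stops as soon as
// `limit` distinct keys have been collected or the source runs dry.
suspend fun collectMerged(limit: Int, fetchChunk: FetchChunk): Map<String, Line> {
    val merged = LinkedHashMap<String, Line>()
    var offset = 0
    while (merged.size < limit) {
        val chunk = fetchChunk(offset)
        if (chunk.isEmpty()) break // source exhausted before the limit was reached
        for (line in chunk) {
            val previous = merged[line.key]
            when {
                // Known key: merge into the existing line.
                previous != null ->
                    merged[line.key] = previous.copy(value = previous.value + line.value)
                // New key and still room: add it.
                merged.size < limit -> merged[line.key] = line
                // New key but the limit is already reached: ignore it.
            }
        }
        offset += chunk.size
    }
    return merged
}

suspend fun main() {
    // In-memory stand-in for the database, served in chunks of two lines.
    val rows = listOf(
        Line("a", 1), Line("a", 2), Line("b", 3),
        Line("c", 4), Line("c", 5), Line("d", 6)
    )
    val result = collectMerged(limit = 3) { offset -> rows.drop(offset).take(2) }
    println(result.values.toList())
}
```

Note that the fold stops fetching as soon as the limit is reached, so in this run the second `c` line is never fetched or merged. Whether that matches the intended semantics depends on how the real data is ordered.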