# arrow
s
So this program fills up ~8G and does not print anything; after some time it either throws a thread-starvation error or exits because no heap space is available (I am providing 8G of heap). But this shouldn’t take that much. Can you guys help me figure out what I am doing wrong here? The actual use case: I want to make 100000 network requests, and the semaphore limit is my HTTP client’s pool of connections, say 10. Using version 0.10.4.
k
You should definitely wait for someone who knows more than I do, but I ran into a similar issue when I was working with `traverse`/`sequence` for a large iterable. There was an issue where the lambdas were being eagerly evaluated. IIRC one of the Arrow devs fixed this issue but it’s not in a release yet.
s
Thanks. I’ll update the arrow version I am using in the post.
j
@kyleg That only holds for applicatives which implement `lazyAp` (which is every applicative that can short-circuit or is otherwise lazy, like `IO`, `Eval` etc). However this does not hold for `parApplicative`, which is a specialized applicative for `IO` that `parTraverse` uses. This means that it will fold the entire iterable first, build up an `IO` action and then, only when it’s done with that, execute the `IO`. This means for infinite sequences the `IO` will never be built and thus never execute (it’ll just clog up memory...). There is no good solution for this yet. If you have an idea on how to implement `lazyAp` for this type (https://github.com/arrow-kt/arrow/blob/3051b67bcc4298c5b1553be134bd4222da04f3c6/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/typeclasses/ParApplicative.kt) that would be great 😅 cc @simon.vergauwen, he probably knows far better what needs to be done there than me ^^ In the meantime you can try implementing this manually by using `foldRight` and `fork` to spawn fibers. Just be careful when you evaluate the accumulator (wrapped in `Eval`). If you evaluate it before your `IO` action is built, it will end up the same. If you force it inside the `IO` action it will be lazy and it will work fine (unless you do that as the first thing, then it has the same effect again). This is all kind of complex and annoying. Hopefully we’ll have a version of `parApplicative` soon that can do that on its own. If you have questions feel free to ping me 🙂
the example seems to require more API calls: `acquireN` and `releaseN`
s
I have tried to accommodate the feedback given by @Jannis and @pakoito. Of course I am missing pieces, can you guys please help me 🙂 ?
cc: @simon.vergauwen
j
So what you probably would want to do is more along the lines of:
```kotlin
// (function name is just a placeholder; semaphore and f are passed in explicitly)
fun <A, B> Iterable<A>.parTraverseWithPermits(semaphore: Semaphore, f: (A) -> IO<B>): IO<List<B>> =
  foldRight(Eval.now(IO.just(emptyList<B>()))) { v, acc ->
    IO.fx {
      // Run the action for the current element in a new fiber
      val x = semaphore.withPermit(f(v)).fork().bind()
      // Run the accumulator. Since starting fibers is pretty much instant, we
      // start all fibers at the same time and the semaphore handles whether they
      // block or run. This could also mean it uses quite a bit of memory because
      // it accumulates in such a way, but that should be tested first ^^
      val xs = acc.value().bind()
      // In most cases this will have finished already, we just need the result
      val xRes = x.join().bind()
      listOf(xRes) + xs
    }.let { Eval.now(it) } // only for foldRight; IO is already lazy so no need for more complex Evals
  }.value() // now you have an IO action that spawns n fibers and collects all results
```
Keep in mind that certain things are not handled well here: if the first element errors out, the accumulator will still execute (although the whole `IO` will fail, the effects are still produced). This will spawn all fibers upfront; if you don’t want that, you should use the semaphore outside of `fork`. There are also probably many other problematic things concerning cancellation, which as far as I can tell is not supported by the code I wrote. So, a few things that I did differently than you and why: `parApplicative` is a high-level construct that we don’t use, since we are only interested in low-level stuff like `fork`/`join` etc. If possible, `parApplicative` should support this behaviour exactly the same way, but it doesn’t as of now, so no point using it. (I think I have some ideas on how to implement it there, but no promises 😕, will write when I find something.) You were using `join` together with `map2Eval` from `parApplicative`, which starts both `IO`’s in separate fibers, runs them and collects the results. That would be fine if `map2Eval` were not strict in its argument (for `parApplicative` only afaik), which means it always tries to evaluate the accumulator eval first, leading to the same behaviour as `traverse`. I ignored that and just started the fiber first and then ran the accumulator. I also omitted a bunch of stuff, but that was only for brevity; the core part should be the `foldRight` and that should be there... Again, use this with care as I am not sure how the semantics of this are around cancellation (which likely won’t work as expected) and errors.
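Outside Arrow, the shape of this fork-all-then-join pattern can be sketched with plain JVM primitives — threads standing in for fibers and `java.util.concurrent.Semaphore` for Arrow's `Semaphore` (the `parMapWithPermit` name is invented for this sketch):

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Semaphore

// Toy stand-in for the fiber pattern: start all workers up front ("fork"),
// let the semaphore decide which ones actually run, then "join" every result.
fun <A, B> Iterable<A>.parMapWithPermit(permits: Int, f: (A) -> B): List<B> {
    val semaphore = Semaphore(permits)
    val items = toList()
    val results = MutableList<B?>(items.size) { null }
    val done = CountDownLatch(items.size)
    items.forEachIndexed { i, a ->
        Thread {
            semaphore.acquire()          // withPermit: block until a permit is free
            try {
                results[i] = f(a)
            } finally {
                semaphore.release()
                done.countDown()
            }
        }.start()                        // "fork": starting a worker is nearly instant
    }
    done.await()                         // "join": wait for all workers to finish
    return results.map { it!! }
}
```

All workers are spawned immediately (so memory grows with the number of items), but at most `permits` of them run `f` at any one time — the same trade-off Jannis points out above for the fiber version.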
@simon.vergauwen For implementing `lazyAp`, would it not be enough to delay evaluation of the argument in `parMap(a, b, f)` until `a` has started? Because as far as I can tell `parMapN` does not use `b` before that (except in an error case for `a`, but if you put `b` inside `Eval` instead of a simple function, memoization should handle this properly). For `traverse` that would mean all fibers are started at once (but they are actually started!). Imo that’d be much better than what `lazyAp` does atm.
@Satyam Agarwal if you are interested in how `parApplicative` works for `IO`: it all boils down to this method here: https://github.com/arrow-kt/arrow/blob/47818680653dee7461f18ccd7afe1d165ec0b0da/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IOParMap.kt#L63
s
Thank you so much 🙂 . I’ll take some time to wrap my head around it 😅 and come back with snippets/questions 🙂
@Jannis I think I understand now what you mean above. I tried making some sense of it and wrote these snippets (both of them don’t work): https://gist.github.com/satyamagarwal/d66159eb216dab67d24f20bdf75e9c18 Can you guys please take a look and help me solve this? I really want to nail this down, because every once in a while we get a job that’s too big, and with Arrow it becomes so much easier. 🙂
j
Hmm, why are you calling `acquireN(limit)` on the semaphore right after creating the semaphore? The result of `foldRight` is a huge computation built from smaller ones that all try to acquire one permit of the semaphore to run. None of them can complete, so the entire `IO` can’t. So in the end you build up a huge action again (clogging up memory, and because your semaphore has all permits taken, none of it can run). Also, on a side note: I think `foldRight` is defined for iterables and/or sequences as well, so no need to create the whole list in memory with `toList`.
s
@Jannis `parTraverse` suffers the same problem as `traverse`. The difference between `parTraverse` and `traverse` is that it uses a parallel applicative instance. Or in other words: where `List<A>.traverse` flips the `List<IO<A>>` composition to `IO<List<A>>` by folding over the `List` and using `map2` to compose the `IO`s that fill the list, `List<A>.parTraverse` does the exact same thing but uses `parMap2` instead.
Long story short, `parApplicative` is an `Applicative` instance where all the operations are derived through `parMapN`.
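The flip Simon describes can be sketched with a toy lazy effect type (this `Thunk` and its `map2`/`traverse` are illustrative stand-ins, not Arrow's API):

```kotlin
// Toy lazy effect: nothing runs until unsafeRun() is invoked.
class Thunk<A>(val unsafeRun: () -> A) {
    companion object { fun <A> just(a: A): Thunk<A> = Thunk { a } }
}

// map2 composes two effects into one that runs both and combines the results.
fun <A, B, C> map2(fa: Thunk<A>, fb: Thunk<B>, f: (A, B) -> C): Thunk<C> =
    Thunk { f(fa.unsafeRun(), fb.unsafeRun()) }

// traverse: fold the list, flipping List<Thunk<B>> into Thunk<List<B>>.
fun <A, B> List<A>.traverse(f: (A) -> Thunk<B>): Thunk<List<B>> =
    foldRight(Thunk.just(emptyList<B>())) { a, acc ->
        map2(f(a), acc) { b, bs -> listOf(b) + bs }
    }
```

Running `listOf(1, 2, 3).traverse { n -> Thunk { n * 2 } }.unsafeRun()` yields `[2, 4, 6]`; a parallel traverse keeps the same fold but swaps in a parallel `map2`.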
j
Hmm, well, there are two things one could do here:
add a version of `parMap2` which takes the second argument in an `Eval` and doesn’t evaluate it before the first one is running. This would lead to copypasta, because I would not want to make that the default and wrap everything else in `Eval.Now`.
Or add other means of traversing in parallel (like doing something similar to `zipWithNext` and then doing a special `traverse` on that).
Neither sounds all that great, so I’d also be fine with keeping everything as is and just adding notes in relevant places to be wary of such behaviour and suggest workarounds.
s
For this kind of thing you’d prefer constant-memory streaming, but that’ll be a task for after IO<E, A>
j
Right! Since there will be a better alternative then, I think we should ignore this behaviour for now, as neither of the two ideas I had is all that great... I’ll add notes to `parTraverse` later for future reference.
👍 1
s
Seems like I can just go with the `parTraverseN` implementation for now then? @simon.vergauwen @Jannis Also, is there a point in using bracket, as in `semaphore.withPermit(f(v)).fork().bracket(...)`? @Jannis After removing the `acquireN` from the gist, I was able to parse up to 60000 with 8 gigs, which is rather nice compared to before. Also, can any one of you help me understand how the fork and join fiber implementation works in the context of `IOPool` and the fiber context, in short? I tried reading up on `withPermit()`, `fork()` and `join()` from the api docs; I think I have a good idea, but I’d like to hear from you guys as well 🙂
s
If you look at the code of `fork` in `IO`, it basically just starts the `IO` on a certain `CoroutineContext`, which in the end goes to `Executor#execute` somewhere. So for an `IOPool` executor that means each task gets scheduled on a new (separate, cached) thread. `Semaphore#withPermit` basically just wraps your `IO` with the `acquire` and `release` combinators. `fork().flatMap { }` is the equivalent of fire & forget, since `flatMap` is a cancellation boundary and thus the fiber could leak, since it might never get canceled or joined. `fork().bracket(use = { (join, cancel) -> … }, release = { (_, cancel) -> cancel })` ensures that the fiber is always closed and thus cannot leak.
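That bracket guarantee — release runs on every exit path of `use` — has the same shape as try/finally; a minimal sketch with an invented `bracket` helper (not Arrow's actual signature, which works over `IO`):

```kotlin
// Toy bracket: acquire a resource, use it, and guarantee release on every
// exit path (normal return or exception) — the reason fibers can't leak.
fun <A, B> bracket(acquire: () -> A, use: (A) -> B, release: (A) -> Unit): B {
    val resource = acquire()
    return try {
        use(resource)
    } finally {
        release(resource)   // runs even if `use` throws
    }
}
```

Even when `use` throws, `release` fires, which is exactly why `fork().bracket(...)` cannot leak a fiber the way `fork().flatMap { }` can.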
@Satyam Agarwal what impl are you using for `parTraverseN`? https://kotlinlang.slack.com/archives/C5UPMM0A0/p1580148686164400 this one? That should be absolutely fine; it runs by default on `CommonPool` tho.
```kotlin
fun <A, B> Iterable<A>.parTraverseN(limit: Long, ctx: CoroutineContext, f: (A) -> IOOf<B>): IO<List<B>> {
    return IO.concurrent().run {
        Semaphore(limit).flatMap { s -> parTraverse(ctx) { a -> s.withPermit(f(a)) } }
    }
}
```
In combination with `IO.dispatchers().io()` this would allow you to run each task on an `IOPool` thread.
s
Yes, with the exact implementation you posted above. One last question: I cannot use the `foldRight` method of the Kotlin collections library, as it does not know about `IO` and how to defer the operations, right?
j
Yes, the stdlib `foldRight` is a completely useless method and should never be used unless you want a shortcut for `reversed().foldLeft` (iirc there are minor differences, but that does not matter). It being strict in the accumulator is also bad. I’ll write up some thoughts about those two; sorry if you already know about it 🙏
`foldRight` in functional programming is completely different from `foldRight` in the stdlib. To understand why, we have to take a close look at the `Right` part of it: in the stdlib this means the structure is literally folded from the right, so `foldRight == reversed().foldLeft` (for the most part, that is). In the fp world this is different: `Right` refers to associativity, not order! So `foldLeft(s, f) == foldRight(s, f)` in all cases where `f` is strict in both arguments. The difference however becomes clear when you compare `foldLeft(0) { 1 }` and `foldRight(0) { 1 }`. `foldLeft` will loop forever on infinite structures because it evaluates the accumulator first on every step. `foldRight` will work just fine because it leaves it to the function to evaluate (or not evaluate) the accumulator. This is also the reason `traverse` uses `foldRight`: it leaves it up to the `Applicative` instance whether or not to evaluate the accumulator (which is the reason `lazyAp` exists).
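That laziness difference can be demonstrated in plain Kotlin with a hand-rolled FP-style fold (the `lazyFoldRight` helper below is invented for illustration; the stdlib has nothing equivalent):

```kotlin
// FP-style foldRight: the accumulator is passed lazily, so the combining
// function decides whether the rest of the structure is ever evaluated.
fun <A, B> Sequence<A>.lazyFoldRight(z: B, f: (A, Lazy<B>) -> B): B {
    val iter = iterator()
    fun go(): B = if (!iter.hasNext()) z else f(iter.next(), lazy { go() })
    return go()
}

// "Does the sequence contain a 0?" — short-circuits on an infinite sequence,
// because we only force the accumulator when the current element is not 0.
fun containsZero(xs: Sequence<Int>): Boolean =
    xs.lazyFoldRight(false) { x, rest -> x == 0 || rest.value }
```

`containsZero(generateSequence(5) { it - 1 })` short-circuits to `true` on an infinite sequence; a left fold (or the stdlib `foldRight`, which walks the whole structure first) would never terminate here.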
s
I didn’t know. I am figuring out fp one practical task at a time, trying to apply as many concepts as I can. Thanks for the explanation. It helps a lot, and now I know 🙂
s
@Jannis that is a great explanation! Would you be up to writing a small article about this somewhere for the site? Perhaps with some playgrounds to indicate the differences like you mentioned above. Not sure where something like that would go tho. cc\ could we do an Arrow blog by one of the maintainers directly on the site, or could this go in the Arrow core docs? Or close to Traverse?
j
I think the foldable docs or something similar. This is something that is wrong in lots of places (outside of the hackage docs for the prelude, which explain this almost perfectly for once). Also, I messed one thing up in that explanation: there is one more difference between fp `foldLeft` and `foldRight`: `foldLeft` starts with the accumulator and `foldRight` puts it last. Everything else is correct tho; the order is still left to right for both. Also, this blog post on the 47deg site explains `foldRight` as right-to-left but then implements it left-to-right 🙈 I think the scala stdlib probably does the same thing as the kotlin one, so that’s where that comes from... https://www.47deg.com/blog/recursion-schemes-introduction/ Oh fuck... We have that wrong in our docs as well... https://next.arrow-kt.io/docs/arrow/typeclasses/foldable/ I guess I really do need to write this up and provide some better docs
🙈 2
s
Yes, that would be amazing Jannis!
j
The api docs are even better: they have the same header as the foldable docs, so they describe it as right-to-left, but the individual methods’ api docs only mention associativity. Anyway, I can use this distraction now... I’ve been trying to figure out for a while why the kotlin compiler runs out of memory only in ci, and that is no fun when it works just fine locally.
s
Yes, it’s super annoying... And it also depends on what machine it runs on in GitHub Actions. Are you talking about the tests freaking out, or the build? I.e. we saw our build stack-overflow on PR builds but not release builds… depending on some weird linux version.
j
The build only, and I had it run out of memory locally before as well especially after adding instances of the effect hierarchy, so it may just be extension functions. (The kotlin compile process also starts with Xmx1024 regardless of gradle options which probably does not help).
For context, it is arrow-check and I have a bit of investigation documented in one of the open prs
s
Can you share a link?
We’re seeing something weird since last week where ParMap stackoverflows 1/4 builds on CI only
Cannot reproduce locally
Out of memory tho, not stackoverflows
s
Could be a related issue. Reduced build servers have fewer resources; if they have less memory or stack available, they may blow up before a regular jvm instance would.
I.e. we should make the stackDepth configurable through system variables, so someone who beefs up his JVM instances can also 4x the stack depth and reduce trampolining by 4x.
Those are extremely rare use-cases where you want that tho.
But it could also be useful to, for example, decrease the stack depth for a free (reduced-resources) CI server.
Trampolining transfers stack space to heap, which can also cause OOM.
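A minimal sketch of that trade-off, assuming nothing Arrow-specific — each recursion step becomes a heap-allocated continuation that a flat loop drives, so stack usage stays constant while heap usage grows:

```kotlin
// Toy trampoline: a step is either a final value or a heap-allocated
// continuation; a plain loop drives it, so the JVM stack never grows.
sealed class Trampoline<A> {
    class Done<A>(val value: A) : Trampoline<A>()
    class More<A>(val next: () -> Trampoline<A>) : Trampoline<A>()
}

fun <A> Trampoline<A>.run(): A {
    var cur: Trampoline<A> = this
    while (true) cur = when (cur) {
        is Trampoline.Done -> return cur.value
        is Trampoline.More -> cur.next()   // one heap allocation per step, zero extra stack
    }
}

// A recursion that would overflow the stack if written directly.
fun countDown(n: Int): Trampoline<Int> =
    if (n == 0) Trampoline.Done(0) else Trampoline.More { countDown(n - 1) }
```

`countDown(1_000_000).run()` completes fine, but each step allocated a `More` object — which is exactly how trampolining can turn a StackOverflowError into an OOM on a memory-starved machine.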
j
I am pretty sure tho that the kotlin compiler does not do that during its code generation. At least not with arrow code^^
But yeah that is a good idea
s
Yes, I doubt that too 😄 @raulraja might be able to give you some more help tho given you documented the steps along the way
j
Currently stackDepth is limited to 127, right? Which is fun, because I recently thought about how `AndThen`, if it is not run as a top-level function but instead at depth, does not protect one from stackoverflows at all if the max depth is actually 127.
s
Right, if you use it at depth you’re still responsible for it.
That’s not true, the stackDepth in `AndThen` dictates the composition: `AndThen(f).andThen(g)` is now 1 function invocation, which we take up to a depth of 127, which is the max stack depth. Then we store the state and `loop` like we do for `FlatMap` in the `IORunLoop`, which is basically `do { } while()`, so that doesn’t StackOverflow.
Am I missing something?
j
It internally uses `andThen` from syntax, which wraps it: `f andThen g = { g(f()) }`. So the result is depth one, but once you do that again it adds to depth, right?
s
Yes, but once that reaches the stackDepth it composes with `andThenF`, which is like `FlatMap`, or an iteration over `while`, which clears the stack and saves the state on the heap.
```kotlin
infix fun <C> compose(g: (C) -> A): AndThen<C, B> =
    when (this) {
      // Fusing calls up to a certain threshold, using the fusion technique implemented for `IO#map`
      is Single -> if (index != maxStackDepthSize) Single(f compose g, index + 1)
      else composeF(AndThen(g))
      else -> composeF(AndThen(g))
    }
```
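The fusion-then-loop idea in that snippet can be mimicked with a toy monomorphic type (everything here — `FnChain`, `MAX_DEPTH`, the method names — is invented for illustration; Arrow's real `AndThen` is generic and uses `composeF`/`andThenF`):

```kotlin
const val MAX_DEPTH = 127  // plays the role of maxStackDepthSize

// Toy AndThen for Int -> Int: fuse composed functions up to MAX_DEPTH into one
// JVM-level call, then chain links and run them with an explicit heap-based loop.
sealed class FnChain {
    abstract operator fun invoke(x: Int): Int

    class Single(val f: (Int) -> Int, val depth: Int) : FnChain() {
        override fun invoke(x: Int): Int = f(x)
    }

    class Concat(val left: FnChain, val right: FnChain) : FnChain() {
        // Walk the links iteratively instead of via nested calls: stack-safe.
        override fun invoke(x: Int): Int {
            var acc = x
            val pending = ArrayDeque<FnChain>()
            var cur: FnChain = this
            while (true) {
                when (cur) {
                    is Concat -> { pending.addLast(cur.right); cur = cur.left }
                    is Single -> {
                        acc = cur.f(acc)
                        cur = pending.removeLastOrNull() ?: return acc
                    }
                }
            }
        }
    }

    fun andThen(g: (Int) -> Int): FnChain = when (this) {
        is Single ->
            if (depth < MAX_DEPTH) Single({ x -> g(f(x)) }, depth + 1) // fuse: cheap, one frame
            else Concat(this, Single(g, 0))                            // threshold hit: new link
        is Concat -> {
            val r = right
            if (r is Single && r.depth < MAX_DEPTH)
                Concat(left, Single({ x -> g(r.f(x)) }, r.depth + 1))  // fuse into last link
            else Concat(this, Single(g, 0))
        }
    }
}
```

Composing 100,000 functions this way produces roughly 100,000 / 128 chain links, and invoking the result runs an explicit loop instead of nested calls, so it cannot overflow the stack no matter the composition count.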
j
Yes, but still, subsequent use of `AndThen(f).andThen(g)` increases depth, so using it at depth is unsafe. And if `IO` does the same, it is equally unsafe, right? I mean, that won’t be a huge problem for `IO`, but `AndThen` might fall over this.
s
Okay, gotcha: if `f` is at depth.
We could also remove that optimization (not sure if it actually speeds anything up) and always loop over `while`; that would actually guarantee stack safety, no?
j
It would, but then it is basically the same as `Trampoline`, right?
s
Conceptually yes; `Trampoline` is implemented with `Function1` and `Free`. So `AndThen` is the concrete and optimised version of that, specific to the JVM (all this is specific to the JVM so nvm :D). Which would make it a good alternative to use in implementations to make things stack safe.
j
I'd probably set up benchmarks first to compare it. But it's definitely worth a try.
👍 1