Satyam Agarwal
01/27/2020, 6:11 PMkyleg
01/27/2020, 6:18 PM
`traverse/sequence` for a large iterable.
There was an issue where the lambdas were being eagerly evaluated. IIRC one of the Arrow devs fixed this issue but it’s not in a release yet.
Satyam Agarwal
01/27/2020, 6:20 PMJannis
01/27/2020, 7:46 PM
`lazyAp` (which is every applicative that can short-circuit or is otherwise lazy, like `IO`, `Eval` etc). However this does not hold for `parApplicative`, which is a specialized applicative for `IO` that `parTraverse` uses.
This means that it will fold the entire iterable first, build up an `IO` action and then, only when it's done with that, execute the `IO`. This means for infinite sequences the `IO` will never be built and thus never execute (it'll just clog up memory...)
There is no good solution for this yet. If you have an idea on how to implement `lazyAp` for this type: (https://github.com/arrow-kt/arrow/blob/3051b67bcc4298c5b1553be134bd4222da04f3c6/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/typeclasses/ParApplicative.kt) that would be great 😅
cc @simon.vergauwen he probably knows far better what needs to be done there than me ^^
In the meantime you can try implementing this manually by using `foldRight` and `fork` to spawn fibers. Just be careful when you evaluate the accumulator (wrapped in `Eval`). If you evaluate it before your `IO` action is built, it will end up the same. If you force it inside the `IO` action it will be lazy and it will work fine (unless you do that as the first thing, then it has the same effect again). This is all kind of complex and annoying. Hopefully we'll have a version of `parApplicative` soon that can do that on its own. If you have questions feel free to ping me 🙂
pakoito
01/27/2020, 9:30 PMpakoito
01/27/2020, 9:30 PMpakoito
01/27/2020, 9:31 PMacquireN
pakoito
01/27/2020, 9:31 PMreleaseN
Satyam Agarwal
01/28/2020, 9:44 AMSatyam Agarwal
01/28/2020, 9:49 AMJannis
01/29/2020, 8:27 AM
fun <A, B> Iterable<A>.foldRight(Eval.now(IO.just(emptyList<B>()))) { v, acc ->
  IO.fx {
    // Run the action for the current element in a new fiber
    val x = semaphore.withPermit(f(v)).fork().bind()
    // Run the accumulator.
    // Since starting fibers is pretty much instant, we will start all fibers at the same time
    // and the semaphore decides whether they block or run. This could also mean that it uses
    // quite a bit of memory because it accumulates in such a way, but that should be tested first ^^
    val xs = acc.value().bind()
    // In most cases this will have finished already, we just need the result
    val xRes = x.join().bind()
    listOf(xRes) + xs
  }.let { Eval.now(it) } // only for foldRight, IO is already lazy so no need for more complex evals
}.value() // now you should have an IO action that spawns n fibers and collects all results
Keep in mind that certain things are not handled well here:
If the first element errors out, the accumulator will still execute (although the whole `IO` will fail, the effects are still produced).
This will spawn all fibers upfront. If you don't want that, you should use the semaphore outside of `fork`.
There are also probably many other problematic things concerning cancellation, which as far as I can tell is not supported by the code I wrote.
So a few things that I did differently than you and why:
`parApplicative` is a high level construct that we don't use since we are only interested in low level stuff like `fork` / `join` etc. If possible `parApplicative` should support this behaviour exactly the same way, but it doesn't as of now, so no point using it. (I think I have some ideas on how to implement it there, but no promises 😕, will write when I find something)
You were using `join` together with `map2Eval` from `parApplicative`, which starts both `IO`s in separate fibers, runs them and collects the results. That would be fine if `map2Eval` were not strict in its argument (for `parApplicative` only afaik), which means it always tries to evaluate the accumulator eval first, leading to the same behaviour as `traverse`. I ignored that and just started the fiber first and then ran the accumulator.
I also omitted a bunch of stuff, but that was only for brevity, the core part should be the `foldRight` and that should be there...
Again, use this with care as I am not sure how the semantics of this are around cancellation (which likely won't work as expected) and errors.
Jannis
01/29/2020, 8:33 AM
`lazyAp` would it not be enough to delay evaluation of the argument in `parMap(a, b, f)` until `a` has started? Because as far as I can tell `parMapN` does not use `b` before (except in an error case for `a`, but if you put `b` inside `Eval` instead of a simple function, memoization should handle this properly).
For `traverse` that would mean all fibers are started at once (but they are actually started!). Imo that'd be much better than what `lazyAp` does atm
Jannis
01/29/2020, 8:35 AM
`parApplicative` works for `IO`: It all boils down to this method here: https://github.com/arrow-kt/arrow/blob/47818680653dee7461f18ccd7afe1d165ec0b0da/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IOParMap.kt#L63
Satyam Agarwal
01/29/2020, 8:46 AMSatyam Agarwal
01/30/2020, 8:57 AMJannis
01/30/2020, 1:24 PM
`acquireN(limit)` on the semaphore right after creating the semaphore? The result of `foldRight` is a huge computation built from smaller ones that all try to acquire one permit from the semaphore to run. None of them can complete, so the entire `IO` can't. So in the end you build up a huge action again (clogging up memory, but because your semaphore has all permits taken, none of it can run).
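The stall described above can be reproduced with plain JVM primitives. This is a minimal sketch that uses `java.util.concurrent.Semaphore` as a stand-in for the Arrow semaphore (an assumption made for illustration, the two are not the same type); `canAcquireOne` is a hypothetical helper name.

```kotlin
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit

// Hypothetical helper: does a single-permit request succeed within 100 ms?
fun canAcquireOne(semaphore: Semaphore): Boolean =
    semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)

fun main() {
    val limit = 4
    val semaphore = Semaphore(limit)

    // Take every permit up front, mirroring the acquireN(limit) call in the gist.
    semaphore.acquire(limit)

    // Every task that now asks for one permit has to wait, so nothing can run.
    println(canAcquireOne(semaphore)) // false

    // Once the permits are handed back, the same request goes through.
    semaphore.release(limit)
    println(canAcquireOne(semaphore)) // true
}
```

With all permits held and never released, every `withPermit`-style task would wait forever, which matches the behaviour seen before the `acquireN` call was removed.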
Also on a side note: I think `foldRight` is defined for iterables and/or sequences as well, so no need to create the whole list in memory with `toList`
simon.vergauwen
01/31/2020, 9:17 AM
`parTraverse` suffers the same problem as `traverse`. The difference between `parTraverse` and `traverse` is that it uses a `parallel` applicative instance.
Or in other words, `List<A>.traverse` flips the `List<IO<…>>` composition to `IO<List<…>>` by folding over the `List` and using `map2` to compose the `IO`s that fill the list.
`List<A>.parTraverse` does the exact same thing but it uses `parMap2` instead.
simon.vergauwen
01/31/2020, 9:19 AM
`parApplicative` is an `Applicative` instance where all the operations are derived through `parMapN`.
Jannis
01/31/2020, 11:20 AMJannis
01/31/2020, 11:21 AM
`parMap2` which takes the second argument in an eval and doesn't evaluate it before the first is running. This would lead to copypasta because I would not want to make that default and wrap everything else in `Eval.Now`
Jannis
01/31/2020, 11:23 AM
`zipWithNext` and then doing a special `traverse` on that)
Jannis
01/31/2020, 11:24 AMsimon.vergauwen
01/31/2020, 12:50 PMJannis
01/31/2020, 1:16 PM
`parTraverse` later for future reference
Satyam Agarwal
01/31/2020, 1:53 PM
`semaphore.withPermit(f(v)).fork().bracket(...)` ?
@Jannis After removing the `acquireN` from the gist, I was able to parse up to 60000 with 8 gigs, which is much nicer than before.
Also, can any of you help me understand, in short, how the fork and join fiber implementation works in the context of IOPool and the Fiber context? I tried reading up on `withPermit()`, `fork()` and `join()` in the API docs and I think I have a good idea, but I’d like to hear from you guys as well 🙂
simon.vergauwen
01/31/2020, 2:41 PM
`fork` in `IO` it basically just starts the `IO` on a certain `CoroutineContext`, which in the end goes to `Executor#execute` somewhere.
So for an `IOPool` Executor that means that each task gets scheduled on a new (separate cached) thread.
`Semaphore#withPermit` basically just wraps your `IO` with the `acquire` and `release` combinators.
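To make the acquire/run/release shape concrete, here is a rough blocking analogue built on `java.util.concurrent` rather than on Arrow's `IO` and `Semaphore`. It is only a sketch of the idea, not how Arrow implements it; `withPermitBlocking` and `maxConcurrent` are hypothetical names.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: acquire a permit, run the task, and always release the permit.
fun <T> Semaphore.withPermitBlocking(task: () -> T): T {
    acquire()
    try {
        return task()
    } finally {
        release()
    }
}

// Runs `tasks` jobs on a thread pool and returns the highest number seen running at once.
fun maxConcurrent(limit: Int, tasks: Int): Int {
    val semaphore = Semaphore(limit)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(tasks)
    repeat(tasks) {
        pool.execute {
            semaphore.withPermitBlocking {
                val now = running.incrementAndGet()
                peak.updateAndGet { p -> maxOf(p, now) }
                Thread.sleep(20) // simulated work
                running.decrementAndGet()
            }
        }
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    return peak.get()
}

fun main() {
    // Eight threads are available, but at most two tasks hold a permit at any time.
    println(maxConcurrent(limit = 2, tasks = 8) <= 2) // true
}
```

All eight tasks are started right away, and the semaphore alone decides how many of them make progress at once.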
`fork().flatMap { }` is the equivalent of fire & forget, since `flatMap` is a cancel boundary and thus the fiber could leak since it might never get canceled or joined.
`fork().bracket(use = { (join, cancel) -> … }, release = { (_, cancel) -> cancel })` ensures that the fiber is always closed and thus cannot leak.
simon.vergauwen
01/31/2020, 2:42 PM
`parTraverseN` ? https://kotlinlang.slack.com/archives/C5UPMM0A0/p1580148686164400 this one?
That should be absolutely fine, it runs by default on `CommonPool` tho
simon.vergauwen
01/31/2020, 2:42 PM
fun <A, B> Iterable<A>.parTraverseN(limit: Long, ctx: CoroutineContext, f: (A) -> IOOf<B>): IO<List<B>> {
  return IO.concurrent().run {
    Semaphore(limit).flatMap { s -> parTraverse(ctx) { a -> s.withPermit(f(a)) } }
  }
}
in combination with `IO.dispatchers().io()` would allow you to run each task on an `IOPool` thread.
Satyam Agarwal
01/31/2020, 11:13 PM
`foldRight` method of the Kotlin collections library, as it does not know about IO and how to defer the operations, right?
Jannis
01/31/2020, 11:34 PM
`foldRight` is a completely useless method and should never be used unless you want a shortcut for `reversed().foldLeft` (iirc there are minor differences but that does not matter). It being strict in the accumulator is also bad.
I'll write up some thoughts about those two, sorry if you already know about it 🙏
`foldRight` in functional programming is completely different from `foldRight` in the stdlib. To understand why, we have to take a close look at the `Right` part of it: In the stdlib this means the structure is literally folded from the right, so `foldRight == reversed().foldLeft` (for the most part, that is). In the fp world this is different: `Right` refers to associativity, not order! So `foldLeft(s, f) == foldRight(s, f)` in all cases where `f` is strict in both arguments. The difference however becomes clear when you do `foldLeft(0) { 1 }` and `foldRight(0) { 1 }`. `foldLeft` will loop forever on infinite structures because it evaluates the accumulator first on every step. `foldRight` will work just fine because it leaves it to the function to evaluate (or not evaluate) the accumulator. This is also the reason `traverse` uses `foldRight`, and it leaves it up to the `Applicative` instance whether or not to evaluate the accumulator (which is the reason `lazyAp` exists)
Satyam Agarwal
02/01/2020, 7:09 AMsimon.vergauwen
02/01/2020, 8:48 AMJannis
02/01/2020, 10:33 AM
`foldLeft` and `foldRight`: `foldLeft` starts with the accumulator and `foldRight` puts it last. Everything else is correct tho, the order is still left to right for both.
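A small stdlib-only sketch of the lazy `foldRight` idea, in case it helps: elements are still visited left to right, but the accumulator is passed last as a thunk, so the combining function decides whether the rest of the structure gets evaluated at all. `lazyFoldRight` and `firstAbove` are hypothetical names, this is not Arrow's `Eval`-based implementation, and it is not stack safe for long non-short-circuiting folds.

```kotlin
// Hypothetical helper: a right fold whose accumulator is a thunk.
// The thunk should be called at most once, because the iterator is stateful.
fun <A, B> Iterator<A>.lazyFoldRight(initial: () -> B, f: (A, () -> B) -> B): B =
    if (hasNext()) {
        val head = next()
        // The rest of the fold only runs if `f` decides to call the thunk.
        f(head) { lazyFoldRight(initial, f) }
    } else {
        initial()
    }

// Finds the first natural number above `threshold` in an infinite sequence.
fun firstAbove(threshold: Int): Int? =
    generateSequence(1) { it + 1 } // 1, 2, 3, ... without end
        .iterator()
        .lazyFoldRight<Int, Int?>({ null }) { a, rest -> if (a > threshold) a else rest() }

fun main() {
    val naturals = generateSequence(1) { it + 1 }
    // The function ignores the accumulator, so the fold stops after the first element.
    println(naturals.iterator().lazyFoldRight({ 0 }) { _, _ -> 1 }) // 1
    // The function short-circuits as soon as an element is large enough.
    println(firstAbove(10)) // 11
}
```

A left fold over the same infinite sequence could never return, since it has to consume every element before producing a result.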
Also this blog post on the 47 deg site explains `foldRight` as right to left but then implements it left to right 🙈 I think the Scala std lib probably does the same thing as the Kotlin one, so that's where that comes from... https://www.47deg.com/blog/recursion-schemes-introduction/
Oh fuck... We have that wrong in our docs as well... https://next.arrow-kt.io/docs/arrow/typeclasses/foldable/ I guess I really do need to write this up and provide some better docs
simon.vergauwen
02/01/2020, 10:34 AMJannis
02/01/2020, 10:54 AMsimon.vergauwen
02/01/2020, 10:56 AMJannis
02/01/2020, 10:58 AMJannis
02/01/2020, 10:59 AMsimon.vergauwen
02/01/2020, 11:00 AMsimon.vergauwen
02/01/2020, 11:00 AMsimon.vergauwen
02/01/2020, 11:00 AMJannis
02/01/2020, 11:02 AMJannis
02/01/2020, 11:02 AMsimon.vergauwen
02/01/2020, 11:03 AMsimon.vergauwen
02/01/2020, 11:03 AMsimon.vergauwen
02/01/2020, 11:04 AMsimon.vergauwen
02/01/2020, 11:04 AMsimon.vergauwen
02/01/2020, 11:05 AMJannis
02/01/2020, 11:06 AMJannis
02/01/2020, 11:06 AMsimon.vergauwen
02/01/2020, 11:07 AMJannis
02/01/2020, 11:07 AM
`AndThen` if it is not run as a top level function, but instead at depth itself, does not protect one from stack overflows at all if max depth is actually 127
simon.vergauwen
02/01/2020, 11:08 AMsimon.vergauwen
02/01/2020, 11:08 AMsimon.vergauwen
02/01/2020, 11:09 AM
`AndThen(f).andThen(g)` is now 1 function invocation
simon.vergauwen
02/01/2020, 11:09 AMsimon.vergauwen
02/01/2020, 11:10 AM
`loop` like we do for `FlatMap` in the IORunLoop
simon.vergauwen
02/01/2020, 11:10 AM
`do { } while()` so that doesn’t StackOverflow.
simon.vergauwen
02/01/2020, 11:10 AMJannis
02/01/2020, 11:11 AM
`andThen` from syntax which wraps it: `f andThen g = { g(f()) }` So the result is depth one, but once you do that again it adds to depth, right?
simon.vergauwen
02/01/2020, 11:13 AM
`andThenF` which is like `FlatMap` or an iteration over `while`, which clears the stack and saves the state on the heap.
infix fun <C> compose(g: (C) -> A): AndThen<C, B> =
  when (this) {
    // Fusing calls up to a certain threshold, using the fusion technique implemented for `IO#map`
    is Single -> if (index != maxStackDepthSize) Single(f compose g, index + 1)
    else composeF(AndThen(g))
    else -> composeF(AndThen(g))
  }
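For comparison, a stripped-down sketch of the "state on the heap, run in a loop" idea. This is not Arrow's `AndThen`, which according to the snippet above fuses calls up to a threshold first; here every composition step simply becomes a heap node and `invoke` walks the nodes in a `while`/`for` loop. `Chain` and `incrementChain` are hypothetical names.

```kotlin
// Hypothetical sketch: composition is stored as a linked chain of heap nodes.
class Chain<A> private constructor(
    private val previous: Chain<A>?,
    private val step: (A) -> A
) {
    constructor(first: (A) -> A) : this(null, first)

    // O(1): allocates one node instead of wrapping a lambda inside another lambda.
    fun andThen(next: (A) -> A): Chain<A> = Chain(this, next)

    operator fun invoke(input: A): A {
        // Collect the steps newest to oldest, then apply them oldest to newest in a loop.
        val steps = ArrayList<(A) -> A>()
        var node: Chain<A>? = this
        while (node != null) {
            steps.add(node.step)
            node = node.previous
        }
        var acc = input
        for (i in steps.indices.reversed()) acc = steps[i](acc)
        return acc
    }
}

// Builds identity followed by `n` increments.
fun incrementChain(n: Int): Chain<Int> {
    var chain = Chain<Int> { it }
    repeat(n) { chain = chain.andThen { it + 1 } }
    return chain
}

fun main() {
    // One million composed steps run with constant call-stack depth.
    println(incrementChain(1_000_000)(0)) // 1000000
}
```

Nesting plain lambdas as `{ g(f(it)) }` adds a stack frame per composition when the result is finally called, which is the depth problem being discussed here.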
Jannis
02/01/2020, 11:16 AM
`AndThen(f).andThen(g)` increases depth, so using it at depth is unsafe. And if `IO` does the same it is equally unsafe, right? I mean that won't be a huge problem for `IO` but `AndThen` might fall over this
simon.vergauwen
02/01/2020, 11:17 AM
`f` is at depth
simon.vergauwen
02/01/2020, 11:17 AM
`while`
simon.vergauwen
02/01/2020, 11:17 AMJannis
02/01/2020, 11:18 AM
`Trampoline` right?
simon.vergauwen
02/01/2020, 11:51 AM
`Function1` and `Free`
simon.vergauwen
02/01/2020, 11:51 AM
`AndThen` is the concrete and optimised version of that, specific to the JVM (all this is specific to the JVM so nvm :D). Which would make it a good alternative to use in implementations to make things stack safe
Jannis
02/01/2020, 12:43 PM