Satyam Agarwal
01/27/2020, 6:11 PM
kyleg
01/27/2020, 6:18 PM
traverse/sequence for a large iterable.
There was an issue where the lambdas were being eagerly evaluated. IIRC one of the Arrow devs fixed this issue but it’s not in a release yet.
Satyam Agarwal
01/27/2020, 6:20 PM
Jannis
01/27/2020, 7:46 PM
lazyAp (which is every applicative that can short-circuit or is otherwise lazy, like IO, Eval etc.). However, this does not hold for parApplicative, which is a specialized applicative for IO that parTraverse uses.
This means that it will fold the entire iterable first, build up an IO action and then, only when it's done with that, execute the IO. For infinite sequences the IO will therefore never be built and thus never execute (it'll just clog up memory...)
There is no good solution for this yet. If you have an idea on how to implement lazyAp for this type (https://github.com/arrow-kt/arrow/blob/3051b67bcc4298c5b1553be134bd4222da04f3c6/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/typeclasses/ParApplicative.kt), that would be great 😅
cc @simon.vergauwen he probably knows far better what needs to be done there than me ^^
In the meantime you can try implementing this manually by using foldRight and fork to spawn fibers. Just be careful when you evaluate the accumulator (wrapped in Eval). If you evaluate it before your IO action is built, it will end up the same. If you force it inside the IO action it will be lazy and it will work fine (unless you do that as the first thing, then it has the same effect again). This is all kind of complex and annoying. Hopefully we'll have a version of parApplicative soon that can do that on its own. If you have questions feel free to ping me 🙂
pakoito
01/27/2020, 9:30 PM
pakoito
01/27/2020, 9:30 PM
pakoito
01/27/2020, 9:31 PM
acquireN
pakoito
01/27/2020, 9:31 PM
releaseN
Satyam Agarwal
01/28/2020, 9:44 AM
Satyam Agarwal
01/28/2020, 9:49 AM
Jannis
01/29/2020, 8:27 AM
fun <A, B> Iterable<A>.foldRight(Eval.now(IO.just(emptyList<B>()))) { v, acc ->
  IO.fx {
    // Run the action for the current element in a new fiber
    val x = semaphore.withPermit(f(v)).fork().bind()
    // Run the accumulator.
    // Since starting fibers is pretty much instant, we will start all fibers at the same time and the semaphore decides whether they block or run.
    // This could also mean that it uses quite a bit of memory because it accumulates in such a way, but that should be tested first ^^
    val xs = acc.value().bind()
    // In most cases this will have finished already, we just need the result
    val xRes = x.join().bind()
    listOf(xRes) + xs
  }.let { Eval.now(it) } // only for foldRight, IO is already lazy so no need for more complex evals
}.value() // now you should have an IO action that spawns n fibers and collects all results
Keep in mind that certain things are not handled well here:
If the first element errors out, the accumulator will still execute (although the whole IO will fail the effects are still produced).
This will spawn all fibers upfront, if you don't want that you should use the semaphore outside of fork.
There are also probably many other problematic things concerning cancellation which as far as I can tell is not supported by the code I wrote.
So a few things that I did differently than you and why:
parApplicative is a high level construct that we don't use since we are only interested in low level stuff like fork / join etc. If possible parApplicative should support this behaviour exactly the same way, but it doesn't as of now, so no point using it. (I think I have some ideas on how to implement it there, but no promises 😕, will write when I find something)
You were using join together with map2Eval from parApplicative, which starts both IOs in separate fibers, runs them and collects the results. That would be fine if map2Eval were not strict in its argument (for parApplicative only afaik), which means it always tries to evaluate the accumulator Eval first, leading to the same behaviour as traverse. I ignored that and just started the fiber first and then ran the accumulator.
I also omitted a bunch of stuff, but that was only for brevity, the core part should be the foldRight and that should be there...
Again, use this with care as I am not sure how the semantics of this are around cancellation (which likely won't work as expected) and errors.
Jannis
01/29/2020, 8:33 AM
lazyAp would it not be enough to delay evaluation of the argument in parMap(a, b, f) until a has started? Because as far as I can tell parMapN does not use b before (except in an error case for a but if you put b inside Eval instead of a simple function memoization should handle this properly).
For traverse that would mean all fibers are started at once (but they are actually started!). Imo that'd be much better than what lazyAp does atm
Jannis
01/29/2020, 8:35 AM
parApplicative works for `IO`: It all boils down to this method here: https://github.com/arrow-kt/arrow/blob/47818680653dee7461f18ccd7afe1d165ec0b0da/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IOParMap.kt#L63
Satyam Agarwal
01/29/2020, 8:46 AM
Satyam Agarwal
01/30/2020, 8:57 AM
Jannis
01/30/2020, 1:24 PM
acquireN(limit) on the semaphore right after creating the semaphore? The result of foldRight is a huge computation built from smaller ones that all try to acquire one permit of the semaphore to run. None of them can complete, so the entire IO can't. So in the end you build up a huge action again (clogging up memory, but because your semaphore has all permits taken, none of it can run).
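To make that stall concrete, here is a minimal sketch. It assumes the JDK's blocking java.util.concurrent.Semaphore as a stand-in for the Arrow Fx one, and the helper names canRunTask and stallDemo are made up for illustration. It uses tryAcquire() instead of a blocking acquire so the example terminates instead of hanging.

```kotlin
import java.util.concurrent.Semaphore

// Hypothetical helper: reports whether a task could obtain a permit right now.
// tryAcquire() never blocks, so this returns immediately either way.
fun canRunTask(semaphore: Semaphore): Boolean {
    val acquired = semaphore.tryAcquire()
    if (acquired) semaphore.release()
    return acquired
}

// Returns (could a task run before, could a task run after) taking all permits.
fun stallDemo(limit: Int): Pair<Boolean, Boolean> {
    val semaphore = Semaphore(limit)
    val before = canRunTask(semaphore) // permits are available
    semaphore.acquire(limit)           // stand-in for acquireN(limit) right after creation
    val after = canRunTask(semaphore)  // no permits left: every task would wait forever
    return before to after
}
```

Calling stallDemo(4) gives (true, false): once all permits are taken and never released, no task that needs a permit can start.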
Also on a side note: I think foldRight is defined for iterables and/or sequences as well, so no need to create the whole list in memory with toList
simon.vergauwen
01/31/2020, 9:17 AM
parTraverse suffers the same problem as traverse. The difference between parTraverse and traverse is that it uses a parallel applicative instance.
Or in other words, List<A>.traverse flips the List<IO<B>> composition to IO<List<B>> by folding over the List and using map2 to compose the IOs that fill the list.
List<A>.parTraverse does the exact same thing but it uses parMap2 instead.
simon.vergauwen
01/31/2020, 9:19 AM
parApplicative is an Applicative instance where all the operations are derived through parMapN.
Jannis
01/31/2020, 11:20 AM
Jannis
01/31/2020, 11:21 AM
parMap2 which takes the second argument in an Eval and doesn't evaluate it before the first is running. This would lead to copypasta because I would not want to make that default and wrap everything else in Eval.Now
Jannis
01/31/2020, 11:23 AM
zipWithNext and then doing a special traverse on that)
Jannis
01/31/2020, 11:24 AM
simon.vergauwen
01/31/2020, 12:50 PM
Jannis
01/31/2020, 1:16 PM
parTraverse later for future reference
Satyam Agarwal
01/31/2020, 1:53 PM
semaphore.withPermit(f(v)).fork().bracket(...) ?
@Jannis After removing the acquireN from the gist, I was able to parse up to 60000 with 8 gigs, which is much nicer than before.
Also, can any of you help me understand, in short, how the fork and join fiber implementation works in the context of IOPool and the Fiber context? I tried reading up on withPermit(), fork() and join() in the API docs and I think I have a good idea, but I’d like to hear from you guys as well 🙂
simon.vergauwen
01/31/2020, 2:41 PM
fork in IO it basically just starts the IO on a certain CoroutineContext which in the end goes to Executor#execute somewhere.
So for an IOPool Executor that means that each task gets scheduled on a new (separate cached) thread.
Semaphore#withPermit basically just wraps your IO with the acquire and release combinators.
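As a rough picture of that wrapping, here is a blocking sketch on top of the JDK's java.util.concurrent.Semaphore. This is an assumption for illustration only: Arrow's withPermit composes IO values and does not block a thread, and the name withPermitBlocking is made up.

```kotlin
import java.util.concurrent.Semaphore

// Hypothetical blocking analogue of Semaphore#withPermit:
// acquire a permit, run the task, and always release the permit afterwards,
// even when the task throws.
fun <A> Semaphore.withPermitBlocking(task: () -> A): A {
    acquire()
    try {
        return task()
    } finally {
        release()
    }
}
```

While the task runs, one permit is held; afterwards the permit count is back to where it started, whether the task succeeded or failed. That release-on-all-paths behaviour is the part that matters when limiting how many tasks run at once.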
fork().flatMap { } is the equivalent of fire & forget since flatMap is a cancel boundary and thus the fiber could leak since it might never get canceled or joined.
fork().bracket(use = { (join, cancel) -> … }, release = { (_, cancel) -> cancel }) ensures that the fiber is always closed and thus cannot leak.
simon.vergauwen
01/31/2020, 2:42 PM
parTraverseN? https://kotlinlang.slack.com/archives/C5UPMM0A0/p1580148686164400 this one?
That should be absolutely fine, it runs by default on CommonPool tho
simon.vergauwen
01/31/2020, 2:42 PM
fun <A, B> Iterable<A>.parTraverseN(limit: Long, ctx: CoroutineContext, f: (A) -> IOOf<B>): IO<List<B>> {
  return IO.concurrent().run {
    Semaphore(limit).flatMap { s -> parTraverse(ctx) { a -> s.withPermit(f(a)) } }
  }
}
in combination with IO.dispatchers().io() would allow you to run each task on an IOPool thread.
Satyam Agarwal
01/31/2020, 11:13 PM
foldRight method of kotlin Collection library, as it does not know about io and how to defer the operations, right?
Jannis
01/31/2020, 11:34 PM
foldRight is a completely useless method and should never be used unless you want a shortcut for reversed().foldLeft (iirc there are minor differences but that does not matter). It being strict in the accumulator is also bad.
I'll write up some thoughts about those two, sorry if you already know about it 🙏
foldRight in functional programming is completely different from foldRight in the stdlib. To understand why, we have to take a close look at the Right part of it: In the stdlib this means the structure is literally folded from the right so foldRight == reversed().foldLeft (for the most part that is). In the fp world this is different: Right refers to associativity, not order! So foldLeft(s, f) == foldRight(s, f) in all cases where f is strict in both arguments. The difference however becomes clear when you do foldLeft(0) { 1 } and foldRight(0) { 1 }. foldLeft will loop forever on infinite structures because it evaluates the accumulator first on every step. foldRight will work just fine because it leaves it to the function to evaluate (or not evaluate) the accumulator. This is also the reason traverse uses foldRight and it leaves it up to the Applicative instance whether or not to evaluate the accumulator (which is the reason lazyAp exists)
Satyam Agarwal
02/01/2020, 7:09 AM
simon.vergauwen
02/01/2020, 8:48 AM
Jannis
02/01/2020, 10:33 AM
foldLeft and `foldRight`: foldLeft starts with the accumulator and foldRight puts it last. Everything else is correct tho, the order is still left to right for both.
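A small sketch of that fp-style foldRight in plain Kotlin, to show both points: elements are still visited left to right, and the function decides whether the accumulator is evaluated at all. The name lazyFoldRight and the thunk-based signature are made up for this illustration. Arrow uses Eval for the lazy accumulator, which also gives stack safety; this recursive version does not.

```kotlin
// Hypothetical lazy right fold: the accumulator is passed as a thunk,
// so `f` decides whether the rest of the structure is ever visited.
// Not stack safe: every forced thunk adds one level of recursion.
fun <A, B> Iterator<A>.lazyFoldRight(initial: B, f: (A, () -> B) -> B): B {
    if (!hasNext()) return initial
    val head = next()
    // Memoise the rest so calling the thunk twice does not advance the iterator twice.
    val rest = lazy { lazyFoldRight(initial, f) }
    return f(head) { rest.value }
}
```

With this, generateSequence(1) { it + 1 }.iterator().lazyFoldRight(0) { _, _ -> 1 } returns 1 on an infinite sequence because the thunk is never forced, and listOf(1, 2, 3).iterator().lazyFoldRight("") { a, rest -> "$a" + rest() } yields "123", so the elements are combined in left to right order with the accumulator in last position.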
Also this blog post on the 47 deg site explains foldRight as right to left but then implements it left to right 🙈 I think the Scala std lib probably does the same thing as the Kotlin one so that's where that comes from... https://www.47deg.com/blog/recursion-schemes-introduction/
Oh fuck... We have that wrong on our docs as well... https://next.arrow-kt.io/docs/arrow/typeclasses/foldable/ I guess I really do need to write this up and provide some better docs
simon.vergauwen
02/01/2020, 10:34 AMJannis
02/01/2020, 10:54 AMsimon.vergauwen
02/01/2020, 10:56 AMJannis
02/01/2020, 10:58 AMJannis
02/01/2020, 10:59 AMsimon.vergauwen
02/01/2020, 11:00 AMsimon.vergauwen
02/01/2020, 11:00 AMsimon.vergauwen
02/01/2020, 11:00 AMJannis
02/01/2020, 11:02 AMJannis
02/01/2020, 11:02 AMsimon.vergauwen
02/01/2020, 11:03 AMsimon.vergauwen
02/01/2020, 11:03 AMsimon.vergauwen
02/01/2020, 11:04 AMsimon.vergauwen
02/01/2020, 11:04 AMsimon.vergauwen
02/01/2020, 11:05 AMJannis
02/01/2020, 11:06 AMJannis
02/01/2020, 11:06 AMsimon.vergauwen
02/01/2020, 11:07 AMJannis
02/01/2020, 11:07 AM
AndThen if it is not run as a top level function, but instead at depth itself does not protect one from stack overflows at all if max depth is actually 127
simon.vergauwen
02/01/2020, 11:08 AM
simon.vergauwen
02/01/2020, 11:08 AM
simon.vergauwen
02/01/2020, 11:09 AM
AndThen(f).andThen(g) is now 1 function invocation
simon.vergauwen
02/01/2020, 11:09 AM
simon.vergauwen
02/01/2020, 11:10 AM
loop like we do for FlatMap in the IORunLoop
simon.vergauwen
02/01/2020, 11:10 AM
do { } while() so that doesn’t StackOverflow.
simon.vergauwen
02/01/2020, 11:10 AM
Jannis
02/01/2020, 11:11 AM
andThen from syntax which wraps it: f andThen g = { g(f()) } So the result is depth one, but once you do that again it adds to depth, right?
simon.vergauwen
02/01/2020, 11:13 AM
andThenF which is like FlatMap or an iteration over while which clears the stack and saves the state on the heap.
infix fun <C> compose(g: (C) -> A): AndThen<C, B> =
  when (this) {
    // Fusing calls up to a certain threshold, using the fusion technique implemented for `IO#map`
    is Single -> if (index != maxStackDepthSize) Single(f compose g, index + 1)
                 else composeF(AndThen(g))
    else -> composeF(AndThen(g))
  }
Jannis
02/01/2020, 11:16 AM
AndThen(f).andThen(g) increases depth, so using it at depth is unsafe. And if IO does the same it is equally unsafe, right? I mean that won't be a huge problem for IO but AndThen might fall over this
simon.vergauwen
02/01/2020, 11:17 AM
f is at depth
simon.vergauwen
02/01/2020, 11:17 AM
while
simon.vergauwen
02/01/2020, 11:17 AM
Jannis
02/01/2020, 11:18 AM
Trampoline right?
simon.vergauwen
02/01/2020, 11:51 AM
Function1 and Free
simon.vergauwen
02/01/2020, 11:51 AM
AndThen is the concrete and optimised version of that specific for JVM (all this is specific to JVM so nvm :D). Which would make it a good alternative to use in implementations to make things stack safe
Jannis
02/01/2020, 12:43 PM