# arrow
s
@simon.vergauwen I was wondering whether your parTraverseN with Semaphore solution made it into 0.10.0? I’m using what you helped me with now; if it’s there already, I’ll just remove my version.
I can’t seem to find it in the changelog though.
s
Hey, it’s not in 0.10 yet because there were some issues with parallel streaming types (`Observable`, `Flowable` & `Flux`). It’s safe to use with all other types; I’d keep the impl you have for now.
You might be able to make it a lot shorter now with the `parTraverse` and `Semaphore` utilities in Arrow Fx.
s
Aaah nice. I’ll try to iterate over it and show it to you as soon as I have time. Thanks a lot 🙂
Hey @simon.vergauwen, sorry for replying so late, but I could only get back to this exercise today. This is the function you helped me with:
```kotlin
fun <A, B> Kind<ForListK, A>.parTraverseN(limit: Long, f: (A) -> Kind<ForIO, B>): IO<ListK<B>> {
    val tg: Traverse<ForListK> = ListK.traverse()
    val cf: Concurrent<ForIO> = IO.concurrent()
    val ctx: CoroutineContext = IODispatchers.CommonPool

    return cf
        .run {
            tg.run {
                Semaphore(limit, cf).flatMap { semaphore ->
                    this@parTraverseN.parTraverse(ctx, tg) { a ->
                        semaphore.withPermit(f(a))
                    }
                }
            }
        }
        .map { list -> list.fix() }
        .fix()
}
```
Can I write it like this now? This also works, but I’m not sure both are equivalent:
```kotlin
fun <A, B> Kind<ForListK, A>.parTraverseN(limit: Long, f: (A) -> Kind<ForIO, B>): IO<ListK<B>> {
    val tg: Traverse<ForListK> = ListK.traverse()
    val cf: Concurrent<ForIO> = IO.concurrent()
    val ctx: CoroutineContext = IODispatchers.CommonPool
    val semaphore: Kind<ForIO, Semaphore<ForIO>> = Semaphore.invoke(limit, cf)

    return semaphore
        .flatMap { s ->
            this@parTraverseN.parTraverse(ctx, tg) { a ->
                LOG.info("running element....")
                s.withPermit(f(a))
            }
        }
        .map { list -> list.fix() }
}
```
Also, when I do this:
```kotlin
fun main() {
    val someList: ListK<Int> = (1..100000).toList().k()

    unsafe {
        runBlocking {
            IO.fx {
                someList
                    .parTraverseN(10) { element -> printlnElement(element) }
                    .bind()
            }
        }
    }
}

fun printlnElement(element: Int): IO<Unit> {
    return IO { println(element) }
}
```
It runs out of memory with both implementations, even though I’m giving it 4 GB of heap. Also, if I put a log line such as `running element....` before `s.withPermit(f(a))`, then `running element....` is printed for all the elements first, and only then are the actual elements printed. That seems expected, since `this@parTraverseN.parTraverse(ctx, tg) { ... }` runs the block for every element sequentially, and only then is each resulting effect executed with the semaphore on the `CoroutineContext`. Is that right?
s
Hi, no problem. It’s been a while, so my memory is a bit foggy 😄 If you want the log to be printed only once the permit has been acquired from the `Semaphore`, you can chain the log and the task into a single effect before passing it to the semaphore.
So instead of:
```kotlin
this@parTraverseN.parTraverse(ctx, tg) { a ->
    LOG.info("running element....")
    s.withPermit(f(a))
}
```
use this inside the lambda:
```kotlin
s.withPermit(IO.effect { LOG.info("running element....") }.followedBy(f(a)))
```
Now you should see the logs running in batches.
What happens with a semaphore is that it semantically blocks until it gets a permit, then it runs the task and releases the permit. This means that you’ll see all `LOG` calls first, because those get executed before the semantic blocking of trying to acquire a permit starts.
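To make that ordering concrete, here is a small sketch that swaps in kotlinx.coroutines’ `Semaphore` for Arrow Fx, since it is easy to run on its own. `logBeforePermit` and `logInsidePermit` are hypothetical names, and a list of strings stands in for `LOG`. In the first function the log line runs while the tasks are being created, before any task asks for a permit; in the second the log is part of the guarded task, which mirrors `s.withPermit(IO.effect { ... }.followedBy(f(a)))`.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Log outside the permit: "running" is recorded on the parent coroutine while
// each task is being created, i.e. before any task has requested a permit.
// Note: `events` is a plain MutableList, safe only on a single-threaded dispatcher.
suspend fun logBeforePermit(items: List<Int>, limit: Int, events: MutableList<String>) {
    coroutineScope {
        val semaphore = Semaphore(limit)
        val tasks = items.map { a ->
            events.add("running $a") // executes immediately, inside the map loop
            async { semaphore.withPermit { events.add("element $a") } }
        }
        tasks.awaitAll()
    }
}

// Log inside the permit: both entries are part of the task, so they are only
// recorded once that task holds a permit.
suspend fun logInsidePermit(items: List<Int>, limit: Int, events: MutableList<String>) {
    coroutineScope {
        val semaphore = Semaphore(limit)
        val tasks = items.map { a ->
            async {
                semaphore.withPermit {
                    events.add("running $a")
                    events.add("element $a")
                }
            }
        }
        tasks.awaitAll()
    }
}
```

Under `runBlocking` with `listOf(1, 2, 3)`, the first function records all three `running` entries before any `element` entry, while the second interleaves them.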
So both versions you wrote above are equivalent, and both are correct 👍
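One reason the two spellings can be the same call: in Kotlin, a companion object’s `operator fun invoke` lets callers write `Type(args)` as shorthand for `Type.invoke(args)`. The sketch below shows that pattern with a hypothetical `Permits` interface. That Arrow’s `Semaphore` is built this way is an assumption, suggested by the second version calling `Semaphore.invoke(limit, cf)` directly.

```kotlin
// Hypothetical interface, used only to illustrate companion `invoke` sugar.
interface Permits {
    val available: Long

    companion object {
        // Makes `Permits(3)` compile to exactly the same call as `Permits.invoke(3)`.
        operator fun invoke(n: Long): Permits = object : Permits {
            override val available: Long = n
        }
    }
}
```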
s
Awesome. Can I make the second implementation any shorter?
```kotlin
fun <A, B> Kind<ForListK, A>.parTraverseN(limit: Long, f: (A) -> Kind<ForIO, B>): IO<ListK<B>> {
    val tg: Traverse<ForListK> = ListK.traverse()
    val cf: Concurrent<ForIO> = IO.concurrent()
    val ctx: CoroutineContext = IODispatchers.CommonPool
    val semaphore: Kind<ForIO, Semaphore<ForIO>> = Semaphore.invoke(limit, cf)
    return semaphore
        .flatMap { s ->
            this@parTraverseN.parTraverse(ctx, tg) { a ->
                s.withPermit(f(a))
            }
        }
        .map { list -> list.fix() }
}
```
Also, can you help me understand why the 100000-element example I wrote above runs out of heap space?
s
Well, `parTraverse` exists for `Iterable` in `IO.concurrent()`, so you can simply do:
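```kotlin
fun <A, B> Iterable<A>.parTraverseN(limit: Long, f: (A) -> IOOf<B>): IO<List<B>> = IO.concurrent().run {
    // Acquire a Semaphore with `limit` permits, then run each f(a) while holding one.
    // `.fix()` turns the Kind<ForIO, List<B>> result back into IO<List<B>>.
    Semaphore(limit).flatMap { s -> parTraverse { a -> s.withPermit(f(a)) } }.fix()
}
```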
This was written from my phone but it should be close 😄
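For comparison, the same semaphore-bounded traversal can be sketched with kotlinx.coroutines instead of Arrow Fx. This is an analogue under stated assumptions, not Arrow’s API: this hypothetical `parTraverseN` takes an `Int` limit and a `suspend (A) -> B`, starts one `async` per element, and lets at most `limit` of them run `f` at the same time.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical kotlinx.coroutines counterpart of the Arrow Fx parTraverseN above.
// Results keep the input order because awaitAll returns them in list order.
suspend fun <A, B> Iterable<A>.parTraverseN(limit: Int, f: suspend (A) -> B): List<B> =
    coroutineScope {
        val semaphore = Semaphore(limit)
        // One coroutine per element; each must hold a permit while running f.
        map { a -> async { semaphore.withPermit { f(a) } } }.awaitAll()
    }
```

This sketch still creates one coroutine per element up front; only the execution of `f` is bounded by the semaphore.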
s
Yes, this is much simpler. Amazing. Also, I don’t have to convert to `ListK` with this.
Awesome. I’ll give it a spin and see whether I still get an OOM with it.
Thank you 🙂
Yes, this example still runs out of heap space, and I don’t understand how 4 GB is not enough. Could you please help me understand? 🙂
```kotlin
fun main() {
    val someList: List<Int> = (1..100000).toList()

    unsafe {
        runBlocking {
            IO.fx {
                someList
                    .parTraverseN(10) { element -> printlnElement(element) }
                    .bind()
            }
        }
    }
}

fun printlnElement(element: Int): IO<Unit> {
    return IO { println(element) }
}
```