Satyam Agarwal
09/24/2019, 8:29 AM

Satyam Agarwal
09/24/2019, 8:30 AM

simon.vergauwen
09/24/2019, 8:31 AM
[...] Observable, Flowable & Flux). It’s safe to use with all other types, so I’d keep the impl you have for now.

simon.vergauwen
09/24/2019, 8:31 AM
[...] parTraverse and Semaphore utilities in Arrow Fx.

Satyam Agarwal
09/24/2019, 8:32 AM

Satyam Agarwal
12/16/2019, 6:11 PM

    fun <A, B> Kind<ForListK, A>.parTraverseN(limit: Long, f: (A) -> Kind<ForIO, B>): IO<ListK<B>> {
        val tg: Traverse<ForListK> = ListK.traverse()
        val cf: Concurrent<ForIO> = IO.concurrent()
        val ctx: CoroutineContext = IODispatchers.CommonPool
        return cf
            .run {
                tg.run {
                    Semaphore(limit, cf).flatMap { semaphore ->
                        this@parTraverseN.parTraverse(ctx, tg) { a ->
                            semaphore.withPermit(f(a))
                        }
                    }
                }
            }
            .map { list -> list.fix() }
            .fix()
    }

Can I write it like this now? This also works, but I am not sure the two are equivalent:

    fun <A, B> Kind<ForListK, A>.parTraverseN(limit: Long, f: (A) -> Kind<ForIO, B>): IO<ListK<B>> {
        val tg: Traverse<ForListK> = ListK.traverse()
        val cf: Concurrent<ForIO> = IO.concurrent()
        val ctx: CoroutineContext = IODispatchers.CommonPool
        val semaphore: Kind<ForIO, Semaphore<ForIO>> = Semaphore.invoke(limit, cf)
        return semaphore
            .flatMap { s ->
                this@parTraverseN.parTraverse(ctx, tg) { a ->
                    LOG.info("running element....")
                    s.withPermit(f(a))
                }
            }
            .map { list -> list.fix() }
    }

Also, when I do this:

    fun main() {
        val someList: ListK<Int> = (1..100000).toList().k()
        unsafe {
            runBlocking {
                IO.fx {
                    someList
                        .parTraverseN(10) { element -> printlnElement(element) }
                        .bind()
                }
            }
        }
    }

    fun printlnElement(element: Int): IO<Unit> {
        return IO { println(element) }
    }

It runs out of memory with both implementations, even though I am giving it 4G of heap space.
Also, if I put a log line before s.withPermit(f(a)), for example one saying "running element....", then "running element...." prints for all the elements first, and only after that do the actual elements print.
That seems expected, since this@parTraverseN.parTraverse(ctx, tg) { ... } runs the block for each element sequentially, and then each resulting effect is executed with the semaphore on the CoroutineContext. So this behaviour is expected. Is that true?

simon.vergauwen
12/16/2019, 6:16 PM
[...] acquired permit from the Semaphore, you can do so by chaining the log and the task into a single effect before passing it to the semaphore.

    this@parTraverseN.parTraverse(ctx, tg) { a ->
        LOG.info("running element....")
        s.withPermit(f(a))
    }

    s.withPermit(IO.effect { LOG.info("running element....") }.followedBy(f(a)))

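As a rough illustration of the difference between those two forms, here is a minimal sketch that uses plain Kotlin lambdas instead of Arrow's IO. Everything in it is an assumption made for the example: `Task`, the `withPermit` extension on `java.util.concurrent.Semaphore`, `eagerLog`, and `deferredLog` are hypothetical stand-ins, not Arrow Fx APIs, and the tasks run one after another on a single thread so that only the ordering is shown, not the parallelism.

```kotlin
import java.util.concurrent.Semaphore

// Hypothetical stand-in for an IO value: a deferred computation that runs only when invoked.
typealias Task<A> = () -> A

// Hypothetical stand-in for Semaphore.withPermit: the returned task holds a permit while it runs.
fun <A> Semaphore.withPermit(task: Task<A>): Task<A> = {
    acquire()
    try {
        task()
    } finally {
        release()
    }
}

// The log call sits in the traverse lambda, so it runs while the tasks are being built.
fun eagerLog(items: List<Int>): List<String> {
    val events = mutableListOf<String>()
    val semaphore = Semaphore(2)
    val tasks = items.map { a ->
        events.add("log $a")
        semaphore.withPermit { events.add("run $a") }
    }
    tasks.forEach { it() }
    return events
}

// The log call is part of the task itself, so it runs only once a permit is held.
fun deferredLog(items: List<Int>): List<String> {
    val events = mutableListOf<String>()
    val semaphore = Semaphore(2)
    val tasks = items.map { a ->
        semaphore.withPermit {
            events.add("log $a")
            events.add("run $a")
        }
    }
    tasks.forEach { it() }
    return events
}

fun main() {
    println(eagerLog(listOf(1, 2, 3)))    // [log 1, log 2, log 3, run 1, run 2, run 3]
    println(deferredLog(listOf(1, 2, 3))) // [log 1, run 1, log 2, run 2, log 3, run 3]
}
```

In this sketch, `deferredLog` plays the role of the `IO.effect { ... }.followedBy(f(a))` form: the log becomes part of the work that is guarded by the permit.
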
Now you should see the logs running in batches.

simon.vergauwen
12/16/2019, 6:17 PM
[...] LOG calls first, because those get executed before the semantic blocking of trying to acquire a permit starts.

simon.vergauwen
12/16/2019, 6:18 PM

Satyam Agarwal
12/16/2019, 7:00 PM

    fun <A, B> Kind<ForListK, A>.parTraverseN(limit: Long, f: (A) -> Kind<ForIO, B>): IO<ListK<B>> {
        val tg: Traverse<ForListK> = ListK.traverse()
        val cf: Concurrent<ForIO> = IO.concurrent()
        val ctx: CoroutineContext = IODispatchers.CommonPool
        val semaphore: Kind<ForIO, Semaphore<ForIO>> = Semaphore.invoke(limit, cf)
        return semaphore
            .flatMap { s ->
                this@parTraverseN.parTraverse(ctx, tg) { a ->
                    s.withPermit(f(a))
                }
            }
            .map { list -> list.fix() }
    }

Also, can you help with the 100000-element example I wrote above? Why does it run out of heap space?

simon.vergauwen
12/16/2019, 7:01 PM
[...] parTraverse exists for Iterable in IO.concurrent()

simon.vergauwen
12/16/2019, 7:04 PM

    fun <A, B> Iterable<A>.parTraverseN(limit: Long, f: (A) -> IOOf<B>): IO<List<B>> = IO.concurrent().run {
        Semaphore(limit).flatMap { s -> parTraverse { a -> s.withPermit(f(a)) } }
    }
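
The idea behind that one-liner, a parallel traversal in which a semaphore caps how many elements are processed at once, can be sketched without Arrow using only the JDK. This is an illustration under stated assumptions, not the Arrow implementation: `boundedParallelMap` and `maxObservedConcurrency` are hypothetical names, threads replace IO fibers, and the pool is deliberately sized larger than `limit` so that the semaphore, not the pool, is what bounds concurrency.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: applies f to every element on a thread pool,
// letting at most `limit` calls to f run at the same time.
fun <A, B> Iterable<A>.boundedParallelMap(limit: Int, f: (A) -> B): List<B> {
    val semaphore = Semaphore(limit)
    // Assumption for the sketch: twice as many threads as permits.
    val pool = Executors.newFixedThreadPool(limit * 2)
    try {
        val futures = map { a ->
            pool.submit(Callable<B> {
                semaphore.acquire()
                try {
                    f(a)
                } finally {
                    semaphore.release()
                }
            })
        }
        // Futures are collected in input order, so results keep that order.
        return futures.map { it.get() }
    } finally {
        pool.shutdown()
    }
}

// Hypothetical probe: reports the highest number of calls seen running at once.
fun maxObservedConcurrency(limit: Int, items: Int): Int {
    val running = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)
    (1..items).boundedParallelMap(limit) {
        val now = running.incrementAndGet()
        maxSeen.updateAndGet { seen -> maxOf(seen, now) }
        Thread.sleep(5)
        running.decrementAndGet()
    }
    return maxSeen.get()
}

fun main() {
    println(listOf(1, 2, 3, 4).boundedParallelMap(limit = 2) { it * 2 }) // [2, 4, 6, 8]
    println(maxObservedConcurrency(limit = 3, items = 20) <= 3)          // true
}
```

Note that this sketch still creates one queued job and one future per element up front; only the execution of `f` is limited by the permits.
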
simon.vergauwen
12/16/2019, 7:04 PM

Satyam Agarwal
12/16/2019, 7:09 PM

Satyam Agarwal
12/16/2019, 7:10 PM

Satyam Agarwal
12/16/2019, 7:10 PM

Satyam Agarwal
12/16/2019, 7:44 PM

    fun main() {
        val someList: List<Int> = (1..100000).toList()
        unsafe {
            runBlocking {
                IO.fx {
                    someList
                        .parTraverseN(10) { element -> printlnElement(element) }
                        .bind()
                }
            }
        }
    }

    fun printlnElement(element: Int): IO<Unit> {
        return IO { println(element) }
    }