Satyam Agarwal
07/29/2019, 4:30 PM
parTraverse
and it works after spending my whole Sunday making it work, but I can't grasp why it works, hence the questions. Can you help me understand it, please? Here is the gist:
https://gist.github.com/satyamagarwal/336f02954fa4dc369fdea6524f593d1a

simon.vergauwen
07/29/2019, 5:05 PM

Satyam Agarwal
07/29/2019, 5:20 PM

Satyam Agarwal
07/29/2019, 11:31 PM

Satyam Agarwal
07/30/2019, 12:33 AM
val result: IO<List<List<Profile>>> =
    users.sortedBy { user -> user.id }
        .chunked(1000) // it's all sync in Dispatchers.IO up to here
        .map(usersLambda) // create the IOs to execute sequentially with the lambda above
        .sequence(IO.applicative()) // List<IO<List<Profile>>> -> IO<List<List<Profile>>>
Gives me Kind<ForIO, Kind<ForListK, List<Profile>>>, and .fix() is not resolving it to IO<List<List<Profile>>>.
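A minimal sketch of the unwrapping implied here, reusing the names from this thread and assuming the Arrow 0.10-era API (sequence over IO.applicative() yields a doubly wrapped Kind, so each layer needs its own fix()); it is the same resolution the next message arrives at:
// Sketch, not verbatim thread code: fix the outer ForIO first,
// then map fix() over the inner ForListK layer.
val nested: Kind<ForIO, Kind<ForListK, List<Profile>>> =
    users.sortedBy { user -> user.id }
        .chunked(1000)
        .map(usersLambda)
        .sequence(IO.applicative())

val unwrapped: IO<ListK<List<Profile>>> =
    nested
        .fix()                        // Kind<ForIO, ...>    -> IO<Kind<ForListK, ...>>
        .map { listK -> listK.fix() } // Kind<ForListK, ...> -> ListK<List<Profile>>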
Satyam Agarwal
07/30/2019, 12:35 AM

Satyam Agarwal
07/30/2019, 1:07 AM
val result: IO<ListK<List<Profile>>> = users
    .sortedBy { user -> user.id }
    .chunked(1000)
    .map(usersLambda)
    .sequence(IO.applicative())
    .map { list -> list.fix() }
    .fix()
But now only the first chunk gets processed, and then it just sits.

pakoito
07/30/2019, 9:00 AM

pakoito
07/30/2019, 9:14 AM

pakoito
07/30/2019, 9:15 AM

Satyam Agarwal
07/30/2019, 9:55 AM
Dispatchers.IO.
I've written the observations after each print statement. Your suspicion seems to be correct (not just for Dispatchers.IO, though). Would you help more 🙂? Thanks a lot.

pakoito
07/30/2019, 10:06 AM
insert
finishes correctly

pakoito
07/30/2019, 10:07 AM

pakoito
07/30/2019, 10:07 AM
unsafeRunAsync

pakoito
07/30/2019, 10:07 AM

pakoito
07/30/2019, 10:09 AM

Satyam Agarwal
07/30/2019, 10:33 AM
.fix().unsafeRunAsync { }
doesn't block. I tried wrapping it in unsafe { runBlocking { .. } }, but that didn't work either.
Inserts work for the first 1000 objects, and so does bracket. The first 1000 connections get closed properly, and then it just sits.

Satyam Agarwal
07/30/2019, 10:37 AM
main
at jdk.internal.misc.Unsafe.park(ZJ)V (Native Method)
at java.util.concurrent.locks.LockSupport.park(Ljava/lang/Object;)V (LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt()Z (AbstractQueuedSynchronizer.java:885)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(I)V (AbstractQueuedSynchronizer.java:1039)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(I)V (AbstractQueuedSynchronizer.java:1345)
at arrow.fx.internal.Platform.unsafeResync(Larrow/fx/IO;Larrow/fx/typeclasses/Duration;)Larrow/core/Option; (Utils.kt:162)
at arrow.fx.IO$Async.unsafeRunTimedTotal$arrow_fx(Larrow/fx/typeclasses/Duration;)Larrow/core/Option; (IO.kt:336)
at arrow.fx.IO.unsafeRunTimed(Larrow/fx/typeclasses/Duration;)Larrow/core/Option; (IO.kt:288)
at arrow.fx.IO.unsafeRunSync()Ljava/lang/Object; (IO.kt:285)
at ImporterApplicationKt.doImport(LImporterApplication;)Ljava/util/List; (ImporterApplication.kt:86)
at ImporterApplicationKt.main([Ljava/lang/String;)V (ImporterApplication.kt:50)
I can't send you a heap dump because of GDPR, unfortunately.

Satyam Agarwal
07/30/2019, 10:54 AM
Dispatchers.IO
with 1000 users and chunks of 10 objects. Then it works very nicely. Almost every chunk gets a different dispatcher thread, and each object's processing also lands on a random dispatcher thread.

simon.vergauwen
07/30/2019, 10:56 AM

simon.vergauwen
07/30/2019, 10:57 AM

Satyam Agarwal
07/30/2019, 10:59 AM
Dispatchers.IO
and just use Arrow's Dispatcher. Problem is, with a huge amount, that chokes as well. 😞

pakoito
07/30/2019, 11:05 AM

pakoito
07/30/2019, 11:05 AM

Satyam Agarwal
07/30/2019, 11:55 AM
IO.fx { … }
with IO.fx { … }.fix().unsafeRunAsync { }. It exits immediately; it's not blocking. Can you help me with how I can achieve it?
2. Could you explain the flags that went off in your head to suspect exactly those problems? I am still missing that part.
And thanks a lot for the instant replies and so much help. Thanks a lot 🙂

simon.vergauwen
07/30/2019, 12:15 PM
unsafeRunSync, or flatMap to compose it sequentially into another program. The difference is that unsafeRunSync internally uses a latch to block the original thread until the result is received, and unsafeRunAsync doesn't. So on a context (thread) switch, unsafeRunAsync will release the original (previous) context and thus the caller thread will continue, while unsafeRunSync keeps blocking the caller's thread using the latch (see the sketch after this message).
2. The flag that triggered in my head was that running a list with n times 1000 chunked ops on an IO pool would start n times 1000 threads. Which ain't good, and I've seen similar behavior because of it. For that reason I implemented another solution called parTraverseN, which limits the concurrent processes using a Semaphore. That would solve the problem here without having to tweak database settings (generic solution). It's not included yet because it was flaky with Observable and Flowable because of concurrent streaming. I can help you with that if you're interested.
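A minimal illustrative sketch of the difference described in point 1 above (not code from the thread; it only assumes an already built IO<Unit> called program and arrow-fx on the classpath):
import arrow.fx.IO

// unsafeRunSync blocks the calling thread (internally via a latch, as described
// above) and only returns once the whole IO has finished.
fun runAndWait(program: IO<Unit>): Unit =
    program.unsafeRunSync()

// unsafeRunAsync just registers a callback and returns immediately, so main()
// can reach its end (and the JVM can exit) before the IO completes, unless
// something else keeps the process alive.
fun runFireAndForget(program: IO<Unit>) {
    program.unsafeRunAsync { result ->
        result.fold({ e -> e.printStackTrace() }, { })
    }
    // control reaches this line right away
}
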
Satyam Agarwal
07/30/2019, 12:19 PM

simon.vergauwen
07/30/2019, 12:20 PM
// Traverse G in parallel on ctx, but allow at most n effects in flight at once
// by guarding each one with a Semaphore permit.
fun <F, G, A, B> Kind<G, A>.parTraverseN(
    TG: Traverse<G>, CF: Concurrent<F>, n: Long,
    ctx: CoroutineContext = CF.dispatchers().default(),
    f: (A) -> Kind<F, B>
): Kind<F, Kind<G, B>> = CF.run {
    TG.run {
        Semaphore(n).flatMap { semaphore ->
            this@parTraverseN.parTraverse(TG, ctx) { a -> semaphore.withPermit(f(a)) }
        }
    }
}
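A hedged usage sketch for the snippet above, applied to the chunked-insert scenario from this thread. The Concurrent<ForIO> instance, the ListK.traverse() instance, and the User/Profile/usersLambda names are assumptions about what is in scope (from the gist and Arrow 0.10-era extensions), not code from the thread:
// Illustrative only: keep at most 10 chunks in flight at a time.
fun importAll(
    CF: Concurrent<ForIO>,                         // e.g. IO's Concurrent instance
    users: List<User>,
    usersLambda: (List<User>) -> IO<List<Profile>> // the per-chunk insert IO from above
): IO<ListK<List<Profile>>> =
    users.sortedBy { it.id }
        .chunked(1000)
        .k()                                       // List -> ListK, so ListK.traverse() applies
        .parTraverseN(ListK.traverse(), CF, 10L) { chunk -> usersLambda(chunk) }
        .fix()                                     // Kind<ForIO, ...>    -> IO<...>
        .map { it.fix() }                          // Kind<ForListK, ...> -> ListK<...>
Here n plays the role of the generic limit described above: at most n chunks are being processed concurrently, regardless of database settings.
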
simon.vergauwen
07/30/2019, 12:23 PM

Satyam Agarwal
07/30/2019, 1:21 PM

simon.vergauwen
07/30/2019, 1:29 PM