# arrow
s
Hey guys. I am here again with another gist of around 50 lines, and along with it 7 questions. Again, I’m sorry to do it like this, but I thought: 1. I don’t want to spam the channel, and 2. a gist can be found more easily than by scrolling through a Slack conversation. I have tried `fx`’s `parTraverse` and it works, after spending my whole Sunday making it work, but I can’t grasp why it works. Thus the questions. Can you help me understand it, please? Here is the gist: https://gist.github.com/satyamagarwal/336f02954fa4dc369fdea6524f593d1a
s
I gave it a quick review. Please feel free to ask follow-up questions if my answers aren’t clear 🙂
s
@simon.vergauwen @pakoito just want to tell you guys, you are amazing. Thanks a lot for making time.
I tried implementing your suggestions. I wasn’t able to make it work, as I am clearly missing parts. I’ve commented my implementation along with 4 questions as a comment in the same gist. Could you please review and help me 🙂? https://gist.github.com/satyamagarwal/336f02954fa4dc369fdea6524f593d1a#gistcomment-2984364 Thank you so much. 🙂
Hey Paco. Thanks a lot for the illustration and the explanation. Please bear with me
val result: IO<List<List<Profile>>> =
    users.sortedBy { user -> user.id }
        .chunked(1000) // it's all sync in Dispatchers.IO up to here
        .map(usersLambda) // create the IOs to execute sequentially with the lambda above
        .sequence(IO.applicative()) // List<IO<List<Profile>>> -> IO<List<List<Profile>>>
Gives me `Kind<ForIO, Kind<ForListK, List<Profile>>>`, and `.fix()` is not resolving it to `IO<List<List<Profile>>>`.
Apologies, I haven’t quite understood how to work with such operations yet.
ok. I was able to fix it by doing
val result: IO<ListK<List<Profile>>> = users
                .sortedBy { user -> user.id }
                .chunked(1000)
                .map(usersLambda)
                .sequence(IO.applicative())
                .map { list -> list.fix() }
                .fix()
But now only the first chunk gets processed, and then it just sits.
p
pause the debugger and check what happens
or add a couple of print statements
I suspect Dispatchers.IO isn’t giving you 2 threads to work on, so it deadlocks
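For illustration (my sketch, not from the thread): the kind of pool-starvation deadlock Paco is suspecting can be reproduced with a plain single-thread executor standing in for an exhausted dispatcher. The outer task blocks waiting on an inner task submitted to the same pool, and the inner task can never start because the only thread is busy running the outer one.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

fun main() {
    val pool = Executors.newFixedThreadPool(1) // one thread, like a starved dispatcher

    val outer = pool.submit<String> {
        // This inner task needs a pool thread, but the only one is busy running us.
        val inner = pool.submit<String> { "inner done" }
        inner.get() // blocks forever: classic pool starvation
    }

    // Time out instead of hanging the demo; with a 2-thread pool this finishes instantly.
    try {
        outer.get(1, TimeUnit.SECONDS)
        println("finished")
    } catch (e: TimeoutException) {
        println("deadlocked: no free thread for the inner task")
    } finally {
        pool.shutdownNow()
    }
}
```

Swap `newFixedThreadPool(1)` for `newFixedThreadPool(2)` and the same code prints `finished`, which is why "it works with small chunks" is a telltale symptom.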
s
Debugger didn’t give me much, but print statements did. I tried both keeping and removing `Dispatchers.IO`. I’ve written the observations after each print statement. Your suspicion seems to be correct (not just for Dispatchers.IO though). Would you help more 🙂? Thanks a lot.
p
check that `insert` finishes correctly
also
change it to `unsafeRunAsync`
I suspect it can’t come back to the original thread because you’re blocking it
just to test it at least
s
`.fix().unsafeRunAsync { }` doesn’t block. I tried wrapping it in `unsafe { runBlocking { .. } }`, and that didn’t work either. Inserts work for the first 1000 objects, and so does bracket. The first 1000 connections get closed properly, and then it just sits.
I took the heap dump and analyzed it. It shows the stack trace for the leak suspect as:
main
  at jdk.internal.misc.Unsafe.park(ZJ)V (Native Method)
  at java.util.concurrent.locks.LockSupport.park(Ljava/lang/Object;)V (LockSupport.java:194)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt()Z (AbstractQueuedSynchronizer.java:885)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(I)V (AbstractQueuedSynchronizer.java:1039)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(I)V (AbstractQueuedSynchronizer.java:1345)
  at arrow.fx.internal.Platform.unsafeResync(Larrow/fx/IO;Larrow/fx/typeclasses/Duration;)Larrow/core/Option; (Utils.kt:162)
  at arrow.fx.IO$Async.unsafeRunTimedTotal$arrow_fx(Larrow/fx/typeclasses/Duration;)Larrow/core/Option; (IO.kt:336)
  at arrow.fx.IO.unsafeRunTimed(Larrow/fx/typeclasses/Duration;)Larrow/core/Option; (IO.kt:288)
  at arrow.fx.IO.unsafeRunSync()Ljava/lang/Object; (IO.kt:285)
  at ImporterApplicationKt.doImport(LImporterApplication;)Ljava/util/List; (ImporterApplication.kt:86)
  at ImporterApplicationKt.main([Ljava/lang/String;)V (ImporterApplication.kt:50)
I can’t send you heap dump because of GDPR unfortunately.
I also did a run with `Dispatchers.IO` with 1000 users and a chunk size of 10. Then it works very nicely. Almost all the chunks get different dispatcher threads, and each object’s processing also gets a random dispatcher thread.
s
Hmm. Would controlling the amount of parallel processes, or providing a context with a limited amount of concurrent processes, be a solution?
I know kotlinx dispatchers contain kotlinx-specific logic, so I tend to avoid them outside of kotlinx.
s
Definitely, I can live with that, but then I’d rather ditch `Dispatchers.IO` and just use Arrow’s Dispatcher. Problem is, with a huge amount, that chokes as well. 😞
p
problem is on the dispatcher then, it's not giving you the parallelism you want
use Arrow's, we have a bounded thread pool by default
s
I did a few more tests. The chunk size of 1000 was choking on both Arrow’s and Kotlin’s IO dispatchers. I changed it to 100 and tweaked my database’s max pool connections. Now it runs on both Arrow’s and Kotlin’s IO dispatchers, and it inserts 100,000 objects in 40 seconds. Amazing stuff you have built, guys 🙂 Last two questions: 1. I am still not able to run my `IO.fx { … }` with `IO.fx { … }.fix().unsafeRunAsync { }`. It exits immediately; it’s not blocking. Can you help me with how I can achieve it? 2. Could you explain the flags that went off in your head to suspect exactly these problems? I am still missing that part. And thanks a lot for the instant replies and so much help 🙂
s
1. If it needs to be blocking, then you can only use `unsafeRunSync`, or `flatMap` to compose it sequentially into another program. The difference is that `unsafeRunSync` internally uses a latch to block the original thread until the result is received, and `unsafeRunAsync` doesn’t. So on a context (thread) switch, `unsafeRunAsync` will release the original (previous) context and thus the caller thread will continue, while `unsafeRunSync` keeps blocking the caller’s thread using the latch.
2. The flag triggered in my head was that running a list with n times 1000 chunked ops on an `IO` pool would start n times 1000 threads, which isn’t good, and I’ve seen similar behavior because of it. For that reason I implemented another solution called `parTraverseN`, which limits the concurrent processes using a `Semaphore`. That would solve the problem here without having to tweak database settings (a generic solution). It’s not included yet because it was flaky with `Observable` and `Flowable` because of concurrent streaming. I can help you with that if you’re interested.
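A hedged sketch (plain JDK, no Arrow) of the latch mechanism Simon describes: the "sync" runner parks the caller on a `CountDownLatch` until the asynchronous work calls back, while the "async" runner just registers the callback and returns immediately. The names `runSync`/`runAsync` are mine, standing in for `unsafeRunSync`/`unsafeRunAsync`.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference

val worker = Executors.newSingleThreadExecutor()

// Async runner: hand the callback over and return at once (like unsafeRunAsync).
fun runAsync(task: () -> Int, cb: (Int) -> Unit) {
    worker.submit { cb(task()) }
}

// Sync runner: built on the async one, plus a latch that parks the caller
// until the callback fires (like unsafeRunSync / Platform.unsafeResync).
fun runSync(task: () -> Int): Int {
    val latch = CountDownLatch(1)
    val result = AtomicReference<Int>()
    runAsync(task) { value ->
        result.set(value)
        latch.countDown() // release the parked caller
    }
    latch.await() // caller thread parks here, as in the heap-dump stack trace
    return result.get()
}

fun main() {
    runAsync({ 40 + 2 }) { println("async result: $it") } // returns immediately
    println("sync result: ${runSync { 40 + 2 }}") // blocks until 42 is ready
    worker.shutdown()
}
```

This is also why a `main` that ends with only `unsafeRunAsync { }` can exit immediately: nothing parks the caller thread, so `main` returns before the work finishes.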
s
ok. Yes please. I’d love that. I actually have a few other use cases in my project which don’t deal with the database but rather with network requests, where each HTTP connection takes milliseconds rather than microseconds, unlike the database. So this would certainly help me a lot.
s
fun <F, G, A, B> Kind<G, A>.parTraverseN(
    TG: Traverse<G>,
    CF: Concurrent<F>,
    n: Long,
    ctx: CoroutineContext = CF.dispatchers().default(),
    f: (A) -> Kind<F, B>
): Kind<F, Kind<G, B>> = CF.run {
    TG.run {
        // Acquire a permit around each effect so at most n run concurrently
        Semaphore(n).flatMap { semaphore ->
            this@parTraverseN.parTraverse(TG, ctx) { a ->
                semaphore.withPermit(f(a))
            }
        }
    }
}
Excuse the formatting 😜
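For intuition (my sketch, not Arrow code): the same `withPermit` idea expressed with plain `java.util.concurrent` pieces. A `Semaphore` with n permits caps how many of the submitted tasks run at once, regardless of how many threads the pool has, which is exactly what protects the database pool here.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

fun main() {
    val pool = Executors.newFixedThreadPool(8)
    val permits = Semaphore(2) // at most 2 tasks in flight, like parTraverseN with n = 2
    val running = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)

    val futures = (1..20).map {
        pool.submit {
            permits.acquire() // withPermit: acquire before the effect...
            try {
                val now = running.incrementAndGet()
                maxSeen.accumulateAndGet(now, ::maxOf)
                Thread.sleep(20) // simulate the chunk insert
                running.decrementAndGet()
            } finally {
                permits.release() // ...and release even on failure
            }
        }
    }
    futures.forEach { it.get() }
    println("max concurrent tasks: ${maxSeen.get()}") // never exceeds 2
    pool.shutdown()
}
```

The `acquire`/`release` pair in `try`/`finally` is the moral equivalent of `withPermit`, which brackets the effect so permits are released even when it fails.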
s
Hey Simon. Here is my attempt. It of course doesn’t work, because obviously I need to do quite a bit of reading to understand what you did and how it works. But as is, it seems to run, yet never inserts into the database. Nor do I see fork-join pool threads.
s
I can look into it later for you 🙂