# arrow
a
Moin Moin! 🙂 Sorry, but here is the next dumb question: Thanks to @Patrick Louis I now know how to traverse a SequenceK in parallel with parTraverse. Unfortunately I don't see a way to use a thread pool with a limited number of threads during the traverse. Also I'm not sure whether parTraverse is the best way to go: https://arrow-kt.io/docs/0.10/apidocs/arrow-fx/arrow.fx.typeclasses/-concurrent/par-traverse.html Could somebody please point out what I'm missing here? Thanks! Best regards, Alex
s
Hey, it might be better to use `Semaphore` instead to limit the maximum number of parallel executions. In the Arrow Fx Coroutines library you can also turn an `ExecutorService` into a `Resource<CoroutineContext>`, and it also exposes `parTraverseN` to traverse in parallel while specifying a maximum of `n` parallel tasks. https://arrow-kt.io/docs/next/apidocs/arrow-fx-coroutines/arrow.fx.coroutines/kotlin.collections.-iterable/par-traverse-n.html
This is all on `0.11.0-SNAPSHOT`, which will be released soon, but it's also back-portable to the current Arrow Fx library 🙂
Although it doesn't contain those methods out of the box.
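For illustration, a minimal sketch of the Semaphore approach using plain `kotlinx.coroutines` (the name `parTraverseLimited` is made up for this example, not an Arrow API):

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Run f over every element, allowing at most n of them to execute concurrently.
suspend fun <A, B> Iterable<A>.parTraverseLimited(n: Int, f: suspend (A) -> B): List<B> =
    coroutineScope {
        val semaphore = Semaphore(n)                 // n permits = at most n tasks in flight
        map { a ->
            async { semaphore.withPermit { f(a) } }  // each task takes a permit before running
        }.awaitAll()
    }
```

Each task acquires a permit before running, so at most `n` of the mapped operations execute at the same time regardless of the dispatcher's size.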
a
Hello Simon, thank you for your answer. There is another thing that puzzles me: as far as I've read, parTraverse uses a fold operation. In my case the sequence is built from a seed function on a paged result set. I'm afraid that this fold operation will collect all the data, thus rendering the pagination useless, but I need the pagination to minimize the memory footprint. Best regards, Alex
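For context, a lazily built paged sequence of that shape might look roughly like this (`Page` and `fetchPage` are hypothetical stand-ins for the real paged query):

```kotlin
// Hypothetical page type and fetch function standing in for the actual paged result set.
data class Page<A>(val items: List<A>, val nextPage: Int?)

// Build a lazy Sequence from a seed function so only one page is held in memory at a time.
fun <A> pagedSequence(fetchPage: (Int) -> Page<A>): Sequence<A> =
    generateSequence(seedFunction = { fetchPage(0) }) { page ->
        page.nextPage?.let(fetchPage)        // keep fetching until there is no next page
    }.flatMap { it.items.asSequence() }      // flatten pages into a stream of items
```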
Mhm. Mhm mhm mhm. 🤔 Thinking about it, I guess I would need something like a parMapN...
s
`parMapN` is for combining a finite set of operations, whereas a `SequenceK` is probably an infinite one?
There is also `Stream` in Arrow Fx Coroutines for when you need to work with infinite `suspend` operations. It also has the ability to easily run things in parallel.
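The Arrow `Stream` API itself isn't shown in this thread; as a rough illustration of the same idea, bounded parallelism over a lazily produced stream, here is a sketch using `kotlinx.coroutines` `Flow` instead (an analogous tool, not the API mentioned above):

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow

// A lazily produced, potentially unbounded stream of work items.
fun pageNumbers(): Flow<Int> = flow {
    var page = 0
    while (true) emit(page++)   // in practice, stop when the paged result set is exhausted
}

// Run f over the elements with at most n of them in flight at once.
// Note: flatMapMerge does not preserve the original element order.
@OptIn(FlowPreview::class)
fun <A, B> Flow<A>.mapBounded(n: Int, f: suspend (A) -> B): Flow<B> =
    flatMapMerge(concurrency = n) { a -> flow { emit(f(a)) } }
```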
a
Yes, exactly: It would be a map but parallel in execution and limited to a set number of threads.
Uh-oh. I don't know how to deal with transactions when using coroutines. As far as I know, Spring stores transaction information thread-locally. I have to check this... But first I'll take a look at Stream in Fx Coroutines. 🙂 Thanks!
b
I'd recommend using Spring's programmatic transaction management with something like a `Bracket` or `Resource`.
`@Transactional` is probably doomed to failure.
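For illustration, a minimal bracket-style wrapper around Spring's programmatic transaction API might look like this (a blocking sketch using plain Spring types, not Arrow's `Bracket` itself; `inTransaction` is a made-up helper name):

```kotlin
import org.springframework.transaction.PlatformTransactionManager
import org.springframework.transaction.TransactionStatus
import org.springframework.transaction.support.DefaultTransactionDefinition

// Acquire a transaction, run the block, commit on success, roll back on failure.
fun <A> PlatformTransactionManager.inTransaction(block: (TransactionStatus) -> A): A {
    val status = getTransaction(DefaultTransactionDefinition())
    return try {
        val result = block(status)
        commit(status)
        result
    } catch (t: Throwable) {
        rollback(status)
        throw t
    }
}
```

The acquire/use/release shape here is the same one `Bracket` and `Resource` encode, which is why they map naturally onto begin/commit/rollback.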
s
@Bob Glamm do you have any examples? I'd love to see them. I remember we discussed a gist some time ago.
b
The `db4k` stuff I wrote up had some of that using pure JDBC, but that was really alpha-quality code. I haven't had a chance to connect Spring transactions to FP yet.
I've been more interested in some of the concepts made available via API with doobie/skunk, but I haven't had a chance to chase much of that down, either 😞
s
Yes, that'd be amazing!
r
@Bob Glamm I'm not 100% sure, but the issue with Spring transactions is that they are pinned to the thread, so they can't really survive async boundaries, right? I believe when Loom is out this may change, since the blocking JDBC model could be adjusted to allow other, more sophisticated policies. If you find a way, let me know; this is something I tried years ago and it didn't work out as I wanted. Doobie currently seems to be the closest thing to a pure implementation for talking to JDBC in Scala. Is there currently an approach in Spring to mix suspend functions and `@Transactional`?
b
Yes, you are right, even Spring's programmatic transaction management still binds transactions to threads.
I think `Connection` has to be a `Resource`, and `Bracket` could be used to obtain and close a transaction on a connection.
None of the Spring facilities will work in a pure FP manner.
Well, outside of stuffing a call to a complete `@Transactional` block into `IO`.
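As a sketch of that idea in plain JDBC (no Arrow types, just the acquire/use/release shape; `transact` is a made-up helper):

```kotlin
import java.sql.Connection
import javax.sql.DataSource

// Bracket-style use of a JDBC Connection: open, begin a transaction,
// commit on success, roll back on failure, and always close the connection.
fun <A> DataSource.transact(block: (Connection) -> A): A {
    val conn = connection
    return try {
        conn.autoCommit = false
        val result = block(conn)
        conn.commit()
        result
    } catch (t: Throwable) {
        conn.rollback()
        throw t
    } finally {
        conn.close()
    }
}
```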
a
@Bob Glamm Yes, that's what I think also. The transactions are always thread-local. I'm afraid this will cause me some trouble...
s
So the limitation is that it cannot be parallelised since it's thread-local? If that's the case, then you cannot run that code with `parTraverse`, because under the hood it goes to `executor.submit { }` to schedule the tasks in parallel, and it'll return the result on the original context in the same way.
> Unfortunately I don't see a way to use a thread pool with a limited number of threads during the traverse.

If you have an `Executor` with a pool of `n` threads that works with Spring, you could do the following:
```kotlin
val springExecutor = TODO() // the Spring-managed thread pool
val ids = listOf(1, 2, 3, 4, 5)

Resource.fromExecutor(springExecutor)
  .use { ctx ->
    ids.parTraverse(ctx) { id ->
      // `suspend` function
    }
  }
```
This will make sure that the `suspend` function is scheduled on the `springExecutor` and that the executor is `shutdown` when it returns. That is the only safe way to provide an `Executor` -> `CoroutineContext` primitive. I could help write another one for pools that don't have to be shut down like this, but instead would be controlled by, for example, Spring itself.
I'm not familiar with these areas of Spring, but it'd be amazing to figure out some cool patterns like these 🙂
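For a pool whose lifecycle is owned elsewhere, one possible shape (a sketch using the standard `kotlinx.coroutines` conversion, not an Arrow API; `springExecutor` is a placeholder for the externally managed pool) is to view the executor as a dispatcher without ever shutting it down here:

```kotlin
import java.util.concurrent.Executor
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.withContext

// View an externally managed pool as a CoroutineContext; its lifecycle stays with Spring,
// so nothing here ever calls shutdown().
suspend fun <A> onExternalPool(springExecutor: Executor, block: suspend () -> A): A {
    val ctx: CoroutineContext = springExecutor.asCoroutineDispatcher()
    return withContext(ctx) { block() }
}
```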
a
Oi, this looks good! Thanks! 🙂 By the way: we ran into a similar problem with work-stealing threads in pure Java.
I tinkered around a little bit and came up with this:
```kotlin
import arrow.core.SequenceK
import arrow.core.k
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Future

fun <X, Y> ((X) -> Y).toCallable(x: X): Callable<Y> =
    Callable { this.invoke(x) }

fun <X, Y> SequenceK<X>.parMapN(n: Int, f: (X) -> Y): SequenceK<Y> {
    val i: Iterator<X> = this.iterator()
    return sequence {
        val executor = Executors.newFixedThreadPool(n)
        while (i.hasNext()) {
            // Submit up to n tasks before yielding any Future; otherwise the lazy
            // sequence would block on get() after every single submit.
            val batch = mutableListOf<Future<Y>>()
            while (i.hasNext() && batch.size < n) {
                batch.add(executor.submit(f.toCallable(i.next())))
            }
            yieldAll(batch)
        }
        executor.shutdown()
    }.map { it.get() }.k()
}
```
@simon.vergauwen I'll take a closer look at your suggestion tomorrow. For the moment I just want to get back to my balcony and my cool beer. 😋 Even if I have to leave the air-conditioning in my office.
r
The issue comes down to the fact that async transactions, if they are distributed, need a pattern like Saga or a transactor service.
My experience with microservices is that they are not that useful, and it's better to have libraries that can run distributed or in-process, stateless.
a
I have to admit that I'm stuck. @simon.vergauwen's suggestion looked good but I'm too dumb to implement it... 🙁
s
Is the snippet I shared not working? Or is it because you're using it over `Sequence`? The snippet you shared doesn't seem to rely on `suspend`, but rather on `Executor` and `Future#get`.
Don't worry, concurrency is very hard! I've run into the wall a thousand times myself!
a
Sniffle 😢 Okay, I'll take another try at this...
Moin @simon.vergauwen 🙂 I came up with an idea but I don't know if it's a sound one: I was thinking about putting the data I want to process in parallel into some kind of execution context that also holds the executor service providing the thread pool. Then I could use this execution context as a resource, I thought... But I'm really not sure if this will work out. Best regards, Alex
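A minimal sketch of that idea in plain Kotlin (the `BatchContext` name and shape are made up for illustration; Arrow's `Resource` could play the role that `use` plays here):

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical "execution context" bundling the work items with the pool that runs them.
class BatchContext<A>(val items: List<A>, val pool: ExecutorService) : AutoCloseable {
    override fun close() = pool.shutdown()
}

fun <A> batchContext(items: List<A>, threads: Int): BatchContext<A> =
    BatchContext(items, Executors.newFixedThreadPool(threads))

fun main() {
    // `use` guarantees the pool is shut down, resource-style, even on failure.
    batchContext(listOf(1, 2, 3), threads = 2).use { ctx ->
        val futures = ctx.items.map { i -> ctx.pool.submit<Int> { i * 2 } }
        println(futures.map { it.get() })
    }
}
```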
a
P.S. I built Terpal-SQL, which is effectively a doobie for Kotlin, earlier last year. You can find it here: https://github.com/ExoQuery/terpal-sql It supports SQL-injection-safe `Sql("$dollar_sign_interpolation")`, automatically injecting the interpolated values into JDBC prepared statements. It also supports Kotlin Multiplatform (Android, iOS, and Native targets) and uses coroutines in an idiomatic way. (Also have a look at the #C07CVTQ6WBG channel.)