Alexander Schell
08/13/2020, 10:20 AM
simon.vergauwen
08/13/2020, 10:34 AM
Semaphore instead to limit the maximum number of parallel executions.
In the Arrow Fx Coroutines library you could also turn an ExecutorService into a Resource<CoroutineContext>, and it also exposes parTraverseN to traverse in parallel while specifying a maximum of n parallel tasks.
https://arrow-kt.io/docs/next/apidocs/arrow-fx-coroutines/arrow.fx.coroutines/kotlin.collections.-iterable/par-traverse-n.html
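A minimal sketch of the bounded-parallelism idea, assuming plain JDK primitives rather than the Arrow Fx Coroutines API. The helper name parTraverseBounded is hypothetical (it is not part of Arrow), and it blocks threads instead of suspending, so it only illustrates how a Semaphore can cap the number of tasks running at once.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not an Arrow API): applies `f` to every element on a
// cached thread pool while a Semaphore allows at most `n` calls to run at once.
fun <A, B> List<A>.parTraverseBounded(n: Int, f: (A) -> B): List<B> {
    val permits = Semaphore(n)
    val executor = Executors.newCachedThreadPool()
    try {
        // Submit everything up front; each task waits for a permit before running `f`.
        val futures = map { a ->
            executor.submit(Callable {
                permits.acquire()
                try {
                    f(a)
                } finally {
                    permits.release()
                }
            })
        }
        // Collect results in the original order.
        return futures.map { it.get() }
    } finally {
        executor.shutdown()
    }
}

fun main() {
    val running = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)
    val result = (1..10).toList().parTraverseBounded(3) { id ->
        val now = running.incrementAndGet()
        maxSeen.updateAndGet { seen -> maxOf(seen, now) }
        Thread.sleep(20) // stand-in for real work
        running.decrementAndGet()
        id * 2
    }
    println(result)
    println("max tasks running at once: ${maxSeen.get()}")
}
```

The observed maximum never exceeds the number of permits, even though the pool itself is unbounded.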
This is all on 0.11.0-SNAPSHOT, which will be released soon, but it's also back-portable to the current Arrow Fx library 🙂
simon.vergauwen
08/13/2020, 10:34 AM
Alexander Schell
08/13/2020, 10:58 AM
Alexander Schell
08/13/2020, 11:06 AM
simon.vergauwen
08/13/2020, 11:57 AM
parMapN is to combine a finite set of operations, whereas sequenceK is probably an infinite one?
simon.vergauwen
08/13/2020, 11:58 AM
Stream in Arrow Fx Coroutines for when you need to work with infinite suspend operations. It also has the ability to easily run things in parallel.
Alexander Schell
08/13/2020, 1:45 PM
Alexander Schell
08/13/2020, 1:48 PM
Bob Glamm
08/13/2020, 1:55 PM
Bracket or Resource
Bob Glamm
08/13/2020, 1:55 PM
@Transactional is probably doomed to failure
simon.vergauwen
08/13/2020, 1:56 PM
Bob Glamm
08/13/2020, 1:57 PM
db4k stuff I wrote up had some of that using purely JDBC, but that was really alpha-quality code. I haven't had a chance to connect Spring transactions to FP yet.
Bob Glamm
08/13/2020, 1:58 PM
simon.vergauwen
08/13/2020, 1:58 PM
raulraja
08/14/2020, 7:30 PM
Bob Glamm
08/14/2020, 8:02 PM
Bob Glamm
08/14/2020, 8:04 PM
Bob Glamm
08/14/2020, 8:04 PM
Bob Glamm
08/14/2020, 8:05 PM
Alexander Schell
08/16/2020, 9:13 AM
simon.vergauwen
08/16/2020, 11:05 AM
parTraverse but under the hood it will go to executor.submit { } to schedule the tasks in parallel, and it'll return the result on the original context in the same way.
Unfortunately I don't see a way to use a thread pool with a limited number of threads during the traverse.
If you have an Executor with a pool of n threads that works with Spring, you could do the following.
val springExecutor = TODO() // placeholder for the executor provided by Spring
val ids = listOf(1, 2, 3, 4, 5)
Resource.fromExecutor(springExecutor)
  .use { ctx ->
    ids.parTraverse(ctx) { id ->
      // `suspend` function
    }
  }
This will make sure that the suspend function is scheduled on the springExecutor and that the executor is shut down when it returns.
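A rough sketch of the acquire/use/release pattern described above, assuming only the JDK and not Arrow's Resource. The helper name withExecutor is hypothetical, and the work is blocking rather than suspend, so this only shows why the executor is guaranteed to be shut down once the block returns or throws.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical helper (not an Arrow API): acquires an executor, hands it to
// `use`, and always shuts it down afterwards, even if `use` throws.
fun <A> withExecutor(acquire: () -> ExecutorService, use: (ExecutorService) -> A): A {
    val executor = acquire()
    try {
        return use(executor)
    } finally {
        executor.shutdown()
    }
}

fun main() {
    var pool: ExecutorService? = null
    val ids = listOf(1, 2, 3, 4, 5)

    val results = withExecutor({ Executors.newFixedThreadPool(2).also { pool = it } }) { executor ->
        // Schedule one task per id on the pool, then wait for all of them.
        ids.map { id -> executor.submit(Callable { "item-$id" }) }
            .map { it.get() }
    }

    println(results)
    println("executor shut down: ${pool?.isShutdown}")
}
```

A pool that is owned by a container such as Spring would not fit this shape, since its lifecycle is managed elsewhere and it should not be shut down by the caller.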
That is the only safe way to provide an Executor -> CoroutineContext primitive. I could help write another one for pools that don't have to be shut down like this but would instead be controlled by, for example, Spring itself.
simon.vergauwen
08/16/2020, 11:07 AM
Alexander Schell
08/16/2020, 11:09 AM
Alexander Schell
08/16/2020, 12:29 PM
fun <X, Y> ((X) -> Y).toCallable(x: X): Callable<Y> =
    Callable { this.invoke(x) }

fun <X, Y> SequenceK<X>.parMapN(n: Int, f: (X) -> Y): SequenceK<Y> {
    val i: Iterator<X> = this.iterator()
    return sequence {
        val executor: ThreadPoolExecutor = Executors.newFixedThreadPool(n) as ThreadPoolExecutor
        while (i.hasNext()) {
            yield(executor.submit(f.toCallable(i.next())))
        }
        executor.shutdown()
    }.map {
        it.get()
    }.k()
}
@simon.vergauwen I'll take a closer look at your suggestion tomorrow. For the moment I just want to get back to my balcony and my cool beer. 😋
Even if I have to leave the air-conditioning in my office.
raulraja
08/16/2020, 1:56 PM
raulraja
08/16/2020, 1:59 PM
Alexander Schell
08/19/2020, 4:30 AM
simon.vergauwen
08/19/2020, 6:03 AM
Sequence?
The snippet you shared doesn't seem to rely on suspend but rather on Executor and Future#get.
simon.vergauwen
08/19/2020, 6:05 AM
Alexander Schell
08/19/2020, 12:34 PM
Alexander Schell
08/20/2020, 8:00 AM
Alexander Ioffe
01/01/2025, 10:52 PM
Sql("$dollar_sign_interpolation") automatically injecting these into JDBC prepared statements. It also supports Kotlin Multiplatform Android, iOS, and Native targets and uses coroutines in an idiomatic way.
(also have a look at the #C07CVTQ6WBG channel)