# coroutines
v
What’s the easiest way of achieving parallelization for something like
myList.map { myCpuIntensiveTask(it) }
?
e
myList.parallelStream().map { myCpuIntensiveTask(it) }.collect(Collectors.toList())
But we plan to provide a simple, declarative, and expressive API to do something like that with coroutines in the future, too.
👍 1
v
Cool. Are the plans in some issue tracker? I’d love to follow the discussion
p
@elizarov will that be the future design of kx.coroutines, more declarative APIs like the ones in other libs? Less imperative-style code
I’m happy if that’s the case 😄
@pakoito We'd like to keep it as Kotlin in style as possible and are using the design of Kotlin sequences as an inspiration a lot.
p
so basing it off iterables with parallel properties, like in Java 8. Neat!
e
No. Completely unlike Java 8. Much neater.
👍 1
Java 8 stream is hot and single use, which gives you less-than-desired developer ergonomics.
☝🏼 1
We'll be basing it on cold flows, which gives a number of additional goodies on top of what streams can provide
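To make "cold" concrete, here is a minimal sketch using the Flow API as it later shipped in kotlinx.coroutines. The names `numbers`, `starts`, and `collectTwice` are hypothetical and exist only for illustration. The builder block does not run until a terminal operator such as `toList()` is called, and it runs again for every collection, so the same flow value can be reused.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter: records how many times the builder block has started.
val starts = AtomicInteger(0)

// Hypothetical cold source: nothing inside flow { } executes until collection.
fun numbers(): Flow<Int> = flow {
    starts.incrementAndGet()
    for (i in 1..3) emit(i)
}

suspend fun collectTwice(): Pair<List<Int>, List<Int>> {
    val doubled = numbers().map { it * 2 } // only describes the pipeline
    val first = doubled.toList()           // first collection runs the builder
    val second = doubled.toList()          // second collection runs it again
    return first to second
}
```

By contrast, calling a second terminal operation on the same `java.util.stream.Stream` throws `IllegalStateException`, which is the "single use" limitation mentioned above.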
v
What is the timeline for this?
e
Flow ~= asynchronous Sequence, so that's why we use sequences API as an inspiration a lot
Months away. We are focused on core flow API at the moment (no parallelism)
👍 2
p
hot (hate that terminology, it’s eager and lazy) and single use is what deferred and job are by default anyway
which is why we had issues implementing several patterns for safe resource management. Happy that the library is evolving away from that
multi-shot continuations next? 😄 we found an encoding that doesn’t need the reflection tricks for suspended effects, but it’d be nice to prevent the extra allocations
e
multi-shot continuations will need a separation of immutable and mutable state at the language level, which is an extremely hard thing to design properly in a language that supports mutable state (as in "there is no existing science that gives a clue to a pragmatic answer"), so no, they are not coming in the near future.
😱 2
😢 2
v
Is this a decent approach?
myList.map {
    async { myCpuIntensiveTask(it) }
}.map {
    it.await()
}
e
.map { it.await() } == .awaitAll(). Otherwise, it is OK, but there is no limit on concurrency in this solution, with potentially disastrous consequences at scale.
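A small sketch of the two spellings side by side, assuming a trivial squaring task in place of `myCpuIntensiveTask`. The function names `squaresWithAwait` and `squaresWithAwaitAll` are hypothetical. When every task succeeds, both return the same list in the same order.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Awaits each Deferred one by one, in list order.
suspend fun squaresWithAwait(xs: List<Int>): List<Int> = coroutineScope {
    xs.map { async { it * it } }.map { it.await() }
}

// Awaits the whole collection with a single call.
suspend fun squaresWithAwaitAll(xs: List<Int>): List<Int> = coroutineScope {
    xs.map { async { it * it } }.awaitAll()
}
```

One nuance from the kotlinx.coroutines documentation: `awaitAll()` fails as soon as any deferred fails, while the sequential form only notices a failure when it reaches that deferred. Neither form limits how many coroutines are started.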
v
What are the potentially disastrous consequences?
e
In case of "cpu intensive task" -- Running out of memory
v
This should be parallel given that I’m using Dispatchers.Default. It is creating more coroutines than CPU cores, but they are still multiplexed onto a number of threads equal to the number of CPU cores, right?
e
Yes.
v
Ah, but do you mean I risk running out of memory because I’m creating lots of coroutines, or do you mean there is a risk even without coroutines?
e
Yes. You are creating tons of coroutines upfront.
v
So a better implementation would distribute the task across fewer coroutines? And the specific memory management might depend on the type of task (eg. a few heavy ones or billions of light ones). Is that the core point?
e
Yes. A better implementation would limit concurrency (the number of created coroutines), so for a CPU-intensive task you could set it to the number of CPU cores, but for other tasks (like network-intensive ones) you can use other considerations (such as the scalability of the downstream services you are touching).
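One way to sketch that idea, under stated assumptions: start a fixed number of worker coroutines that pull the next index from a shared counter, instead of one coroutine per element. The name `boundedParallelMap` and the `workers` parameter are hypothetical, not part of kotlinx.coroutines, and this is only one of several possible designs (a semaphore or chunking would also work).

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: maps a list in parallel with at most `workers` coroutines.
suspend fun <T, R> List<T>.boundedParallelMap(
    workers: Int = Runtime.getRuntime().availableProcessors(),
    transform: (T) -> R
): List<R> = withContext(Dispatchers.Default) {
    require(workers > 0) { "workers must be positive" }
    val source = this@boundedParallelMap
    val results = arrayOfNulls<Any?>(source.size)
    val next = AtomicInteger(0) // index of the next unprocessed element

    // Create only min(workers, size) coroutines, regardless of list size.
    List(minOf(workers, source.size)) {
        launch {
            while (true) {
                val i = next.getAndIncrement()
                if (i >= source.size) break
                results[i] = transform(source[i]) // each index is written once
            }
        }
    }.joinAll() // wait for every worker before reading the results

    @Suppress("UNCHECKED_CAST")
    results.map { it as R }
}
```

For example, `myList.boundedParallelMap { myCpuIntensiveTask(it) }` would keep roughly one worker per CPU core busy, while a network-bound caller might pass a `workers` value chosen from what the downstream service can handle. Results keep the input order because each worker writes to the slot matching its index.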
v
I see now 🙂 Thank you @elizarov for giving such clear answers. Really appreciated!
For anyone following this. I ended up creating this
suspend fun <T, R> Iterable<T>.parallelMap(transform: (T) -> R): List<R> = withContext(Dispatchers.Default) {
    map { async { transform(it) } }.awaitAll()
}