https://kotlinlang.org logo
#coroutines
Title
# coroutines
v

Viktor Qvarfordt

05/16/2019, 9:00 PM
What’s the easiest way of achieving parallelization for something like
myList.map { myCpuIntensiveTask(it) }
?
e

elizarov

05/16/2019, 9:03 PM
Copy code
myList.parallelStream().map { myCpuIntensiveTask(it) }.collect(Collectors.toList())
But we plan to provide simple, declaration and expressive API to do some like that with coroutines in the future, too.
👍 1
v

Viktor Qvarfordt

05/16/2019, 9:06 PM
Cool. Are the plans in some issue tracker? I’d love to follow the discussion
p

pakoito

05/16/2019, 9:10 PM
@elizarov will that be the future design of kx.coroutines, more declarative APIs like the ones in other libs? Fewer imperative-like code
I’m happy if that’s the case 😄
@pakoito We'd like to keep it as Kotlin in style as possible and are using the design of Kotlin sequences as an inspiration a lot.
p

pakoito

05/16/2019, 9:16 PM
so basing it off iterables with parallel properties, like in Java 8. Neat!
e

elizarov

05/16/2019, 9:17 PM
No. Completely unlike Java 8. Much neater.
👍 1
Java 8 stream is hot and single use, which gives you less-than-desired developer ergonomics.
☝🏼 1
We'll be basing it on cold flows, which gives a number of additional goodies on top of what streams can provide
v

Viktor Qvarfordt

05/16/2019, 9:19 PM
What is the timeline for this?
e

elizarov

05/16/2019, 9:19 PM
Flow ~= asynchronous Sequence, so that's why we use sequences API as an inspiration a lot
Months away. We are focused on core flow API at the moment (no parallelism)
👍 2
p

pakoito

05/16/2019, 9:22 PM
hot (hate that terminology, it’s eager and lazy) and single use is what deferred and job are by default anyway
which is why we had issues implementing several patterns for safe resource management. Happy that the library is evolving away from that
multi-shot continuations next? 😄 we found an encoding that doesn’t need the reflection tricks for suspended effects, but it’d be nice to prevent the extra allocations
e

elizarov

05/16/2019, 9:34 PM
multi-shot continuations will need separation from immutable and mutable state on a language level which is an extremely hard thing to design properly in a language that supports mutable state (as in "there is no existing science that gives clue to a pragmatic answer"), so no, they are not coming in a near future.
😱 2
😢 2
v

Viktor Qvarfordt

05/16/2019, 9:34 PM
Is this a decent approach?
Copy code
myList.map {
    async { myCpuIntensiveTask(it) }
}.map {
    it.await()
}
e

elizarov

05/16/2019, 9:36 PM
.map { it.await() }
==
.awaitAll()
. Otherwise, it is Ok, but there is no limit on concurrency in this solution, with potentially disastrous consequences at scale.
v

Viktor Qvarfordt

05/16/2019, 9:37 PM
What are the potentially disastrous consequences?
e

elizarov

05/16/2019, 9:37 PM
In case of "cpu intensive task" -- Running out of memory
v

Viktor Qvarfordt

05/16/2019, 9:38 PM
This should be parallel given that I’m using Dispatchers.Default. It is creating more coroutines than CPU cores, but they are still multiplexed onto num CPU cores number of threads, right?
e

elizarov

05/16/2019, 9:38 PM
Yes.
v

Viktor Qvarfordt

05/16/2019, 9:38 PM
Ah, but you mean I risk run out of memory because I’m creating lots of coroutines, or do you mean there is risk also without coroutines?
e

elizarov

05/16/2019, 9:39 PM
Yes. You are creating tons of coroutines upfront/
v

Viktor Qvarfordt

05/16/2019, 9:40 PM
So a better implementation would distribute the task across fewer coroutines? And the specific memory management might depend on the type of task (eg. a few heavy ones or billions of light ones). Is that the core point?
e

elizarov

05/16/2019, 9:44 PM
Yes. A better implementation would limit concurrency (a number of created coroutines), so for CPU intensive task you could set it to the number CPU cores, but for other tasks (like network intensive) you can use other consideration (such as scalability of downstream services you are touching)
v

Viktor Qvarfordt

05/16/2019, 9:46 PM
I see now 🙂 Thank you @elizarov for giving such clear answers. Really appreciated!
For anyone following this. I ended up creating this
Copy code
suspend fun <T, R> Iterable<T>.parallelMap(transform: (T) -> R): List<R> = withContext(Dispatchers.Default) {
    map { async { transform(it) } }.awaitAll()
}
12 Views