# coroutines
j
Hey all, I wanted to investigate parallel map and filter functions in Kotlin, which led me to this page: https://jivimberg.io/blog/2018/05/04/parallel-map-in-kotlin/ After implementing the pMap as described in the article, I found that the performance is far worse (37x slower) than the standard `map` function in Kotlin. If I contrast this with Java’s `parallel` stream, the Java stream is far faster than the standard Kotlin `map` function.
suspend fun <I, O> Iterable<I>.pMap(f: suspend (I) -> O): List<O> = coroutineScope {
    map {
        async { f(it) }
    }.awaitAll()
}
val aP = (1..1000000).toList()

@OptIn(ExperimentalTime::class)
suspend fun main(args: Array<String>) {

    val timeP = measureTime {
        aP.parallelStream().map { it.plus(6) }
        aP.parallelStream().filter { it == 800000 }
    }
    println(timeP)

    val time = measureTime {
        aP.map { it.plus(6) }
        aP.filter { it == 800000 }
    }
    println(time)

}
What’s going on here? My expectation was that the `pMap` function would be on par with Java’s `parallelStream`.
t
as the article warns, you are not using any Dispatcher
add one (IO in this case) and my result is
12.932808ms
1.844329ms
as you see it’s still slower though
more threads doesn’t always equal faster. if like the article you simulate some work, it will be: https://pl.kotl.in/N-8fLDHtq
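A minimal sketch of that comparison, assuming the simulated work is a blocking 10 ms `Thread.sleep` (the helper `slowPlusSix` and the item count are made up for illustration, not taken from the linked playground). Because the work blocks its thread, `Dispatchers.IO` is used here; timings depend on the machine, so none are shown.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime

// Same shape as the article's pMap: one async per element, then await them all.
suspend fun <I, O> Iterable<I>.pMap(f: suspend (I) -> O): List<O> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

// Hypothetical stand-in for real work: blocks the calling thread for 10 ms.
fun slowPlusSix(n: Int): Int {
    Thread.sleep(10)
    return n + 6
}

@OptIn(ExperimentalTime::class)
fun main() = runBlocking {
    val items = (1..100).toList()

    // Parallel: the dispatcher supplies the threads that pMap's coroutines run on.
    val parallelTime = measureTime {
        withContext(Dispatchers.IO) { items.pMap { slowPlusSix(it) } }
    }

    // Sequential: every element waits for the previous one to finish.
    val sequentialTime = measureTime {
        items.map { slowPlusSix(it) }
    }

    println("pMap: $parallelTime")
    println("map:  $sequentialTime")
}
```

With trivial work such as `it.plus(6)` the coroutine overhead dominates, which is why the plain `map` wins in the first pair of numbers.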
3.750947ms
1.015118225s
j
Ah, nice. Totally missed that part. Thanks for correcting that for me. btw my results locally are
3.796603ms
1.271090ms
This means that the coroutines are actually faster.
t
if you are using the playground, it’s actually executed remotely, in a probably pretty constrained environment, so results are probably pretty variable. if you run on your local machine you could put in some actual work rather than just `sleep` to see the difference
be aware that this is a poor benchmark though (no warmup or anything)
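A rough sketch of what adding a warmup could look like, assuming a hypothetical `benchmark` helper (not part of any library) that runs the block a few times untimed before averaging the timed runs. It is still a crude measurement; a harness such as JMH is the usual choice for serious JVM benchmarks.

```kotlin
import java.util.stream.Collectors
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime

// Hypothetical helper: untimed warmup runs give the JIT a chance to compile
// the hot code, then the timed runs are averaged.
@OptIn(ExperimentalTime::class)
fun benchmark(warmups: Int = 5, runs: Int = 10, block: () -> Unit): Duration {
    repeat(warmups) { block() }
    var total = Duration.ZERO
    repeat(runs) { total += measureTime(block) }
    return total / runs
}

fun main() {
    val data = (1..1_000_000).toList()
    var sink = 0 // results are consumed so the work is not trivially dead code

    val mapTime = benchmark { sink += data.map { it + 6 }.size }
    val streamTime = benchmark {
        sink += data.parallelStream().map { it + 6 }.collect(Collectors.toList()).size
    }

    println("map:            $mapTime")
    println("parallelStream: $streamTime")
    println("elements seen:  $sink")
}
```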
j
ah ok, don’t the `measureTime` blocks solve that? I wanted to make sure there wasn’t any kind of caching in play. But would you say that this gives somewhat of an estimate of the magnitude of the performance difference?
one gripe I do have with the `withContext` block is that it’s not as clean as just chaining `parallelStream`. Would there be a way to wrap the `pMap` function in a context?
would this work?
suspend fun <I, O> Iterable<I>.pMap(f: suspend (I) -> O): List<O> =
    withContext(Dispatchers.IO) {
        coroutineScope {
            map {
                async { f(it) }
            }.awaitAll()
        }
    }
b
remove the `coroutineScope` builder, `withContext(io)` is enough to wrap it into a context
t
uhm, looking a little closer I’m not sure any of the examples make sense to be honest: you define `pMap` but never use it (and there is no pFilter)
and your parallel streams are not collected so they don’t do anything 😁
https://pl.kotl.in/3G4G-hrKl reduced to just map, and actually using parallel streams, with some ugly copy-pasting
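To illustrate the point about uncollected streams: Java streams are lazy, so intermediate operations like `map` and `filter` only run once a terminal operation is attached. The sketch below counts how often the mapping lambda is invoked; the function name `mapCalls` is made up for this example.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

// Returns how many times the mapping lambda ran over 1000 elements.
fun mapCalls(withTerminalOp: Boolean): Int {
    val calls = AtomicInteger()
    val stream = (1..1000).toList()
        .parallelStream()
        .map { calls.incrementAndGet(); it + 6 }
    // Without a terminal operation the pipeline is only described, never executed.
    if (withTerminalOp) stream.collect(Collectors.toList())
    return calls.get()
}

fun main() {
    println(mapCalls(withTerminalOp = false)) // prints 0
    println(mapCalls(withTerminalOp = true))  // prints 1000
}
```

So timing `aP.parallelStream().map { ... }` on its own measures little more than building the pipeline.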
j
oh whoops. Yeah my bad for not linking the correct code. In order to make a cleaner API, would the IO dispatcher be a good dispatcher for all map operations?
i.e.
suspend fun <I, O> Iterable<I>.pcMap(f: suspend (I) -> O): List<O> =
    withContext(Dispatchers.IO) { // has many threads
        map {
            async { f(it) }
        }.awaitAll()
    }
b
No, IO is designed for blocking operations (that do not consume much CPU), and it is typically backed by many more threads than the processor count. If you run CPU-expensive calls in that map, these threads will start starving. For these types of operations, Default is better. But at the same time, Default is not the best option for ALL map operations. E.g. say you need to load 500 items from the network. If you run that on the Default dispatcher, 1. it will take more time than on IO, and 2. it will block Default from doing CPU-consuming tasks from other pieces of code. So it depends on your use case, and you probably need to add a dispatcher as a parameter (probably defaulting to Default)
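One way that suggestion could look, as a sketch: the `dispatcher` parameter is an addition for illustration and defaults to `Dispatchers.Default`, so CPU-bound callers get a sensible pool and callers doing blocking work can opt into `Dispatchers.IO`.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// withContext already provides a scope, so no extra coroutineScope is needed.
suspend fun <I, O> Iterable<I>.pMap(
    dispatcher: CoroutineDispatcher = Dispatchers.Default,
    f: suspend (I) -> O
): List<O> = withContext(dispatcher) {
    map { async { f(it) } }.awaitAll()
}

fun main() = runBlocking {
    // CPU-bound work: keep the default dispatcher.
    val squares = (1..10).toList().pMap { it * it }

    // Blocking work (simulated here with sleep): pass Dispatchers.IO explicitly.
    val fetched = (1..10).toList().pMap(Dispatchers.IO) { Thread.sleep(50); it }

    println(squares)
    println(fetched)
}
```

Call sites stay as short as chaining `parallelStream`, and the choice of thread pool remains visible where it matters.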