# coroutines

Stephan Schroeder

11/04/2019, 11:17 AM
So I was thinking about parallel mapping. So basically having an extension function like this:
Copy code
fun <T, R> Iterable<T>.pmap(
    exec: ExecutorService,
    transform: (T) -> R
): List<R>
which uses an instance of ExecutorService to do all the transformations in parallel. An implementation of this is given e.g. here: https://stackoverflow.com/questions/34697828/parallel-operations-on-kotlin-collections/35638609#35638609 The problem is that I want to continue working on the Rs as soon as they’re done, so List<R> is the wrong collection class (because a list will not be returned until all transformations are done). At first I thought Sequence<R> would be the answer for this, but now I think it isn’t. Firstly, I couldn’t even make it work, because I can’t simply modify the code to return a Sequence<R>:
Copy code
= sequence{
    ....
    for (item in this) {
        exec.submit { yield(transform(item)) }
    }
    ....
}
is actually a compile-time error, because I can’t call yield from a “sub-lambda”. But also, a Sequence is supposed to be lazy, so you don’t start computing the next value until it’s requested. So what is the correct collection-like class? Is this what coroutines’ `Flow` is for? I guess I could also use a Java `Stream`, but that just seems so unidiomatic. UPDATE: So I tried the flow builder, but it runs into the same problem as with Sequence:
Copy code
fun <T, R> pFlowMap(
    items: Iterable<T>,
    exec: ExecutorService,
    transform: (T) -> R): Flow<R> = flow {

    for (item in items) {
        exec.submit { emit(transform(item)) }
    }

    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.DAYS)
}
The `emit` is flagged with “Suspension functions can be called only within coroutine body”. So I tried wrapping that into a `runBlocking`:
Copy code
runBlocking {
        for (item in items) {
            exec.submit { emit(transform(item)) }
        }
    }
but that doesn’t change a thing.
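For reference, the blocking `pmap` signature from the start of this message could be sketched roughly as follows. This is only a minimal illustration of the idea, not the linked Stack Overflow implementation; the `main` function and the pool size of 4 are assumptions added for the demo. It shows why `List<R>` cannot deliver early results: every `get()` has to return before the list exists.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

fun <T, R> Iterable<T>.pmap(
    exec: ExecutorService,
    transform: (T) -> R
): List<R> {
    // Submit every transformation up front so the pool can run them concurrently.
    val futures = this.map { item -> exec.submit(Callable { transform(item) }) }
    // get() blocks, so the list is only available once every task has finished.
    return futures.map { it.get() }
}

fun main() {
    val exec = Executors.newFixedThreadPool(4) // pool size chosen arbitrarily for the demo
    try {
        println(listOf(1, 2, 3).pmap(exec) { it * 2 }) // [2, 4, 6]
    } finally {
        exec.shutdown()
    }
}
```

The results keep the input order because the futures are read back in submission order, which is also exactly what forces the caller to wait for the slowest element.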

Dennis Schröder

11/04/2019, 12:00 PM
☝🏼 3

Dominaezzz

11/04/2019, 12:05 PM
Wrong builder. Use `channelFlow` for stuff like this.

Stephan Schroeder

11/04/2019, 1:00 PM
`channelFlow` with `send` runs into the same problem of `send` being flagged with “Suspension functions can be called only within coroutine body”. Playground link: https://pl.kotl.in/rKJJDKpDP

Dominaezzz

11/04/2019, 1:26 PM
`offer` or `sendBlocking`
parallelMap should work for you.
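A rough sketch of what that suggestion could look like, assuming a recent kotlinx.coroutines version: `sendBlocking` was later deprecated in favor of `trySendBlocking` (and `offer` in favor of `trySend`), so the sketch uses `trySendBlocking`. The function name `pFlowMap` is reused from the question; waiting on the futures via `withContext(Dispatchers.IO)` is one possible choice made here, not something taken from the thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

fun <T, R> pFlowMap(
    items: Iterable<T>,
    exec: ExecutorService,
    transform: (T) -> R
): Flow<R> = channelFlow {
    // Executor tasks are plain threads, not coroutines, so they use the blocking send variant.
    val futures = items.map { item ->
        exec.submit(Runnable { trySendBlocking(transform(item)) })
    }
    // The channel closes when this block returns, so wait for every task first.
    // Waiting on another dispatcher keeps the collector free to drain the buffer.
    withContext(Dispatchers.IO) { futures.forEach { it.get() } }
}

fun main() {
    val exec = Executors.newFixedThreadPool(4) // pool size chosen arbitrarily for the demo
    try {
        val doubled = runBlocking { pFlowMap(listOf(1, 2, 3), exec) { it * 2 }.toList() }
        println(doubled.sorted()) // [2, 4, 6]
    } finally {
        exec.shutdown()
    }
}
```

Elements arrive in completion order, not input order, which is why the demo sorts before printing. Note that the blocking `get()` calls do not react promptly to cancellation of the collector, so this is a sketch rather than production code.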

Stephan Schroeder

11/04/2019, 1:53 PM
looks good 😃 Now if I don’t want to care about the creation of the ExecutorService myself, I guess this is the way to go? The inner launches will be executed on the thread pool of the outer launch!?
Copy code
= channelFlow {
    launch(newFixedThreadPoolContext(numberOfThreads, "tp1")) {
        items.forEach { item ->
            launch { this@channelFlow.send(transform(item)) }
        }
    }
}
yes, that works 😃 (I added a 5s delay in the inner launch, and it definitely took only 5s and not 15!)

gildor

11/04/2019, 2:41 PM
Outer launch is not needed
Also be careful with creating your own pools: if you do not close them, it causes a thread leak
Probably the IO or Default dispatcher is enough, but it depends on the case. Note that IO and Default share threads
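A minimal sketch of that advice, assuming `Dispatchers.Default` is acceptable for the workload: each item gets its own `launch` directly inside `channelFlow`, with no outer `launch` and no custom pool. The name `parallelTransform` and the demo in `main` are illustrative assumptions.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun <T, R> Iterable<T>.parallelTransform(
    transform: (T) -> R
): Flow<R> = channelFlow {
    for (item in this@parallelTransform) {
        // One child coroutine per item, running on the shared Default dispatcher.
        launch(Dispatchers.Default) { send(transform(item)) }
    }
    // channelFlow keeps the channel open until all child coroutines have completed.
}

fun main() {
    val doubled = runBlocking { listOf(1, 2, 3).parallelTransform { it * 2 }.toList() }
    println(doubled.sorted()) // [2, 4, 6]
}
```

Because the standard dispatchers are owned by the library, there is nothing to close afterwards, which avoids the thread leak mentioned above.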

Stephan Schroeder

11/05/2019, 4:00 PM
I’m writing a chess program, so I want control over my pools. I guess I’ll just have to handle it properly then 😎 The latest version looks more like this:
Copy code
inline fun <T, R> Iterable<T>.parallelTransform(
    numberOfThreads: Int,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {

    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow

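    Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
        // withContext suspends until all child coroutines are done,
        // so use { } does not close the pool while work is still pending
        withContext(dispatcher) {
            items.forEach { item ->
                launch {
                    channelFlowScope.send(transform(item))
                }
            }
        }
    }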
}

gildor

11/06/2019, 12:40 PM
This is not very efficient: on every call of this function you create a thread pool, it creates threads, and then you shut everything down. It's also hard to control resources: what if someone launches too many of those parallel transforms? If you do not want to use the standard dispatchers (which also provide an API to configure them), I would recommend creating your own and reusing it across your app
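One way to read this recommendation, sketched under assumptions: the function takes a dispatcher as a parameter instead of building a pool per call, and the application creates a single pool, reuses it for every call, and closes it once at shutdown. The `dispatcher` parameter, the pool size of 4, and the `main` demo are hypothetical and not from the thread.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors

fun <T, R> Iterable<T>.parallelTransform(
    dispatcher: CoroutineDispatcher, // owned and closed by the caller, not by this function
    transform: (T) -> R
): Flow<R> = channelFlow {
    for (item in this@parallelTransform) {
        launch(dispatcher) { send(transform(item)) }
    }
}

fun main() {
    // Created once for the whole app; use { } closes it after all work is done.
    Executors.newFixedThreadPool(4).asCoroutineDispatcher().use { pool ->
        runBlocking {
            // Both calls share the same threads, so total parallelism stays bounded.
            val squares = listOf(1, 2, 3).parallelTransform(pool) { it * it }.toList()
            val doubled = listOf(1, 2, 3).parallelTransform(pool) { it * 2 }.toList()
            println(squares.sorted()) // [1, 4, 9]
            println(doubled.sorted()) // [2, 4, 6]
        }
    }
}
```

With a shared pool the thread count is fixed no matter how many transforms run at once, and the thread creation cost is paid only once.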