Stephan Schroeder
11/04/2019, 11:17 AM
fun <T, R> Iterable<T>.pmap(
    exec: ExecutorService,
    transform: (T) -> R
): List<R>
which uses an instance of ExecutorService to do all the transformations in parallel. An implementation of this is given e.g. here: https://stackoverflow.com/questions/34697828/parallel-operations-on-kotlin-collections/35638609#35638609
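For reference, a pmap with the signature above could look roughly like the following. This is only a minimal sketch and not the code from the linked answer; it assumes the caller owns the executor and shuts it down.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Sketch of pmap: submits every transformation, then waits for all of them.
fun <T, R> Iterable<T>.pmap(exec: ExecutorService, transform: (T) -> R): List<R> {
    // Submit all tasks first so they can run concurrently on the pool.
    val futures = map { item -> exec.submit(Callable { transform(item) }) }
    // Future.get() blocks, so the list exists only once every task is done.
    return futures.map { it.get() }
}

fun main() {
    val exec = Executors.newFixedThreadPool(4)
    try {
        println(listOf(1, 2, 3).pmap(exec) { it * it }) // [1, 4, 9]
    } finally {
        exec.shutdown()
    }
}
```

The results come back in input order because the futures are read in the order they were submitted, which is also why nothing is available before the slowest task has finished.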
The problem is that I want to continue working on the Rs as soon as they’re done, so List<R> is the wrong collection class (because a list will not be returned until all transformations are done). At first I thought Sequence<R> would be the answer, but now I think it isn’t. Firstly, I couldn’t even make it work, because I can’t simply modify the code to return a Sequence<R>:
= sequence {
    ....
    for (item in this) {
        exec.submit { yield(transform(item)) }
    }
    ....
}
This is actually a compile-time error, because I can’t call yield from a “sub-lambda”. Secondly, a Sequence is supposed to be lazy, so computing the next value doesn’t start until it’s requested. So what is the correct collection-like class?
Is this what coroutines’ Flow is for? I guess I could also use a Java Stream, but that just seems so unidiomatic.
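The laziness point about Sequence can be seen in a small experiment. The helper name lazinessLog is made up for this sketch; it only records the order in which a Sequence does its work.

```kotlin
// Hypothetical helper: records the order in which a Sequence does its work.
fun lazinessLog(): List<String> {
    val log = mutableListOf<String>()
    // map on a Sequence only wraps it; the lambda does not run here.
    val squares = sequenceOf(1, 2, 3).map { log += "transform $it"; it * it }
    log += "sequence built"
    // Each transform runs only when the loop asks for the next element.
    for (s in squares) log += "got $s"
    return log
}

fun main() {
    println(lazinessLog())
    // [sequence built, transform 1, got 1, transform 2, got 4, transform 3, got 9]
}
```

Each transformation starts only when the consumer requests the next element, so a plain Sequence would not start the work eagerly in the background.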
UPDATE:
So I tried the flow-builder but it runs into the same problem as with Sequence:
fun <T, R> pFlowMap(
    items: Iterable<T>,
    exec: ExecutorService,
    transform: (T) -> R
): Flow<R> = flow {
    for (item in items) {
        exec.submit { emit(transform(item)) }
    }
    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.DAYS)
}
The emit is flagged with “Suspension functions can be called only within coroutine body”. So I tried wrapping that into a `runBlocking`:
runBlocking {
    for (item in items) {
        exec.submit { emit(transform(item)) }
    }
}
but that doesn’t change a thing.
Dennis Schröder
11/04/2019, 12:00 PM
Dominaezzz
11/04/2019, 12:05 PM
channelFlow for stuff like this.
Stephan Schroeder
11/04/2019, 1:00 PM
channelFlow with send runs into the same problem of send being flagged with “Suspension functions can be called only within coroutine body”.
Playground link: https://pl.kotl.in/rKJJDKpDP
Dominaezzz
11/04/2019, 1:26 PM
offer or sendBlocking
parallelMap should work for you.
Stephan Schroeder
11/04/2019, 1:53 PM
= channelFlow {
    launch(newFixedThreadPoolContext(numberOfThreads, "tp1")) {
        items.forEach { item ->
            launch { this@channelFlow.send(transform(item)) }
        }
    }
}
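For comparison, the blocking-send route suggested earlier in the thread (offer or sendBlocking) could look roughly like this with a plain ExecutorService. This is only a sketch under some assumptions: it uses trySendBlocking, which as far as I know is the replacement for sendBlocking in newer kotlinx.coroutines releases, and it leaves shutting down the executor to the caller.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun <T, R> pFlowMap(
    items: Iterable<T>,
    exec: ExecutorService,
    transform: (T) -> R
): Flow<R> = channelFlow {
    // The Runnable is not a coroutine, so it cannot call send();
    // trySendBlocking blocks the pool thread instead of suspending.
    val futures = items.map { item ->
        exec.submit(Runnable { trySendBlocking(transform(item)) })
    }
    // Keep the channel open until every task has finished. The blocking
    // wait runs on Dispatchers.IO so the collector's thread stays free.
    withContext(Dispatchers.IO) { futures.forEach { it.get() } }
}

fun main(): Unit = runBlocking {
    val exec = Executors.newFixedThreadPool(4)
    try {
        val results = pFlowMap(listOf(1, 2, 3), exec) { it * it }.toList()
        // Elements arrive in completion order, so sort before printing.
        println(results.sorted())
    } finally {
        exec.shutdown()
    }
}
```

The wait on the futures is not cancellation-aware in this sketch, so a collector that stops early would still leave the remaining tasks running on the pool.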
gildor
11/04/2019, 2:41 PM
Stephan Schroeder
11/05/2019, 4:00 PM
inline fun <T, R> Iterable<T>.parallelTransform(
    numberOfThreads: Int,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {
    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow
    Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
        launch(dispatcher) {
            items.forEach { item ->
                launch {
                    channelFlowScope.send(transform(item))
                }
            }
        }
    }
}
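One thing to watch in the snippet above: launch returns immediately, so use closes the dispatcher as soon as the coroutines have been started, possibly while work is still pending. What happens to tasks dispatched after that point depends on the kotlinx.coroutines version. A possible variant, sketched here under the made-up name parallelTransformScoped, keeps the dispatcher open until all sends are done by using withContext instead of the outer launch.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical variant of parallelTransform; the name is made up for this sketch.
fun <T, R> Iterable<T>.parallelTransformScoped(
    numberOfThreads: Int,
    transform: (T) -> R
): Flow<R> = channelFlow {
    val items = this@parallelTransformScoped
    Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
        // withContext suspends until every coroutine launched inside it has
        // completed, so use closes the dispatcher only after the last send.
        withContext(dispatcher) {
            items.forEach { item ->
                launch { send(transform(item)) }
            }
        }
    }
}

fun main(): Unit = runBlocking {
    val results = listOf(1, 2, 3).parallelTransformScoped(2) { it * it }.toList()
    // Elements arrive in completion order, so sort before printing.
    println(results.sorted())
}
```

A new thread pool is still created for every collection of the flow, as in the original; sharing one dispatcher across calls would be a separate design choice.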
gildor
11/06/2019, 12:40 PM