poohbar

10/09/2018, 6:46 PM
hi, i want just a simple `parallelStream` equivalent in kotlin.. does this make sense?
runBlocking {
   myItems.map {
    GlobalScope.launch {
       // do something
    }
   }.forEach { it.join() }
}

// now do something with all items

poohbar

10/09/2018, 6:53 PM
lol wtf is this, why is it so complicated

nwh

10/09/2018, 7:01 PM
I think your solution works fine @poohbar, but of course `GlobalScope#launch` uses the default dispatcher (common pool)

enleur

10/09/2018, 7:01 PM
it depends on what you need. if you’re doing CPU work, just go with parallelStream
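
For reference, a minimal sketch of what calling `parallelStream` from Kotlin on the JVM could look like for CPU-bound work. The names `expensiveSquare` and `squareAllInParallel` are hypothetical and exist only for this illustration; the stream calls themselves come from the standard JDK.

```kotlin
import java.util.stream.Collectors

// Hypothetical CPU-bound transformation, used only for illustration.
fun expensiveSquare(n: Int): Int = n * n

// Maps every item on the JDK's parallel stream machinery.
// An ordered source plus Collectors.toList() keeps the input order.
fun squareAllInParallel(items: List<Int>): List<Int> =
    items.parallelStream()
        .map { expensiveSquare(it) }
        .collect(Collectors.toList())

fun main() {
    println(squareAllInParallel(listOf(1, 2, 3))) // prints [1, 4, 9]
}
```

This needs no coroutines at all, which is why it can be the simpler choice when the work is purely CPU-bound and the code only targets the JVM.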

gildor

10/10/2018, 12:57 AM
Just use joinAll() instead of forEach { it.join() }. If you run this in a coroutine scope (like in your runBlocking case), there is no need to even join: the scope will wait for all its child coroutines. A worker pool pattern is needed if you want to limit parallelism. It also depends on the type of task (blocking/non-blocking, IO- or CPU-bound).
Also, there is a feature request about worker pool support in kotlinx.coroutines: https://github.com/Kotlin/kotlinx.coroutines/issues/172
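
A rough sketch of the `joinAll()` variant, assuming the `kotlinx-coroutines-core` dependency is on the classpath. `processAll` and the doubling step are hypothetical stand-ins for "do something"; the jobs are launched from the `runBlocking` scope rather than `GlobalScope`.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical helper: processes every item in its own coroutine.
fun processAll(items: List<Int>): List<Int> {
    // Thread-safe collection, because the coroutines run on several threads.
    val processed = ConcurrentLinkedQueue<Int>()
    runBlocking {
        val jobs = items.map { item ->
            launch(Dispatchers.Default) {
                processed.add(item * 2) // stand-in for the real work
            }
        }
        // Explicitly wait for every job; replaces forEach { it.join() }.
        jobs.joinAll()
        // Everything launched above has completed at this point.
    }
    // Completion order is not deterministic, so sort before returning.
    return processed.sorted()
}

fun main() {
    println(processAll(listOf(1, 2, 3)))
}
```

The explicit `joinAll()` is mostly useful when more code in the same scope has to run after the jobs finish; otherwise the scope would wait for them anyway.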

Zach Klippenstein (he/him) [MOD]

10/10/2018, 3:22 PM
With structured concurrency, you don’t need the joins at all. The outer coroutine will wait for all its children to finish, so you just need to launch your worker coroutines from the enclosing scope instead of `GlobalScope`.
runBlocking {
  myItems.forEach {
    launch(Dispatchers.Default) {
      // Do something.
      println("Processing $it…")
      Thread.sleep(it * 1000L)
      println("$it finished.")
    }
  }
}

println("All done.")
Prints:
Processing 1…
Processing 2…
Processing 3…
1 finished.
2 finished.
3 finished.
All done.
(For actually blocking work though, you should use `Dispatchers.IO`, not `Dispatchers.Default`.)
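
As a sketch of that last point, here is the same pattern with the blocking call moved to `Dispatchers.IO`, assuming `kotlinx-coroutines-core` is available. `blockingCall` and `runBlockingWork` are hypothetical names, and `Thread.sleep` stands in for real blocking I/O.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical blocking operation (e.g. a file or network call).
fun blockingCall(item: Int): String {
    Thread.sleep(100L) // blocks the calling thread
    return "item-$item"
}

// Hypothetical helper: runs the blocking call for every item in parallel.
fun runBlockingWork(items: List<Int>): List<String> {
    val results = ConcurrentLinkedQueue<String>()
    runBlocking {
        items.forEach { item ->
            // Dispatchers.IO is meant for blocking calls, so the threads
            // of Dispatchers.Default stay free for CPU-bound work.
            launch(Dispatchers.IO) {
                results.add(blockingCall(item))
            }
        }
        // No join needed: runBlocking waits for its child coroutines.
    }
    // Completion order is not deterministic, so sort before returning.
    return results.sorted()
}

fun main() {
    println(runBlockingWork(listOf(1, 2, 3)))
}
```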

gildor

10/10/2018, 3:42 PM
CoroutineScope can help, it’s true, but you cannot get the result of the invocation, and I’m still not sure that the CoroutineScope solution for multiple coroutines is better than the more explicit `joinAll()`.

Zach Klippenstein (he/him) [MOD]

10/10/2018, 4:15 PM
I can see an argument for the implicit join not being obvious (i had the same thought initially), but I think that might just be because we’re all still very new to reading structured concurrency-based code – the instinct is “it’s spawning a new thread it’s going to leak”, instead of “it’s spawning a new thread, the parent is going to wait”, and that habit will just take time to change.
Kotlin maintainers keep stressing that `GlobalScope` should be used very rarely, and so doing the “right thing” here and using the parent scope to launch children makes the `join`s redundant. If the example was using `async` then yes you’d need some processing at the end, but since it’s using `launch` it looks like the intention of this code is just to not leave the `runBlocking` coroutine until all the child work is done, which is why structured concurrency was introduced in the first place. I would expect this sort of pattern will get more common and start to look less suspicious over time.
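
For the case where results are needed, a minimal sketch of the `async` variant could look like the following, assuming `kotlinx-coroutines-core` is available. `parallelMap` is a hypothetical helper name, not something provided by the library.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical helper: transforms every item in its own coroutine
// and returns the results in the same order as the input.
fun <T, R> parallelMap(items: List<T>, transform: suspend (T) -> R): List<R> =
    runBlocking {
        items
            .map { item -> async(Dispatchers.Default) { transform(item) } }
            .awaitAll() // the "processing at the end": collect every result
    }

fun main() {
    println(parallelMap(listOf(1, 2, 3)) { it * 10 }) // prints [10, 20, 30]
}
```

Because the `async` calls use the `runBlocking` scope, they are still children of that scope, so a failure in one of them cancels the others instead of leaking them.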