
Esa

01/16/2020, 11:47 AM
Hi, I need a bit of help with where to start on refactoring my code. The code performs a list of tasks. The list itself is quite large, so I've `.chunked()` it into smaller chunks. For each chunk I start every task asynchronously and await them all before continuing to the next chunk. I consider this a cheap hack by someone who doesn't actually know how to implement parallelism fully. If I instead want to convert this into a pool that `n` workers can pick tasks from, where do I start? Java has ThreadPool; what do we have when we use coroutines instead? The reason I want a pool with a fixed number of tasks running concurrently is that in my current `.awaitAll()` implementation I rarely have all workers working simultaneously. Very often only half or fewer are working because the rest have already completed, which makes the implementation very inefficient. The tasks themselves involve API calls to the same destination, so some kind of throttling is very important. What I am looking for is someone to tell me what these concepts are called so I can do the googling more efficiently 🙂 Thanks in advance.

diesieben07

01/16/2020, 12:01 PM
You should check out coroutines and channels. Basic idea is as follows:
coroutineScope {
  val channel = produce {
    send(element) // produce elements here as fast as you can
  }
  repeat(10) { // start 10 workers
    launch {
      channel.consumeEach { element ->
        // do something with the element
      }
    }
  }
}
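A runnable sketch of the worker-pool idea above, assuming `kotlinx-coroutines-core` is on the classpath. `callApi`, `runPool`, and `PoolStats` are hypothetical names introduced only for illustration, and the `delay` stands in for a real API call. The workers iterate with a `for` loop, which receives from the channel the same way `consumeEach` does; the coroutines guide suggests the `for` loop for fan-out because a failing `consumeEach` consumer cancels the channel for everyone.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical result type: how many tasks ran and the peak number in flight.
data class PoolStats(val processed: Int, val peakInFlight: Int)

// Hypothetical stand-in for one API call (not from the original thread).
suspend fun callApi(task: Int): Int {
    delay(10) // simulate network latency
    return task * 2
}

// Runs every task with at most `workers` calls in flight at any time.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun runPool(tasks: List<Int>, workers: Int): PoolStats {
    val processed = AtomicInteger()
    val inFlight = AtomicInteger()
    val peak = AtomicInteger()
    coroutineScope {
        // The producer suspends on send() until some worker is ready to receive.
        val channel = produce<Int> {
            for (task in tasks) send(task)
        } // the channel is closed automatically when this block finishes
        repeat(workers) {
            launch {
                // Each task is handed to exactly one worker.
                for (task in channel) {
                    val now = inFlight.incrementAndGet()
                    peak.updateAndGet { maxOf(it, now) }
                    callApi(task)
                    inFlight.decrementAndGet()
                    processed.incrementAndGet()
                }
            }
        }
    } // coroutineScope returns only after the producer and all workers finish
    return PoolStats(processed.get(), peak.get())
}

fun main() = runBlocking {
    val stats = runPool((1..25).toList(), workers = 4)
    println("processed=${stats.processed}, peak in flight=${stats.peakInFlight}")
}
```

Unlike the chunk-and-`awaitAll()` approach, a worker that finishes early immediately picks up the next task, so the pool stays busy until the channel is drained.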

Esa

01/16/2020, 12:16 PM
Channels, thank you. This `consumeEach()` function, is it thread-safe? Can two channels consume the same element? One of the problems I had when I `.launch()`'ed every task from my unchunked list was that every single call was sent at the same time. Will that be prevented here?

diesieben07

01/16/2020, 12:18 PM
Yes, receiving from a channel is "threadsafe" (you should not think in terms of threads ;)). Only one of the 10 workers will receive each element.

Esa

01/16/2020, 12:35 PM
This is interesting. Thank you. Just to be specific: this `repeat(10)` section you wrote looks to me like it will produce 10 workers that all start from the top of the list (as they all call the same function), resulting in 10 workers accessing index 0. But this is not the case?

diesieben07

01/16/2020, 12:41 PM
`channel` is not like a list. You `send` something to it, which will either buffer (if you have a channel with a buffer, check the `capacity` parameter of `produce`) or suspend until a worker is available to take it. On the worker side you `receive` things (that's what `consumeEach` or a `for in` loop does, too). This `receive` suspends your worker until either an element is available (the sender and receiver meet up) or a buffered element is available.
I highly recommend you read the entire documentation I linked above if you are unfamiliar with channels.
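The difference between a rendezvous channel and a buffered one can be seen in a small sketch. It uses `trySend`, the non-suspending counterpart of `send`, which reports failure in exactly the situation where `send` would have had to suspend. The helper name `acceptedWithoutReceiver` is hypothetical, and the example assumes `kotlinx-coroutines-core` 1.5 or later, where `trySend` is available.

```kotlin
import kotlinx.coroutines.channels.Channel

// Counts how many elements a channel accepts while nobody is receiving.
// `capacity` is passed straight to the Channel() factory function.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    repeat(attempts) { i ->
        // trySend never suspends; it fails where send() would suspend.
        if (channel.trySend(i).isSuccess) accepted++
    }
    channel.close()
    return accepted
}

fun main() {
    // Rendezvous (capacity 0): nothing is accepted until a receiver shows up.
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS, attempts = 5)) // prints 0
    // Buffer of 2: two elements are stored, the rest would suspend the sender.
    println(acceptedWithoutReceiver(2, attempts = 5)) // prints 2
}
```

With the default rendezvous capacity the producer can never run ahead of the workers, which is what gives the throttling effect asked about earlier in the thread.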

Esa

01/16/2020, 12:42 PM
All right, thank you. I will read through it. 🙂
Wow, this was a really cool piece of the language. Seems like it does exactly what I want.
Any suggestions what to use instead of `consumeEach {}` when I want the consumers to return a value?

tseisel

01/17/2020, 8:51 AM
You can share a `Channel` for dispatching to multiple workers (fan-out), then use another channel to send the result back to the producer (fan-in):
val input = Channel<Foo>()
val output = Channel<Bar>()

repeat(10) {
    launch {
        input.consumeEach { foo ->
            val bar = transform(foo)
            output.send(bar)
        }
    }
}

output.consumeEach { bar ->
    doSomethingWithResult(bar)
}
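As written, the snippet above leaves out who feeds `input` and when `output` gets closed, so the final `consumeEach` would never finish on its own. One way to fill in those pieces is sketched below, assuming `kotlinx-coroutines-core`. `transformAll` is a hypothetical wrapper, and `transform` here is a placeholder with made-up `Int` and `String` types in place of `Foo` and `Bar`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical transformation standing in for the real API call.
suspend fun transform(foo: Int): String {
    delay(5) // simulate latency
    return "result-$foo"
}

// Fan-out over `workers` coroutines, fan-in of their results into one list.
suspend fun transformAll(items: List<Int>, workers: Int): List<String> = coroutineScope {
    val input = Channel<Int>()
    val output = Channel<String>()

    // Producer: feed the input channel, then close it so the worker loops end.
    launch {
        for (item in items) input.send(item)
        input.close()
    }

    // Fan-out: every worker takes elements from the same input channel.
    val jobs = List(workers) {
        launch {
            for (foo in input) output.send(transform(foo))
        }
    }

    // Close the output channel once all workers are done,
    // otherwise the collecting loop below would wait forever.
    launch {
        jobs.joinAll()
        output.close()
    }

    // Fan-in: collect results in completion order, not input order.
    val results = mutableListOf<String>()
    for (bar in output) results += bar
    results
}

fun main() = runBlocking {
    println(transformAll((1..20).toList(), workers = 3).size)
}
```

Results arrive in whatever order the workers finish, so if the original order matters each result would need to carry its index or key.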

Esa

01/17/2020, 12:24 PM
Clever. Didn’t think of that, I’ll give it a shot. Thanks.