Esa
01/16/2020, 11:47 AM
`n` amount of workers can pick tasks from, where do I start? Java has ThreadPool, what do we have when we use coroutines instead?
The reason I want to convert it to a pool with a fixed number of tasks running concurrently is that in my current `.awaitAll()`-implementation I rarely have all workers working simultaneously. Very often only half or fewer are working because the other half has already completed, which makes the implementation very inefficient.
The tasks themselves involve API calls to the same destination, so some kind of throttling is very important.
What I am looking for is someone to tell me what these concepts are called so I can do the googling more efficiently 🙂
Thanks in advance.
diesieben07
01/16/2020, 12:01 PM
coroutineScope {
    val channel = produce {
        send(element) // produce elements here as fast as you can
    }
    repeat(10) { // start 10 workers
        launch {
            channel.consumeEach { element ->
                // do something with the element
            }
        }
    }
}
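A minimal runnable sketch of the same worker-pool idea, assuming kotlinx.coroutines is on the classpath. The helper `processAll`, the task type, and the `delay` standing in for the API call are made up for illustration. It iterates with a `for` loop instead of `consumeEach`, since the coroutines guide recommends that for several consumers: `consumeEach` cancels the channel when a consumer completes or fails.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: handles every task with at most `workers` coroutines
// and returns how many times each task was handled.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun processAll(tasks: List<Int>, workers: Int): Map<Int, Int> {
    val handled = ConcurrentHashMap<Int, Int>()
    coroutineScope {
        // produce closes the channel automatically when its block returns
        val channel = produce<Int> {
            for (task in tasks) send(task)
        }
        repeat(workers) {
            launch {
                // each element is delivered to exactly one of the workers
                for (task in channel) {
                    delay(10) // stand-in for the real API call
                    handled.merge(task, 1) { a, b -> a + b }
                }
            }
        }
    }
    // coroutineScope only returns after all workers have finished
    return handled.toMap()
}

fun main() = runBlocking {
    val counts = processAll((1..25).toList(), workers = 10)
    println(counts.size)                    // 25
    println(counts.values.all { it == 1 })  // true
}
```

At most `workers` calls are in flight at any time, and a worker picks up the next task as soon as it finishes the previous one.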
Esa
01/16/2020, 12:16 PM
`consumeEach()`-function, is it threadsafe? Can two channels consume the same element?
One of the problems I had when I `.launch()`’ed every task from my unchunked list was that every single call was sent at the same time. Will that be prevented here?
diesieben07
01/16/2020, 12:18 PM
Esa
01/16/2020, 12:35 PM
`repeat(10)`-section you wrote, to me it looks like it will produce 10 workers that all start from the top of the list (as they all call the same function), resulting in 10 workers accessing index 0. But this is not the case?
diesieben07
01/16/2020, 12:41 PM
`channel` is not like a list. You `send` something to it, which will either buffer (if you have a channel with a buffer, check the `capacity` parameter of `produce`) or suspend until a worker is available to take it.
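To make the buffer-or-suspend behaviour concrete, here is a small sketch, assuming kotlinx.coroutines 1.5 or newer. The helper `acceptedWithoutReceiver` is hypothetical. It uses `trySend`, the non-suspending counterpart of `send`, so it can report where a plain `send` would have had to suspend.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many of `attempts` elements a channel with
// the given capacity accepts while nobody is receiving. With the default
// overflow behaviour, trySend fails where send would suspend.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int): Int {
    val channel = Channel<Int>(capacity)
    return (1..attempts).count { channel.trySend(it).isSuccess }
}

fun main() {
    // Rendezvous channel (no buffer): nothing is accepted without a receiver.
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS, 3)) // 0
    // Buffer of 2: two elements are buffered, the third would suspend.
    println(acceptedWithoutReceiver(2, 3))                  // 2
}
```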
On the worker side you `receive` things (that's what `consumeEach` or a `for in` loop does, too). This `receive` suspends your worker until either an element is available (the sender and receiver meet up) or a buffered element is available.
Esa
01/16/2020, 12:42 PM
`consumeEach {}` when I want the consumers to return a value?
tseisel
01/17/2020, 8:51 AM
`Channel` for dispatching to multiple workers (fan-out), then use another channel to send the result back to the producer (fan-in):
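val input = Channel<Foo>()
val output = Channel<Bar>()
repeat(10) {
    launch { // needs an enclosing CoroutineScope
        input.consumeEach { foo ->
            val bar = transform(foo)
            output.send(bar)
        }
    }
}
// Runs until output is closed, so close it once all workers have finished.
output.consumeEach { bar ->
    doSomethingWithResult(bar)
}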
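A self-contained sketch of the fan-out/fan-in pattern above, assuming kotlinx.coroutines is available. `transformAll`, the `Int` and `String` element types, and the body of `transform` are placeholders, not part of the original snippet. The parts the snippet leaves out are feeding `input`, closing it, and closing `output` once every worker is done so the collecting loop can end.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stand-in for the real work, e.g. one API call per element.
suspend fun transform(foo: Int): String {
    delay(10)
    return "result-$foo"
}

suspend fun transformAll(inputs: List<Int>, workers: Int): List<String> = coroutineScope {
    val input = Channel<Int>()
    val output = Channel<String>()

    // Producer: feed the inputs, then close so the workers' loops end.
    launch {
        for (foo in inputs) input.send(foo)
        input.close()
    }

    // Fan-out: a fixed number of workers share the input channel.
    val jobs = List(workers) {
        launch {
            for (foo in input) output.send(transform(foo))
        }
    }

    // Close the output once every worker is done, so the loop below ends.
    launch {
        jobs.joinAll()
        output.close()
    }

    // Fan-in: collect results in whatever order the workers finish.
    val results = mutableListOf<String>()
    for (bar in output) {
        results.add(bar)
    }
    results
}

fun main() = runBlocking {
    val results = transformAll((1..20).toList(), workers = 10)
    println(results.size) // 20
}
```

Results arrive in completion order, not input order. If the order matters, one option is to send the index along with each element and sort at the end.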
Esa
01/17/2020, 12:24 PM