r
I have about 1 million tasks that I need to complete. I would like to run 20 of them concurrently, and schedule them in batches of 1000. I'm looking at using newFixedThreadPoolContext, but I'm not sure that's correct, and the DelicateCoroutinesApi warning has me feeling unsure about it. What is an appropriate way to achieve this using coroutines?
The tasks are to re-encrypt documents stored in GCS, so there's a lot of IO wait during the tasks, hence the high amount of concurrency.
a
val allTasks = mutableListOf<Task>()
val semaphore = Semaphore(20)
for (taskBatch in allTasks.chunked(1000)) {
    val completedTasks = taskBatch.map { task ->
        // run on IO dispatcher
        async(Dispatchers.IO) {
            // only allow 20 tasks to be running at a time
            semaphore.withPermit {
                task.run()
            }
        }
    }.awaitAll()
    // do something with completedTasks -- report results, etc.
}
that’s how i’ve done it in the past at least
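For anyone who wants to paste and run the semaphore version, here is a minimal sketch. `Task` is a hypothetical stand-in (the thread never shows the real class), and I'm assuming `run()` is a suspend function that returns an id; `runInBatches` and its parameter names are made up for illustration. `async` needs a `CoroutineScope`, so the loop is wrapped in `coroutineScope`.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical stand-in for the real re-encryption task.
class Task(val id: Int) {
    suspend fun run(): Int {
        delay(10) // simulates IO wait
        return id
    }
}

// Runs tasks in batches of batchSize, with at most maxConcurrent running at once.
suspend fun runInBatches(
    allTasks: List<Task>,
    batchSize: Int = 1000,
    maxConcurrent: Int = 20,
): List<Int> = coroutineScope {
    val semaphore = Semaphore(maxConcurrent)
    val results = mutableListOf<Int>()
    for (taskBatch in allTasks.chunked(batchSize)) {
        val completedTasks = taskBatch.map { task ->
            // Each task gets its own coroutine on the IO dispatcher.
            async(Dispatchers.IO) {
                // Only maxConcurrent coroutines hold a permit at any moment.
                semaphore.withPermit { task.run() }
            }
        }.awaitAll() // wait for the whole batch before starting the next one
        results += completedTasks // report per-batch results here
    }
    results
}
```

Call it from `runBlocking { ... }` or any other suspend context. If the real `run()` is a blocking call rather than a suspend function, the same shape should still work, since `Dispatchers.IO` is meant for blocking IO.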
r
oh yeah that's pretty clean. I guess I don't need to worry about the number of threads since the number of concurrent coroutines != the number of threads. Too many years of java 😅
Thanks for the tip @akatkov!
👍 1
j
You can also do a channel of size 1000, a coroutine feeding the channel from your input source (so it will receive backpressure), and 20 coroutines reading from the channel doing work.
val channel = Channel<Task>(1000)
launch {
  // feed into channel, then call channel.close() so the readers' loops terminate
}
repeat(20) {
  launch {
    for (message in channel) {
      // do something
    }
  }
}
🧠 1
👍 1
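A fuller sketch of the channel approach, in case it helps. The `process` function and the `Int` message type are placeholders I made up, and `runWorkerPool` is a hypothetical name. The part that is easy to miss is closing the channel once the feeder is done, otherwise the 20 readers wait forever.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical unit of work; replace with the real task.
suspend fun process(message: Int) {
    delay(5) // simulates IO wait
}

// Feeds input through a bounded channel to a fixed number of worker coroutines.
// Returns how many messages were processed.
suspend fun runWorkerPool(
    input: List<Int>,
    workers: Int = 20,
    capacity: Int = 1000,
): Int = coroutineScope {
    val channel = Channel<Int>(capacity)
    val processed = AtomicInteger(0)

    // Feeder: send() suspends while the buffer is full, which gives backpressure.
    launch {
        for (item in input) channel.send(item)
        channel.close() // lets the workers' for loops finish
    }

    // Workers: each one pulls the next available message from the shared channel.
    val jobs = List(workers) {
        launch {
            for (message in channel) {
                process(message)
                processed.incrementAndGet()
            }
        }
    }
    jobs.joinAll()
    processed.get()
}
```

Compared with the batch version, nothing waits on the slowest task of a batch: a worker picks up the next message as soon as it finishes the previous one.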
r
That channel method is pretty cool
e
if you have an input Flow, you can
flow.flatMapMerge(concurrency = 20) { message ->
    flow {
        // do something
    }
}.collect()
although it definitely doesn't feel like an intuitive use of the API to me
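Here is a runnable sketch of the `flatMapMerge` idea. `reEncrypt` and `processConcurrently` are hypothetical names, and the `Int` ids stand in for real documents. Each input element becomes a one-element inner flow, and `concurrency` caps how many inner flows are collected at once. Depending on the kotlinx.coroutines version, `flatMapMerge` is marked as preview or experimental, so the sketch opts in to both.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList

// Hypothetical unit of work; replace with the real re-encryption call.
suspend fun reEncrypt(id: Int): Int {
    delay(5) // simulates IO wait
    return id * 2
}

// Processes every element of the input flow, with at most `concurrency`
// elements in flight at the same time. Result order is not guaranteed.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun processConcurrently(input: Flow<Int>, concurrency: Int = 20): List<Int> =
    input.flatMapMerge(concurrency = concurrency) { id ->
        flow { emit(reEncrypt(id)) }
    }.toList()
```

Since the inner flows run concurrently, results come back in completion order, so sort them or carry the id along if order matters.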
another option is to use helpers that automatically manage the channel:
produce {
    // feed into this channel
}.consume {
    coroutineScope {
        repeat(20) {
            launch {
                for (message in this@consume) {
                    // do something
                }
            }
        }
    }
}
👌 1
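One detail worth knowing with `produce`: as far as I know its default capacity is 0 (rendezvous), so to get the 1000-element buffer from the earlier suggestion you need to pass `capacity` explicitly. A minimal sketch follows, with a hypothetical `handle` function and `sumWithProduce` name. It skips `consume` and relies on `produce` closing the channel when its block returns, which is a simplification of the snippet above.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicLong

// Hypothetical unit of work; replace with the real task.
suspend fun handle(message: Int): Long {
    delay(5) // simulates IO wait
    return message.toLong()
}

// Sums the handled values using a produce-backed channel and a fixed worker count.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun sumWithProduce(input: List<Int>, workers: Int = 20): Long = coroutineScope {
    val total = AtomicLong(0)
    // The channel is closed automatically when this block returns.
    val source = produce<Int>(capacity = 1000) {
        for (item in input) send(item)
    }
    // The inner scope returns only after every worker has drained the channel.
    coroutineScope {
        repeat(workers) {
            launch {
                for (message in source) total.addAndGet(handle(message))
            }
        }
    }
    total.get()
}
```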