How to properly initialize such kind of patterns? ...
# coroutines
t
How to properly initialize such kind of patterns? Currently depending on the state of the device and load, the repeat call can trigger creation of tons of threads even if there's no tasks yet. Generating a lot of unecessary load at init time.
Copy code
class XXX  : CoroutineScope {

	override val coroutineContext = <http://Dispatchers.IO|Dispatchers.IO> + SupervisorJob()

	internal val tasks = Channel<Task>(UNLIMITED)

    fun startWorkers() {
        repeat(150) {
            launch {
                for (task in tasks) executeTask(task)
            }
        }
    }
}
z
What if you use the
Default
dispatcher (smaller, fixed-size thread pool, so you’re not gonna create a bunch of threads), then inside each of the workers you wait to switch to the
IO
dispatcher until you have the first task?
Copy code
launch(Unconfined) {
  var initialTask = tasks.receive()
  withContext(IO) {
    executeTask(initialTask)
    for (task in tasks) executeTask(task)
  }
}
I think you could even use
Unconfined
to launch them, since the only work they’d be doing initially would be registering to receive from the channel.
t
The problem is that your solution loose the parallelism configuration. At some point I need to start at max X for loops. 150 in the posted sample for example.
Maybe @elizarov have an official pattern for that. I suppose I could wrap the repeat in a coroutine and add a small delay() after each launch as a workaround.
z
You only lose parallelism while waiting for the first emission from the channel, but that doesn’t matter – the coroutines aren’t doing any work then anyway, they’re just waiting. As soon as you switch to the IO dispatcher you get the full parallelism of the IO dispatcher back.
t
I’m talking about the parallelism in active running tasks. the repeat ensure the number of simultaneous tasks. Without 1 launch + for loop, you only have 1 running task. And if I do the launch in the for loop then I loose control over max running tasks too.
z
oh, sorry i meant you’d still have the repeat. i was just referring to the code inside it. So,
Copy code
fun startWorkers() {
  repeat(150) {
    startWorker()
  }
}

private fun startWorker() {
  launch(Unconfined) {
    var initialTask = tasks.receive()
    withContext(IO) {
      executeTask(initialTask)
      for (task in tasks) executeTask(task)
    }
  }
}
t
Ho yes I'm stupid 🙂 Thanks this should work nicely.
For the record tasks.receive() throw on channel closing unlike the for loop. So call require try catch ClosedReceiveChannelException. And it seems there's maybe a side effect of using Unconfined leading to withContext not always switching. Will investigate more on that last one.
z
Oh yikes, that seems bad. Is there an issue?
Anyway, you can use
Default
too for that initial dispatcher.
t
That issue was due to me being too tired and inheriting the wrong coroutineContext,.With Unconfined there's strange difference in the number of launch but I did not investigate that much and use my global background dispatcher, that fit the needs and solve my issue.
Copy code
repeat(maxConcurrentTasks) {
            launch(Dispatchers.Unconfined) {
                Log.e("AAAA", "Launch ${Thread.currentThread().name}" )
                val initialTask = try {
                    tasks.receive()
                } catch (e: Exception) {
                    return@launch
                }
                withContext(this@OkHttpWorkerPool.coroutineContext) {
                    Log.e("AAAA", "Run ${Thread.currentThread().name}" )
                    executeTask(initialTask)
                    for (task in tasks) executeTask(task)
                }
            }
        }
Will only log 4 Launch main thread
Using Dispatchers.Default or IO will properly log 1 Launch per repeat.