Hello, i need advice. There is some file downloade...
# coroutines
t
Hello, i need advice. There is some file downloader. I want to manage each download (cancel, enqueue etc.). I made something like sample (see thread). But I doubt it is right to manipulate jobs in this way. Maybe there is some more elegant way in coroutines or sample is ok?
Copy code
class DownloadsScheduler(
  private val downloadsQueue: DownloadsQueue,
  private val downloader: Downloader,
) {

  private var _scope: CoroutineScope? = null

  private val scope: CoroutineScope
    get() = _scope ?: error("not initialized")

  private val jobs = ConcurrentHashMap<String, Job>()

  fun startIn(scope: CoroutineScope) {
    check(_scope == null) { "Already started" }
    downloadsQueue
      .selectTop(top = 3)
      .onEach { downloads ->
        for (download in downloads) {
          // I want to download parallel
          val job = scope.launch {
            val result = downloader.download(download.id)
            result.fold(
              success = { /* handle */ },
              failure = { /* handle */ },
            )
          }
          jobs[download.id] = job
        }
      }
  }

  fun enqueue(id: String) {
    scope.launch {
      withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
        downloadsQueue.add(id)
      }
    }
  }

  fun cancel(id: String) {
    scope.launch {
      withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
        jobs[id]?.cancelAndJoin()
        downloadsQueue.remove(id)
      }
    }
  }
}

interface Downloader {
  suspend fun download(id: String): Result<Unit, Throwable>
}

interface DownloadsQueue {
  fun selectTop(top: Int): Flow<List<Download>>
  suspend fun add(id: String)
  suspend fun remove(id: String)
}

data class Download(
  val id: String,
  val meta: Any,
)
g
In general this approach looks fine for this particular API recommend, current API is essentially deprecated As I see this task corresponds of 2 parts: 1. Limiting parallel downloads 2. Allowing cancelling particular downloads 1. Can be solved by many ways, manual as in your case, using custom dispatcher, using actors, using Flow flatMap*(concurrency = N), though each will have own semantics. Your approach actually not bad IMO, it’s predictable and flexible and works for particular semantics 2. Also can be different, but Job is a good abstraction for this for coroutines and IO don’t see any issue with you particular approch, because API wiorks with
id
you anyway need some way to find task (abstracted somehow, in your case by job) and cancelling just a side note, if you combine launch + with context to lauch(Dispatcher.IO) it would be even shorter in terms of readability. If we talking about API, I would also say that API of startIn doesn’t look as the best solution for me, I would rather inject it into constructor, maybe make this method suspend etc