TW
11/18/2022, 12:35 PM
TW
11/18/2022, 12:35 PM
/**
 * Downloads the entries at the head of a [DownloadsQueue] in parallel.
 *
 * Call [startIn] once to bind the scheduler to a [CoroutineScope]; after that
 * [enqueue] and [cancel] may be used. Every download runs in its own coroutine
 * launched in that scope and is tracked by id so that it can be cancelled.
 */
class DownloadsScheduler(
  private val downloadsQueue: DownloadsQueue,
  private val downloader: Downloader,
) {
  private var _scope: CoroutineScope? = null

  /** Scope handed to [startIn]; reading it before then fails with "not initialized". */
  private val scope: CoroutineScope
    get() = _scope ?: error("not initialized")

  /** Downloads currently in flight, keyed by [Download.id]. */
  private val jobs = ConcurrentHashMap<String, Job>()

  /**
   * Binds the scheduler to [scope] and starts collecting the top 3 entries of
   * the queue, launching one download coroutine per entry.
   *
   * @param scope scope that owns the collector and every download coroutine.
   * @throws IllegalStateException if the scheduler has already been started.
   */
  fun startIn(scope: CoroutineScope) {
    check(_scope == null) { "Already started" }
    // Remember the scope: enqueue() and cancel() resolve it through the `scope` property.
    _scope = scope
    // A Flow is cold; without a collector the queue would never be observed.
    scope.launch {
      downloadsQueue
        .selectTop(top = 3)
        .collect { downloads ->
          for (download in downloads) {
            val id = download.id
            // A later emission may list a download that is still running; don't start it twice.
            if (jobs[id]?.isActive == true) continue
            // Launched in `scope` (not in the collector) so downloads run in parallel as siblings.
            val job = scope.launch {
              val result = downloader.download(id)
              result.fold(
                success = { /* handle */ },
                failure = { /* handle */ },
              )
            }
            jobs[id] = job
            // Registered after the put: a handler added to an already-finished job runs
            // immediately, so the entry is removed even if the download completed instantly.
            job.invokeOnCompletion { jobs.remove(id, job) }
          }
        }
    }
  }

  /**
   * Adds [id] to the queue asynchronously on [Dispatchers.IO].
   *
   * @throws IllegalStateException if [startIn] has not been called yet.
   */
  fun enqueue(id: String) {
    scope.launch(Dispatchers.IO) {
      downloadsQueue.add(id)
    }
  }

  /**
   * Cancels the running download for [id], if any, waits for it to finish,
   * and then removes [id] from the queue. Runs asynchronously on [Dispatchers.IO].
   *
   * @throws IllegalStateException if [startIn] has not been called yet.
   */
  fun cancel(id: String) {
    scope.launch(Dispatchers.IO) {
      jobs[id]?.cancelAndJoin()
      downloadsQueue.remove(id)
    }
  }
}
/** Contract for downloading a single item identified by a string id. */
interface Downloader {
  /**
   * Downloads the item identified by [id], suspending until the attempt is over.
   *
   * @param id identifier of the download; callers in this snippet pass [Download.id].
   * @return a two-parameter `Result<Unit, Throwable>`. That `Result` type is not declared
   *   in this snippet; presumably success carries `Unit` and failure carries the
   *   `Throwable` (confirm against its definition).
   */
  suspend fun download(id: String): Result<Unit, Throwable>
}
/** Queue of pending downloads, observable as a [Flow] and editable by id. */
interface DownloadsQueue {
  /**
   * Returns a flow of download lists limited by [top].
   *
   * NOTE(review): presumably each emission holds the first [top] queued downloads and a
   * new list is emitted whenever the queue changes; confirm against the implementation.
   */
  fun selectTop(top: Int): Flow<List<Download>>

  /** Adds the download identified by [id] to the queue. */
  suspend fun add(id: String)

  /** Removes the download identified by [id] from the queue. */
  suspend fun remove(id: String)
}
/**
 * A queued download.
 *
 * @property id identifier passed to [Downloader.download] and used as the key for job tracking.
 * @property meta untyped payload attached to the download; not read anywhere in this snippet.
 */
data class Download(
  val id: String,
  val meta: Any,
)gildor
11/30/2022, 2:23 AM
id