TW
11/18/2022, 12:35 PMTW
/**
 * Pulls the top entries from a [DownloadsQueue] and runs each one through a [Downloader]
 * in its own coroutine, so several downloads proceed in parallel.
 *
 * [startIn] must be called once before [enqueue] or [cancel]; both launch into the scope
 * handed to [startIn] and throw [IllegalStateException] ("not initialized") otherwise.
 */
class DownloadsScheduler(
    private val downloadsQueue: DownloadsQueue,
    private val downloader: Downloader,
) {
    private var _scope: CoroutineScope? = null
    private val scope: CoroutineScope
        get() = _scope ?: error("not initialized")

    // Download id -> job currently running that download. An entry is removed when its job completes.
    private val jobs = ConcurrentHashMap<String, Job>()

    /**
     * Starts observing the queue inside [scope] and launches a download for every entry
     * of each emitted top-3 list that is not already being downloaded.
     *
     * @throws IllegalStateException if the scheduler has already been started.
     */
    fun startIn(scope: CoroutineScope) {
        check(_scope == null) { "Already started" }
        // Remember the scope: without this assignment enqueue/cancel always fail with
        // "not initialized" and the check above can never trigger.
        _scope = scope
        // A Flow is cold: nothing runs until it is collected, so collect it in the scope.
        scope.launch {
            downloadsQueue
                .selectTop(top = 3)
                .collect { downloads ->
                    for (download in downloads) {
                        launchDownload(scope, download)
                    }
                }
        }
    }

    /** Asynchronously adds [id] to the queue on [Dispatchers.IO]. */
    fun enqueue(id: String) {
        scope.launch(Dispatchers.IO) {
            downloadsQueue.add(id)
        }
    }

    /**
     * Asynchronously cancels the running download for [id] (if any), waits for it to
     * finish, and then removes [id] from the queue. Runs on [Dispatchers.IO].
     */
    fun cancel(id: String) {
        scope.launch(Dispatchers.IO) {
            jobs[id]?.cancelAndJoin()
            downloadsQueue.remove(id)
        }
    }

    /** Launches one download in [scope] unless a job for the same id is already tracked. */
    private fun launchDownload(scope: CoroutineScope, download: Download) {
        // The queue re-emits its top list; skip ids that are already in flight so the
        // same download is not started twice in parallel.
        if (jobs.containsKey(download.id)) return

        val job = scope.launch {
            val result = downloader.download(download.id)
            result.fold(
                success = { /* handle */ },
                failure = { /* handle */ },
            )
        }
        jobs[download.id] = job
        // Drop the entry once the job finishes (or is cancelled) so the map does not grow
        // forever. If the job has already completed, the handler runs immediately.
        // remove(key, value) only removes this exact job, never a newer one for the same id.
        job.invokeOnCompletion { jobs.remove(download.id, job) }
    }
}
/** Performs the download of a single item. */
interface Downloader {
    /**
     * Downloads the item identified by [id].
     *
     * NOTE(review): `Result` takes two type arguments here, so it is not `kotlin.Result`;
     * presumably it comes from a result library or a project type — confirm.
     */
    suspend fun download(id: String): Result<Unit, Throwable>
}
/** Queue of pending downloads that can be observed and modified by id. */
interface DownloadsQueue {
    /**
     * Returns a flow of download lists.
     *
     * NOTE(review): presumably each emission holds at most [top] entries from the head
     * of the queue and a new list is emitted when the queue changes — confirm with the
     * implementation.
     */
    fun selectTop(top: Int): Flow<List<Download>>

    /** Adds the download identified by [id] to the queue. */
    suspend fun add(id: String)

    /** Removes the download identified by [id] from the queue. */
    suspend fun remove(id: String)
}
/**
 * A single queue entry.
 *
 * @property id identifier passed to the downloader and used as the key for running jobs.
 * @property meta additional payload; its meaning is not visible in this snippet.
 */
data class Download(
    val id: String,
    val meta: Any,
)
gildor
11/30/2022, 2:23 AMid
You need some way to find the task anyway (abstracted somehow, in your case by a job) and cancel it.
Just a side note: if you combine `launch` + `withContext` into `launch(Dispatchers.IO)`, it would be even shorter and more readable.
If we are talking about the API, I would also say that the `startIn` API doesn't look like the best solution to me; I would rather inject the scope into the constructor, maybe make this method `suspend`, etc.