Kush Patel

10/07/2020, 12:40 AM
👋, I am trying to convert a thread-based worker loop to a coroutine-based worker loop.
val workers = ArrayList<Future<*>>(10)
try {
    for (i in 1..10) {
        val task = getTask() ?: break
        workers += taskExecutor.submit {
            workerLoop(task) // some computation
        }
    }
} catch (t: Throwable) {
    // log error
}

workers.forEach {
    try {
        it.get()
    } catch (t: Throwable) {
        // log error
    }
}
Proposed coroutine worker
val workers = ArrayList<Deferred<*>>(10)
try {
    for (i in 1..10) {
        val task = getTask() ?: break
        workers += taskExecutor.async {
            workerLoop(task) // some computation
        }
    }
} catch (t: Throwable) {
    // Log error
}

runBlocking {
    workers.forEach {
        try {
            it.await()
        } catch (t: Throwable) {
            // Log error
        }
    }
}
Is this the right way to switch from threads to coroutines?

gildor

10/07/2020, 1:27 AM
If you don't return any value from the Deferred, just use launch instead of async.
In general, it really depends on how you want to use this code. As it stands, it is essentially the same function; if you move the loop into its own function, it will really depend on what taskExecutor is.
Your example doesn't appear to follow structured concurrency (you just run background tasks in some external taskExecutor), but it will work anyway.
It also depends on what kind of failure strategy you want. Instead of wrapping each call in try { it.await() }, I would return some result from the Deferred, i.e. put the try/catch around workerLoop(task) instead.
But I'm still not sure about the taskExecutor part.
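To illustrate the two suggestions above (launch instead of async, and handling the failure inside the task rather than around await()), here is a minimal sketch. The workerLoop body, the Int task type, the runWorkers name, and the use of Dispatchers.Default are assumptions made for the example; they are not from the thread.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for workerLoop(task): fails for odd task ids.
fun workerLoop(task: Int) {
    if (task % 2 == 1) throw IllegalStateException("task $task failed")
}

// Starts one coroutine per task with launch (no result is needed),
// waits for all of them, and returns the number of failed tasks.
fun runWorkers(tasks: List<Int>): Int = runBlocking {
    val failures = AtomicInteger(0)
    tasks.map { task ->
        launch(Dispatchers.Default) {
            try {
                workerLoop(task) // some computation
            } catch (e: IllegalStateException) {
                // Log error here; because it is handled inside the task,
                // the failure does not cancel the sibling coroutines.
                failures.incrementAndGet()
            }
        }
    }.joinAll()
    failures.get()
}

fun main() {
    println(runWorkers(listOf(1, 2, 3, 4))) // prints 2 (tasks 1 and 3 fail)
}
```

If workerLoop were a suspending function, a broad catch of Throwable inside the task would also swallow CancellationException, so catching a narrower exception type is probably the safer default.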

Kush Patel

10/07/2020, 2:59 AM
Sorry about that. taskExecutor is a cached thread pool, i.e. Executors.newCachedThreadPool {}.
In the coroutine version I would use taskExecutor as a coroutine dispatcher in a separate coroutine scope.
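A rough sketch of what "the thread pool as a dispatcher in a separate scope" could look like. The asCoroutineDispatcher() extension is part of kotlinx.coroutines; the runOnPool name, the SupervisorJob, and the squared-number workload are assumptions chosen only to keep the example runnable.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Runs three placeholder tasks on a cached thread pool via a coroutine scope.
fun runOnPool(): List<Int> {
    val taskExecutor = Executors.newCachedThreadPool()
    // Wrap the ExecutorService so coroutines can be dispatched onto it.
    val dispatcher = taskExecutor.asCoroutineDispatcher()
    // Separate scope; SupervisorJob is an assumption so that one failed
    // worker would not cancel the others.
    val scope = CoroutineScope(SupervisorJob() + dispatcher)
    try {
        val workers = (1..3).map { i ->
            scope.async { i * i } // placeholder for workerLoop(task)
        }
        return runBlocking { workers.awaitAll() }
    } finally {
        scope.cancel()
        dispatcher.close() // also shuts down the underlying executor
    }
}

fun main() {
    println(runOnPool()) // prints [1, 4, 9]
}
```

Note that the scope here is still external to the caller, so this keeps the pool-based design rather than making it structured.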

gildor

10/07/2020, 4:54 AM
So you just run it on an external thread pool? That's kind of valid, but I'm still not sure what API you want to have.
It looks like you need something like this:
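// Assumes taskExecutor is a CoroutineDispatcher here,
// e.g. the thread pool wrapped with asCoroutineDispatcher()
suspend fun doSomethingInParallel(
    dispatcher: CoroutineDispatcher = taskExecutor
): List<Result<Something>> = withContext(dispatcher) {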
    (1..10).map { i ->
        async {
            // or any other way to encode failed/successful tasks
            runCatching {
                workerLoop(getTask())
            }
        }
    }.awaitAll()
}
taskExecutor just becomes the default dispatcher (so it's easier to test), and it's just a suspend function. You can adjust error handling as you want; for example, quite often you don't want to execute all the tasks if one of them fails, and that is the default kotlinx.coroutines strategy.
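To make the difference between the two failure strategies concrete, here is a small sketch. The failFast and collectAll names, the failing task 2, and the delay are assumptions for illustration; only coroutineScope, async, awaitAll, and runCatching come from the libraries themselves.

```kotlin
import kotlinx.coroutines.*

// Default strategy: the first failing child cancels its siblings and the
// exception is rethrown to the caller.
suspend fun failFast(): List<Int> = coroutineScope {
    (1..3).map { i ->
        async {
            if (i == 2) throw IllegalStateException("task $i failed")
            delay(100)
            i
        }
    }.awaitAll()
}

// Collect-everything strategy: each task wraps its own outcome in a Result,
// so one failure does not affect the other tasks.
suspend fun collectAll(): List<Result<Int>> = coroutineScope {
    (1..3).map { i ->
        async {
            runCatching {
                if (i == 2) throw IllegalStateException("task $i failed")
                delay(100)
                i
            }
        }
    }.awaitAll()
}

fun main() = runBlocking {
    val failed = try {
        failFast()
        false
    } catch (e: IllegalStateException) {
        true
    }
    println(failed)                            // prints true
    println(collectAll().map { it.isSuccess }) // prints [true, false, true]
}
```

One caveat: runCatching also catches CancellationException, so when tasks can be cancelled it may be worth rethrowing that exception inside the block.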