# coroutines
v
Hi! For my use case we need to run async tasks on a thread pool with guaranteed progress, meaning that a task should not take much longer to complete even when there are many concurrent tasks. That could be implemented with a custom dispatcher and a priority queue, by assigning the first task the highest priority and lower priorities to the tasks that follow. Is there a simpler way of doing it? If not, are there plans to add such a dispatcher to kotlinx?
@elizarov^
This is a pretty common use case, btw - 2nd time I need it in one project
e
I’m not exactly sure I understand your use case. Why doesn't a simple
Executors.newFixedThreadPool(n).asCoroutineDispatcher()
work for you?
v
Let me explain with an example: suppose we have N tasks with three stages each:
[[a1, b1, c1], [a2, b2, c2], [a3, b3, c3], ...]
, each stage is a roughly equivalent expensive CPU computation.
With
ScheduledThreadPoolExecutor
the system will execute task stages in order of their submission:
a1, a2, a3, a4, ..., b1, b2, b3, ..., c1, c2, ...
. Note that when many (N >> 1) tasks arrive simultaneously, all of them will progress through their stages "in parallel" and finish at about the same time. In other words, the time it takes to complete the 1st task (
a1, b1, c1
) is O(N) in this scenario.
What I need is some guarantee of progress for each task, i.e. that
a1, b1, c1
takes about the same time for any N. That can happen if our scheduler gives higher priority to earlier tasks, so it never executes
a3
if it can execute
c1
instead. Makes sense?
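To make the two orderings concrete, here is a rough single-worker simulation. It is only a sketch: `Stage` and `simulate` are made-up names, one step stands for one stage of CPU work, and a plain FIFO queue stands in for the submission-order behaviour described above.

```kotlin
import java.util.ArrayDeque
import java.util.PriorityQueue
import java.util.Queue

// One unit of work: stage number `stage` (0-based) of task number `task`.
data class Stage(val task: Int, val stage: Int)

// Drains `queue` with a single worker and returns, for each task,
// the step at which its last stage finished.
fun simulate(queue: Queue<Stage>, tasks: Int, stages: Int): Map<Int, Int> {
    for (t in 1..tasks) queue.add(Stage(t, 0))      // all tasks arrive at once
    val finishedAt = mutableMapOf<Int, Int>()
    var step = 0
    while (true) {
        val s = queue.poll() ?: break
        step++                                      // "run" the stage
        if (s.stage + 1 < stages) queue.add(Stage(s.task, s.stage + 1))
        else finishedAt[s.task] = step
    }
    return finishedAt
}

fun main() {
    // Submission order: a1, a2, a3, a4, b1, ...
    println("FIFO:     " + simulate(ArrayDeque<Stage>(), 4, 3))
    // Earlier tasks first: a1, b1, c1, a2, ...
    println("priority: " + simulate(PriorityQueue<Stage>(compareBy<Stage> { it.task }), 4, 3))
}
```

With 4 tasks of 3 stages, task 1 finishes at step 9 in FIFO order and at step 3 when earlier tasks win. The last task finishes at step 12 either way, so total throughput is unchanged.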
e
Yes. Now it makes sense.
(Though people rarely schedule concurrent tasks this way)
v
BTW is there any easy way of achieving that?
e
Yes. You can write a custom coroutine dispatcher. The idea is that you assign a unique auto-incremented ID to each coroutine that you start, store this ID in its context, then use a PriorityQueue in your executor/dispatcher where this ID is used as the priority in your queue.
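A minimal sketch of that idea, assuming kotlinx.coroutines on the JVM. `TaskId`, `Prioritized`, `PriorityDispatcher` and `crunch` are hypothetical names invented for illustration, not part of kotlinx. The queue is a `PriorityBlockingQueue` behind a plain `ThreadPoolExecutor`.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical context element: carries the coroutine's submission order.
class TaskId(val id: Long) : AbstractCoroutineContextElement(TaskId) {
    companion object Key : CoroutineContext.Key<TaskId> {
        private val counter = AtomicLong()
        fun fresh() = TaskId(counter.getAndIncrement())   // auto-incremented ID
    }
}

// Wraps a dispatched block so the queue can order it: lower ID runs first.
private class Prioritized(val id: Long, val block: Runnable) : Runnable, Comparable<Prioritized> {
    override fun run() = block.run()
    override fun compareTo(other: Prioritized) = id.compareTo(other.id)
}

class PriorityDispatcher(threads: Int) : CoroutineDispatcher() {
    private val executor = ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS, PriorityBlockingQueue<Runnable>()
    )

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // Assumption: coroutines started without a TaskId get the lowest priority.
        val id = context[TaskId]?.id ?: Long.MAX_VALUE
        executor.execute(Prioritized(id, block))
    }

    fun shutdown() = executor.shutdown()
}

// Stand-in for one expensive CPU stage.
fun crunch(): Long = (1L..1_000_000L).sum()

fun main() = runBlocking {
    val dispatcher = PriorityDispatcher(threads = 2)
    val jobs = (1..8).map {
        launch(dispatcher + TaskId.fresh()) {
            repeat(3) {      // stages a, b, c
                crunch()
                yield()      // re-dispatch, so queued earlier tasks can go first
            }
        }
    }
    jobs.joinAll()
    dispatcher.shutdown()
}
```

Two caveats. The priority only applies to blocks waiting in the queue, so a stage that is already running is never preempted. Reordering also only happens at suspension points, which is why each stage here ends with `yield()`.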