From e7eb5b6ea653f2d48d38569d57ae41715c5c04ef Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sat, 29 Aug 2020 12:24:12 -0400 Subject: [PATCH] Improvements to task processing * Moves clean up to separate function for organization * moves the `self.dequeued.append(item)` immediately after `self.queue.get()` so that we don't wait for it to show up (cherry picked from commit bc9372e88f0c8855694431f811702d7fb899a97e) --- cps/services/worker.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/cps/services/worker.py b/cps/services/worker.py index 05307b52..14c94df6 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -82,33 +82,37 @@ class WorkerThread(threading.Thread): tasks = self.queue.to_list() + self.dequeued return sorted(tasks, key=lambda x: x.num) + def cleanup_tasks(self): + with self.doLock: + dead = [] + alive = [] + for x in self.dequeued: + (dead if x.task.dead else alive).append(x) + + # if the ones that we need to keep are within the trigger, do nothing else + delta = len(self.dequeued) - len(dead) + if delta > TASK_CLEANUP_TRIGGER: + ret = alive + else: + # otherwise, lop off the oldest dead tasks until we hit the target trigger + ret = sorted(dead, key=lambda x: x.task.end_time)[-TASK_CLEANUP_TRIGGER:] + alive + + self.dequeued = sorted(ret, key=lambda x: x.num) # Main thread loop starting the different tasks def run(self): main_thread = _get_main_thread() while main_thread.is_alive(): + # this blocks until something is available item = self.queue.get() - with self.doLock: - # once we hit our trigger, start cleaning up dead tasks - if len(self.dequeued) > TASK_CLEANUP_TRIGGER: - dead = [] - alive = [] - for x in self.dequeued: - (dead if x.task.dead else alive).append(x) - - # if the ones that we need to keep are within the trigger, do nothing else - delta = len(self.dequeued) - len(dead) - if delta > TASK_CLEANUP_TRIGGER: - ret = alive - else: - # otherwise, lop off the oldest dead tasks until we hit the target trigger - ret = sorted(dead, key=lambda x: x.task.end_time)[-TASK_CLEANUP_TRIGGER:] + alive - - self.dequeued = sorted(ret, key=lambda x: x.num) # add to list so that in-progress tasks show up self.dequeued.append(item) + # once we hit our trigger, start cleaning up dead tasks + if len(self.dequeued) > TASK_CLEANUP_TRIGGER: + self.cleanup_tasks() + # sometimes tasks (like Upload) don't actually have work to do and are created as already finished if item.task.stat is STAT_WAITING: # CalibreTask.start() should wrap all exceptions in it's own error handling