From 9b9e29a3b6f6b71b5ed546d87fafa53911f5764c Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 30 Aug 2020 13:29:42 -0400 Subject: [PATCH 1/3] Implement worker thread as a daemon --- cps/services/worker.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/cps/services/worker.py b/cps/services/worker.py index 14c94df6..92c5fa3d 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -59,7 +59,7 @@ class WorkerThread(threading.Thread): threading.Thread.__init__(self) self.dequeued = list() - + self.daemon = True self.doLock = threading.Lock() self.queue = ImprovedQueue() self.num = 0 @@ -101,24 +101,23 @@ class WorkerThread(threading.Thread): # Main thread loop starting the different tasks def run(self): - main_thread = _get_main_thread() - while main_thread.is_alive(): - # this blocks until something is available - item = self.queue.get() - with self.doLock: - # add to list so that in-progress tasks show up - self.dequeued.append(item) + # this blocks until something is available + item = self.queue.get() - # once we hit our trigger, start cleaning up dead tasks - if len(self.dequeued) > TASK_CLEANUP_TRIGGER: - self.cleanup_tasks() + with self.doLock: + # add to list so that in-progress tasks show up + self.dequeued.append(item) - # sometimes tasks (like Upload) don't actually have work to do and are created as already finished - if item.task.stat is STAT_WAITING: - # CalibreTask.start() should wrap all exceptions in it's own error handling - item.task.start(self) + # once we hit our trigger, start cleaning up dead tasks + if len(self.dequeued) > TASK_CLEANUP_TRIGGER: + self.cleanup_tasks() - self.queue.task_done() + # sometimes tasks (like Upload) don't actually have work to do and are created as already finished + if item.task.stat is STAT_WAITING: + # CalibreTask.start() should wrap all exceptions in it's own error handling + item.task.start(self) + + self.queue.task_done() class CalibreTask: From b0a055a8709558e3a92f59bc96560fa4a6a1b8bb Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 30 Aug 2020 13:31:59 -0400 Subject: [PATCH 2/3] Rename email.py -> mail.py to avoid shadowing standard email module --- cps/helper.py | 2 +- cps/tasks/convert.py | 2 +- cps/tasks/{email.py => mail.py} | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename cps/tasks/{email.py => mail.py} (100%) diff --git a/cps/helper.py b/cps/helper.py index 624a3b54..a37fbb15 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -64,7 +64,7 @@ from . import gdriveutils as gd from .constants import STATIC_DIR as _STATIC_DIR from .subproc_wrapper import process_wait from .services.worker import WorkerThread, STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS -from .tasks.email import TaskEmail +from .tasks.mail import TaskEmail log = logger.create() diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py index 5060c29e..09263964 100644 --- a/cps/tasks/convert.py +++ b/cps/tasks/convert.py @@ -14,7 +14,7 @@ from cps import logger, config from cps.subproc_wrapper import process_open from flask_babel import gettext as _ -from cps.tasks.email import TaskEmail +from cps.tasks.mail import TaskEmail from cps import gdriveutils log = logger.create() diff --git a/cps/tasks/email.py b/cps/tasks/mail.py similarity index 100% rename from cps/tasks/email.py rename to cps/tasks/mail.py From ef49e2b5b30ff5c0bc3439e1fbd7447cb279128e Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 30 Aug 2020 13:49:45 -0400 Subject: [PATCH 3/3] Revert daemon in favor of timeout --- cps/services/worker.py | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/cps/services/worker.py b/cps/services/worker.py index 92c5fa3d..e434528f 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -3,6 +3,7 @@ from __future__ import division, print_function, unicode_literals import threading import abc import uuid +import time try: import queue @@ -59,7 +60,7 @@ class WorkerThread(threading.Thread): threading.Thread.__init__(self) self.dequeued = list() - self.daemon = True + self.doLock = threading.Lock() self.queue = ImprovedQueue() self.num = 0 @@ -101,23 +102,33 @@ class WorkerThread(threading.Thread): # Main thread loop starting the different tasks def run(self): - # this blocks until something is available - item = self.queue.get() + main_thread = _get_main_thread() + while main_thread.is_alive(): + try: + # this blocks until something is available. This can cause issues when the main thread dies - this + # thread will remain alive. We implement a timeout to unblock every second which allows us to check if + # the main thread is still alive. + # We don't use a daemon here because we don't want the tasks to just be abruptly halted, leading to + # possible file / database corruption + item = self.queue.get(timeout=1) + except queue.Empty as ex: + time.sleep(1) + continue - with self.doLock: - # add to list so that in-progress tasks show up - self.dequeued.append(item) + with self.doLock: + # add to list so that in-progress tasks show up + self.dequeued.append(item) - # once we hit our trigger, start cleaning up dead tasks - if len(self.dequeued) > TASK_CLEANUP_TRIGGER: - self.cleanup_tasks() + # once we hit our trigger, start cleaning up dead tasks + if len(self.dequeued) > TASK_CLEANUP_TRIGGER: + self.cleanup_tasks() - # sometimes tasks (like Upload) don't actually have work to do and are created as already finished - if item.task.stat is STAT_WAITING: - # CalibreTask.start() should wrap all exceptions in it's own error handling - item.task.start(self) + # sometimes tasks (like Upload) don't actually have work to do and are created as already finished + if item.task.stat is STAT_WAITING: + # CalibreTask.start() should wrap all exceptions in it's own error handling + item.task.start(self) - self.queue.task_done() + self.queue.task_done() class CalibreTask: