From f10f0dada6fb43f693da637ff0f9210af96f923d Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sat, 22 Aug 2020 16:31:00 -0400 Subject: [PATCH 01/18] First working PoC with a new task structure --- cps/helper.py | 62 +++++------- cps/services/worker.py | 209 +++++++++++++++++++++++++++++++++++++++++ cps/tasks/__init__.py | 0 cps/tasks/convert.py | 196 ++++++++++++++++++++++++++++++++++++++ cps/web.py | 12 +-- 5 files changed, 433 insertions(+), 46 deletions(-) create mode 100644 cps/services/worker.py create mode 100644 cps/tasks/__init__.py create mode 100644 cps/tasks/convert.py diff --git a/cps/helper.py b/cps/helper.py index 681719a9..1634ec63 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -40,6 +40,7 @@ from sqlalchemy.sql.expression import true, false, and_, or_, text, func from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash from . import calibre_db +from .tasks.convert import TaskConvert try: from urllib.parse import quote @@ -66,6 +67,8 @@ from .pagination import Pagination from .subproc_wrapper import process_wait from .worker import STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS from .worker import TASK_EMAIL, TASK_CONVERT, TASK_UPLOAD, TASK_CONVERT_ANY +from .services.worker import WorkerThread +from . import tasks log = logger.create() @@ -103,7 +106,7 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, text = (u"%s -> %s: %s" % (old_book_format, new_book_format, book.title)) settings['old_book_format'] = old_book_format settings['new_book_format'] = new_book_format - worker.add_convert(file_path, book.id, user_id, text, settings, kindle_mail) + WorkerThread.add(user_id, TaskConvert(file_path, book.id, text, settings, kindle_mail)) return None else: error_message = _(u"%(format)s not found: %(fn)s", @@ -703,47 +706,30 @@ def format_runtime(runtime): # helper function to apply localize status information in tasklist entries def render_task_status(tasklist): renderedtasklist = list() - for task in tasklist: - if task['user'] == current_user.nickname or current_user.role_admin(): - if task['formStarttime']: - task['starttime'] = format_datetime(task['formStarttime'], format='short', locale=get_locale()) - # task2['formStarttime'] = "" - else: - if 'starttime' not in task: - task['starttime'] = "" - - if 'formRuntime' not in task: - task['runtime'] = "" - else: - task['runtime'] = format_runtime(task['formRuntime']) + for user, task in tasklist: + if user == current_user.nickname or current_user.role_admin(): + ret = {} + if task.start_time: + ret['starttime'] = format_datetime(task.start_time, format='short', locale=get_locale()) + ret['runtime'] = format_runtime(task.runtime) # localize the task status - if isinstance( task['stat'], int): - if task['stat'] == STAT_WAITING: - task['status'] = _(u'Waiting') - elif task['stat'] == STAT_FAIL: - task['status'] = _(u'Failed') - elif task['stat'] == STAT_STARTED: - task['status'] = _(u'Started') - elif task['stat'] == STAT_FINISH_SUCCESS: - task['status'] = _(u'Finished') + if isinstance(task.stat, int): + if task.stat == STAT_WAITING: + ret['status'] = _(u'Waiting') + elif task.stat == STAT_FAIL: + ret['status'] = _(u'Failed') + elif task.stat == STAT_STARTED: + ret['status'] = _(u'Started') + elif task.stat == STAT_FINISH_SUCCESS: + ret['status'] = _(u'Finished') else: - task['status'] = _(u'Unknown Status') + ret['status'] = _(u'Unknown Status') - # localize the task type - if isinstance( task['taskType'], int): - if task['taskType'] == TASK_EMAIL: - task['taskMessage'] = _(u'E-mail: ') + task['taskMess'] - elif task['taskType'] == TASK_CONVERT: - task['taskMessage'] = _(u'Convert: ') + task['taskMess'] - elif task['taskType'] == TASK_UPLOAD: - task['taskMessage'] = _(u'Upload: ') + task['taskMess'] - elif task['taskType'] == TASK_CONVERT_ANY: - task['taskMessage'] = _(u'Convert: ') + task['taskMess'] - else: - task['taskMessage'] = _(u'Unknown Task: ') + task['taskMess'] - - renderedtasklist.append(task) + ret['taskMessage'] = "{}: {}".format(_(task.name), task.message) + ret['progress'] = "{} %".format(int(task.progress * 100)) + ret['user'] = user + renderedtasklist.append(ret) return renderedtasklist diff --git a/cps/services/worker.py b/cps/services/worker.py new file mode 100644 index 00000000..61f199f9 --- /dev/null +++ b/cps/services/worker.py @@ -0,0 +1,209 @@ + +from __future__ import division, print_function, unicode_literals +import sys +import os +import re +import smtplib +import socket +import time +import threading +try: + import queue +except ImportError: + import Queue as queue +from glob import glob +from shutil import copyfile +from datetime import datetime + +try: + from StringIO import StringIO + from email.MIMEBase import MIMEBase + from email.MIMEMultipart import MIMEMultipart + from email.MIMEText import MIMEText +except ImportError: + from io import StringIO + from email.mime.base import MIMEBase + from email.mime.multipart import MIMEMultipart + from email.mime.text import MIMEText + +from email import encoders +from email.utils import formatdate +from email.utils import make_msgid +from email.generator import Generator +from flask_babel import gettext as _ + +from cps import calibre_db, db +from cps import logger, config +from cps.subproc_wrapper import process_open +from cps import gdriveutils +from flask_babel import gettext as _ +import abc + +log = logger.create() + +# task 'status' consts +STAT_WAITING = 0 +STAT_FAIL = 1 +STAT_STARTED = 2 +STAT_FINISH_SUCCESS = 3 + + +def _get_main_thread(): + for t in threading.enumerate(): + if t.__class__.__name__ == '_MainThread': + return t + raise Exception("main thread not found?!") + + + +class ImprovedQueue(queue.Queue): + def to_list(self): + """ + Returns a copy of all items in the queue without removing them. + """ + + with self.mutex: + return list(self.queue) + +#Class for all worker tasks in the background +class WorkerThread(threading.Thread): + __instance = None + + @classmethod + def getInstance(cls): + if cls._instance is None: + cls._instance = WorkerThread() + return cls._instance + + def __init__(self): + threading.Thread.__init__(self) + + self.finished = list() + + self.db_queue = queue.Queue() + calibre_db.add_queue(self.db_queue) + + self.doLock = threading.Lock() + self.queue = ImprovedQueue() + + # todo: figure this stuff out and where it should goes + self.asyncSMTP = None + + self.start() + + @classmethod + def add(cls, user, task): + ins = cls.getInstance() + ins.queue.put((user, task)) + + @property + def tasks(self): + with self.doLock: + tasks = list(self.queue.to_list()) + self.finished + return tasks # todo: order by data added + + # Main thread loop starting the different tasks + def run(self): + main_thread = _get_main_thread() + while main_thread.is_alive(): + user, item = self.queue.get() + + # add to list so that in-progress tasks show up + with self.doLock: + self.finished.append((user, item)) + + try: + item.start(self) + print(item) + except Exception as e: + log.exception(e) + + self.queue.task_done() + + def get_send_status(self): + raise NotImplementedError + # if self.asyncSMTP: + # return self.asyncSMTP.getTransferStatus() + # else: + # return "0 %" + + def _delete_completed_tasks(self): + raise NotImplementedError() + # for index, task in reversed(list(enumerate(self.UIqueue))): + # if task['progress'] == "100 %": + # # delete tasks + # self.queue.pop(index) + # self.UIqueue.pop(index) + # # if we are deleting entries before the current index, adjust the index + # if index <= self.current and self.current: + # self.current -= 1 + # self.last = len(self.queue) + +class CalibreTask(metaclass=abc.ABCMeta): + + def __init__(self, message): + self._progress = 0 + self.stat = STAT_WAITING + self.error = None + self.start_time = None + self.end_time = None + self.message = message + + @abc.abstractmethod + def run(self, worker_thread): + """Provides the caller some human-readable name for this class""" + raise NotImplementedError + + @abc.abstractmethod + def name(self): + """Provides the caller some human-readable name for this class""" + raise NotImplementedError + + def start(self, *args): + self.start_time = datetime.now() + self.run(*args) + self.end_time = datetime.now() + + @property + def stat(self): + return self._stat + + @stat.setter + def stat(self, x): + self._stat = x + + @property + def progress(self): + return self._progress + + @progress.setter + def progress(self, x): + # todo: throw error if outside of [0,1] + self._progress = x + + @property + def error(self): + return self._error + + @error.setter + def error(self, x): + self._error = x + + @property + def runtime(self): + return (self.end_time or datetime.now()) - self.start_time + + @progress.setter + def progress(self, x): + # todo: throw error if outside of [0,1] + self._progress = x + + def _handleError(self, error_message): + log.error(error_message) + self.stat = STAT_FAIL + self.progress = 1 + self.error = error_message + + def _handleSuccess(self): + self.stat = STAT_FINISH_SUCCESS + self.progress = 1 diff --git a/cps/tasks/__init__.py b/cps/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py new file mode 100644 index 00000000..58b45d33 --- /dev/null +++ b/cps/tasks/convert.py @@ -0,0 +1,196 @@ +from __future__ import division, print_function, unicode_literals +import sys +import os +import re + +from glob import glob +from shutil import copyfile + +from cps.services.worker import CalibreTask +from cps import calibre_db, db +from cps import logger, config +from cps.subproc_wrapper import process_open +from flask_babel import gettext as _ + +log = logger.create() + + +class TaskConvert(CalibreTask): + def __init__(self, file_path, bookid, taskMessage, settings, kindle_mail): + super().__init__(taskMessage) + self.file_path = file_path + self.bookid = bookid + self.settings = settings + self.kindle_mail = kindle_mail + + self.results = dict() + + def run(self, worker_thread): + self.worker_thread = worker_thread + filename = self._convert_ebook_format() + # todo: re-enable this (need to set up gdrive to test with) + # if filename: + # if config.config_use_google_drive: + # gdriveutils.updateGdriveCalibreFromLocal() + # if curr_task == TASK_CONVERT: + # self.add_email(self.queue[index]['settings']['subject'], self.queue[index]['path'], + # filename, self.queue[index]['settings'], self.queue[index]['kindle'], + # self.UIqueue[index]['user'], self.queue[index]['title'], + # self.queue[index]['settings']['body'], internal=True) + + self._handleSuccess() + pass + + def _convert_ebook_format(self): + error_message = None + file_path = self.file_path + book_id = self.bookid + format_old_ext = u'.' + self.settings['old_book_format'].lower() + format_new_ext = u'.' + self.settings['new_book_format'].lower() + + # check to see if destination format already exists - + # if it does - mark the conversion task as complete and return a success + # this will allow send to kindle workflow to continue to work + if os.path.isfile(file_path + format_new_ext): + log.info("Book id %d already converted to %s", book_id, format_new_ext) + cur_book = calibre_db.get_book(book_id) + self.results['path'] = file_path + self.results['title'] = cur_book.title + self._handleSuccess() + return file_path + format_new_ext + else: + log.info("Book id %d - target format of %s does not exist. Moving forward with convert.", + book_id, + format_new_ext) + + if config.config_kepubifypath and format_old_ext == '.epub' and format_new_ext == '.kepub': + check, error_message = self._convert_kepubify(file_path, + format_old_ext, + format_new_ext) + else: + # check if calibre converter-executable is existing + if not os.path.exists(config.config_converterpath): + # ToDo Text is not translated + self._handleError(_(u"Calibre ebook-convert %(tool)s not found", tool=config.config_converterpath)) + return + check, error_message = self._convert_calibre(file_path, format_old_ext, format_new_ext) + + if check == 0: + cur_book = calibre_db.get_book(book_id) + if os.path.isfile(file_path + format_new_ext): + # self.db_queue.join() + new_format = db.Data(name=cur_book.data[0].name, + book_format=self.settings['new_book_format'].upper(), + book=book_id, uncompressed_size=os.path.getsize(file_path + format_new_ext)) + + # todo: this may not be needed anymore, might be able to access the DB directly now. See #1565 + task = {'task':'add_format','id': book_id, 'format': new_format} + self.worker_thread.db_queue.put(task) + # To Do how to handle error? + + '''cur_book.data.append(new_format) + try: + # db.session.merge(cur_book) + calibre_db.session.commit() + except OperationalError as e: + calibre_db.session.rollback() + log.error("Database error: %s", e) + self._handleError(_(u"Database error: %(error)s.", error=e)) + return''' + + self.results['path'] = cur_book.path + self.results['title'] = cur_book.title + if config.config_use_google_drive: + os.remove(file_path + format_old_ext) + self._handleSuccess() + return file_path + format_new_ext + else: + error_message = format_new_ext.upper() + ' format not found on disk' + log.info("ebook converter failed with error while converting book") + if not error_message: + error_message = 'Ebook converter failed with unknown error' + self._handleError(error_message) + return + + def _convert_kepubify(self, file_path, format_old_ext, format_new_ext): + quotes = [1, 3] + command = [config.config_kepubifypath, (file_path + format_old_ext), '-o', os.path.dirname(file_path)] + try: + p = process_open(command, quotes) + except OSError as e: + return 1, _(u"Kepubify-converter failed: %(error)s", error=e) + self.progress = 0.01 + while True: + nextline = p.stdout.readlines() + nextline = [x.strip('\n') for x in nextline if x != '\n'] + if sys.version_info < (3, 0): + nextline = [x.decode('utf-8') for x in nextline] + for line in nextline: + log.debug(line) + if p.poll() is not None: + break + + # ToD Handle + # process returncode + check = p.returncode + + # move file + if check == 0: + converted_file = glob(os.path.join(os.path.dirname(file_path), "*.kepub.epub")) + if len(converted_file) == 1: + copyfile(converted_file[0], (file_path + format_new_ext)) + os.unlink(converted_file[0]) + else: + return 1, _(u"Converted file not found or more than one file in folder %(folder)s", + folder=os.path.dirname(file_path)) + return check, None + + def _convert_calibre(self, file_path, format_old_ext, format_new_ext): + try: + # Linux py2.7 encode as list without quotes no empty element for parameters + # linux py3.x no encode and as list without quotes no empty element for parameters + # windows py2.7 encode as string with quotes empty element for parameters is okay + # windows py 3.x no encode and as string with quotes empty element for parameters is okay + # separate handling for windows and linux + quotes = [1, 2] + command = [config.config_converterpath, (file_path + format_old_ext), + (file_path + format_new_ext)] + quotes_index = 3 + if config.config_calibre: + parameters = config.config_calibre.split(" ") + for param in parameters: + command.append(param) + quotes.append(quotes_index) + quotes_index += 1 + + p = process_open(command, quotes) + except OSError as e: + return 1, _(u"Ebook-converter failed: %(error)s", error=e) + + while p.poll() is None: + nextline = p.stdout.readline() + if os.name == 'nt' and sys.version_info < (3, 0): + nextline = nextline.decode('windows-1252') + elif os.name == 'posix' and sys.version_info < (3, 0): + nextline = nextline.decode('utf-8') + log.debug(nextline.strip('\r\n')) + # parse progress string from calibre-converter + progress = re.search(r"(\d+)%\s.*", nextline) + if progress: + self.progress = int(progress.group(1)) / 100 + + # process returncode + check = p.returncode + calibre_traceback = p.stderr.readlines() + error_message = "" + for ele in calibre_traceback: + if sys.version_info < (3, 0): + ele = ele.decode('utf-8') + log.debug(ele.strip('\n')) + if not ele.startswith('Traceback') and not ele.startswith(' File'): + error_message = "Calibre failed with error: %s" % ele.strip('\n') + return check, error_message + + @property + def name(self): + return "Convert" diff --git a/cps/web.py b/cps/web.py index 7e39054d..6064c1e9 100644 --- a/cps/web.py +++ b/cps/web.py @@ -33,7 +33,7 @@ import re from babel import Locale as LC from babel.dates import format_date from babel.core import UnknownLocaleError -from flask import Blueprint +from flask import Blueprint, jsonify from flask import render_template, request, redirect, send_from_directory, make_response, g, flash, abort, url_for from flask_babel import gettext as _ from flask_login import login_user, logout_user, login_required, current_user, confirm_login @@ -48,7 +48,7 @@ except ImportError: from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash, check_password_hash -from . import constants, logger, isoLanguages, services, worker, cli +from . import constants, logger, isoLanguages, services, worker, worker2, cli from . import searched_ids, lm, babel, db, ub, config, get_locale, app from . import calibre_db from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download @@ -383,12 +383,8 @@ def import_ldap_users(): @web.route("/ajax/emailstat") @login_required def get_email_status_json(): - tasks = worker.get_taskstatus() - answer = render_task_status(tasks) - js = json.dumps(answer, default=json_serial) - response = make_response(js) - response.headers["Content-Type"] = "application/json; charset=utf-8" - return response + tasks = worker2._worker2.tasks + return jsonify(render_task_status(tasks)) @web.route("/ajax/bookmark//", methods=['POST']) From 2533c9c14e4711e72748f26e2851101dca94df65 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sat, 22 Aug 2020 22:44:28 -0400 Subject: [PATCH 02/18] Continue converting tasks - email and upload tasks --- cps/editbooks.py | 10 +- cps/helper.py | 17 ++-- cps/services/worker.py | 14 +-- cps/tasks/email.py | 221 +++++++++++++++++++++++++++++++++++++++++ cps/tasks/upload.py | 18 ++++ cps/web.py | 8 +- 6 files changed, 268 insertions(+), 20 deletions(-) create mode 100644 cps/tasks/email.py create mode 100644 cps/tasks/upload.py diff --git a/cps/editbooks.py b/cps/editbooks.py index 7330572b..903b50e3 100644 --- a/cps/editbooks.py +++ b/cps/editbooks.py @@ -35,6 +35,8 @@ from sqlalchemy.exc import OperationalError from . import constants, logger, isoLanguages, gdriveutils, uploader, helper from . import config, get_locale, ub, worker, db from . import calibre_db +from .services.worker import WorkerThread +from .tasks.upload import TaskUpload from .web import login_required_if_no_ano, render_title_template, edit_required, upload_required @@ -509,8 +511,8 @@ def upload_single_file(request, book, book_id): # Queue uploader info uploadText=_(u"File format %(ext)s added to %(book)s", ext=file_ext.upper(), book=book.title) - worker.add_upload(current_user.nickname, - "" + uploadText + "") + WorkerThread.add(current_user.nickname, TaskUpload( + "" + uploadText + "")) return uploader.process( saved_filename, *os.path.splitext(requested_file.filename), @@ -854,8 +856,8 @@ def upload(): if error: flash(error, category="error") uploadText=_(u"File %(file)s uploaded", file=title) - worker.add_upload(current_user.nickname, - "" + uploadText + "") + WorkerThread.add(current_user.nickname, TaskUpload( + "" + uploadText + "")) if len(request.files.getlist("btn-upload")) < 2: if current_user.role_edit() or current_user.role_admin(): diff --git a/cps/helper.py b/cps/helper.py index 1634ec63..cd6133dd 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -68,6 +68,7 @@ from .subproc_wrapper import process_wait from .worker import STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS from .worker import TASK_EMAIL, TASK_CONVERT, TASK_UPLOAD, TASK_CONVERT_ANY from .services.worker import WorkerThread +from .tasks.email import TaskEmail from . import tasks @@ -115,9 +116,9 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, def send_test_mail(kindle_mail, user_name): - worker.add_email(_(u'Calibre-Web test e-mail'), None, None, - config.get_mail_settings(), kindle_mail, user_name, - _(u"Test e-mail"), _(u'This e-mail has been sent via Calibre-Web.')) + WorkerThread.add(user_name, TaskEmail(_(u'Calibre-Web test e-mail'), None, None, + config.get_mail_settings(), kindle_mail, _(u"Test e-mail"), + _(u'This e-mail has been sent via Calibre-Web.'))) return @@ -132,9 +133,9 @@ def send_registration_mail(e_mail, user_name, default_password, resend=False): text += "Don't forget to change your password after first login.\r\n" text += "Sincerely\r\n\r\n" text += "Your Calibre-Web team" - worker.add_email(_(u'Get Started with Calibre-Web'), None, None, + WorkerThread.add(None, TaskEmail(_(u'Get Started with Calibre-Web'), None, None, config.get_mail_settings(), e_mail, None, - _(u"Registration e-mail for user: %(name)s", name=user_name), text) + _(u"Registration e-mail for user: %(name)s", name=user_name), text)) return @@ -226,9 +227,9 @@ def send_mail(book_id, book_format, convert, kindle_mail, calibrepath, user_id): for entry in iter(book.data): if entry.format.upper() == book_format.upper(): converted_file_name = entry.name + '.' + book_format.lower() - worker.add_email(_(u"Send to Kindle"), book.path, converted_file_name, - config.get_mail_settings(), kindle_mail, user_id, - _(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.')) + WorkerThread.add(user_id, TaskEmail(_(u"Send to Kindle"), book.path, converted_file_name, + config.get_mail_settings(), kindle_mail, + _(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.'))) return return _(u"The requested file could not be read. Maybe wrong permissions?") diff --git a/cps/services/worker.py b/cps/services/worker.py index 61f199f9..61a4bae2 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -67,7 +67,7 @@ class ImprovedQueue(queue.Queue): #Class for all worker tasks in the background class WorkerThread(threading.Thread): - __instance = None + _instance = None @classmethod def getInstance(cls): @@ -112,11 +112,12 @@ class WorkerThread(threading.Thread): with self.doLock: self.finished.append((user, item)) - try: - item.start(self) - print(item) - except Exception as e: - log.exception(e) + # sometimes tasks (like Upload) don't actually have work to do and are created as already finished + if item.stat is STAT_WAITING: + try: + item.start(self) + except Exception as e: + log.exception(e) self.queue.task_done() @@ -161,6 +162,7 @@ class CalibreTask(metaclass=abc.ABCMeta): def start(self, *args): self.start_time = datetime.now() + self.stat = STAT_STARTED self.run(*args) self.end_time = datetime.now() diff --git a/cps/tasks/email.py b/cps/tasks/email.py new file mode 100644 index 00000000..fbce103e --- /dev/null +++ b/cps/tasks/email.py @@ -0,0 +1,221 @@ +from __future__ import division, print_function, unicode_literals +import sys +import os +import smtplib +import threading +import socket + +try: + from StringIO import StringIO + from email.MIMEBase import MIMEBase + from email.MIMEMultipart import MIMEMultipart + from email.MIMEText import MIMEText +except ImportError: + from io import StringIO + from email.mime.base import MIMEBase + from email.mime.multipart import MIMEMultipart + from email.mime.text import MIMEText + +from email import encoders +from email.utils import formatdate, make_msgid +from email.generator import Generator + +from cps.services.worker import CalibreTask +from cps import logger, config + +from cps import gdriveutils + +log = logger.create() + +chunksize = 8192 + + +# Class for sending email with ability to get current progress +class EmailBase(): + + transferSize = 0 + progress = 0 + + def data(self, msg): + self.transferSize = len(msg) + (code, resp) = smtplib.SMTP.data(self, msg) + self.progress = 0 + return (code, resp) + + def send(self, strg): + """Send `strg' to the server.""" + log.debug('send: %r', strg[:300]) + if hasattr(self, 'sock') and self.sock: + try: + if self.transferSize: + lock=threading.Lock() + lock.acquire() + self.transferSize = len(strg) + lock.release() + for i in range(0, self.transferSize, chunksize): + if isinstance(strg, bytes): + self.sock.send((strg[i:i+chunksize])) + else: + self.sock.send((strg[i:i + chunksize]).encode('utf-8')) + lock.acquire() + self.progress = i + lock.release() + else: + self.sock.sendall(strg.encode('utf-8')) + except socket.error: + self.close() + raise smtplib.SMTPServerDisconnected('Server not connected') + else: + raise smtplib.SMTPServerDisconnected('please run connect() first') + + @classmethod + def _print_debug(self, *args): + log.debug(args) + + def getTransferStatus(self): + if self.transferSize: + lock2 = threading.Lock() + lock2.acquire() + value = int((float(self.progress) / float(self.transferSize))*100) + lock2.release() + return str(value) + ' %' + else: + return "100 %" + + +# Class for sending email with ability to get current progress, derived from emailbase class +class Email(EmailBase, smtplib.SMTP): + + def __init__(self, *args, **kwargs): + smtplib.SMTP.__init__(self, *args, **kwargs) + + +# Class for sending ssl encrypted email with ability to get current progress, , derived from emailbase class +class EmailSSL(EmailBase, smtplib.SMTP_SSL): + + def __init__(self, *args, **kwargs): + smtplib.SMTP_SSL.__init__(self, *args, **kwargs) + + +class TaskEmail(CalibreTask): + def __init__(self, subject, filepath, attachment, settings, recipient, taskMessage, text, internal=False): + super().__init__(taskMessage) + self.subject = subject + self.attachment = attachment + self.settings = settings + self.filepath = filepath + self.recipent = recipient + self.text = text + + self.results = dict() + + def run(self, worker_thread): + # create MIME message + msg = MIMEMultipart() + msg['Subject'] = self.subject + msg['Message-Id'] = make_msgid('calibre-web') + msg['Date'] = formatdate(localtime=True) + text = self.text + msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8')) + if self.attachment: + result = self.get_attachment(self.filepath, self.attachment) + if result: + msg.attach(result) + else: + self._handleError(u"Attachment not found") + return + + msg['From'] = self.settings["mail_from"] + msg['To'] = self.recipent + + use_ssl = int(self.settings.get('mail_use_ssl', 0)) + try: + # convert MIME message to string + fp = StringIO() + gen = Generator(fp, mangle_from_=False) + gen.flatten(msg) + msg = fp.getvalue() + + # send email + timeout = 600 # set timeout to 5mins + + # redirect output to logfile on python2 pn python3 debugoutput is caught with overwritten + # _print_debug function + if sys.version_info < (3, 0): + org_smtpstderr = smtplib.stderr + smtplib.stderr = logger.StderrLogger('worker.smtp') + + if use_ssl == 2: + self.asyncSMTP = EmailSSL(self.settings["mail_server"], self.settings["mail_port"], + timeout=timeout) + else: + self.asyncSMTP = Email(self.settings["mail_server"], self.settings["mail_port"], timeout=timeout) + + # link to logginglevel + if logger.is_debug_enabled(): + self.asyncSMTP.set_debuglevel(1) + if use_ssl == 1: + self.asyncSMTP.starttls() + if self.settings["mail_password"]: + self.asyncSMTP.login(str(self.settings["mail_login"]), str(self.settings["mail_password"])) + self.asyncSMTP.sendmail(self.settings["mail_from"], self.recipent, msg) + self.asyncSMTP.quit() + self._handleSuccess() + + if sys.version_info < (3, 0): + smtplib.stderr = org_smtpstderr + + except (MemoryError) as e: + log.exception(e) + self._handleError(u'MemoryError sending email: ' + str(e)) + return None + except (smtplib.SMTPException, smtplib.SMTPAuthenticationError) as e: + if hasattr(e, "smtp_error"): + text = e.smtp_error.decode('utf-8').replace("\n", '. ') + elif hasattr(e, "message"): + text = e.message + else: + log.exception(e) + text = '' + self._handleError(u'Smtplib Error sending email: ' + text) + return None + except (socket.error) as e: + self._handleError(u'Socket Error sending email: ' + e.strerror) + return None + + def _get_attachment(bookpath, filename): + """Get file as MIMEBase message""" + calibrepath = config.config_calibre_dir + if config.config_use_google_drive: + df = gdriveutils.getFileFromEbooksFolder(bookpath, filename) + if df: + datafile = os.path.join(calibrepath, bookpath, filename) + if not os.path.exists(os.path.join(calibrepath, bookpath)): + os.makedirs(os.path.join(calibrepath, bookpath)) + df.GetContentFile(datafile) + else: + return None + file_ = open(datafile, 'rb') + data = file_.read() + file_.close() + os.remove(datafile) + else: + try: + file_ = open(os.path.join(calibrepath, bookpath, filename), 'rb') + data = file_.read() + file_.close() + except IOError as e: + log.exception(e) + log.error(u'The requested file could not be read. Maybe wrong permissions?') + return None + + attachment = MIMEBase('application', 'octet-stream') + attachment.set_payload(data) + encoders.encode_base64(attachment) + attachment.add_header('Content-Disposition', 'attachment', + filename=filename) + return attachment + + @property + def name(self): + return "Email" diff --git a/cps/tasks/upload.py b/cps/tasks/upload.py new file mode 100644 index 00000000..0e678b51 --- /dev/null +++ b/cps/tasks/upload.py @@ -0,0 +1,18 @@ +from __future__ import division, print_function, unicode_literals + +from datetime import datetime +from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS + +class TaskUpload(CalibreTask): + def __init__(self, taskMessage): + super().__init__(taskMessage) + self.start_time = self.end_time = datetime.now() + self.stat = STAT_FINISH_SUCCESS + + def run(self, worker_thread): + """Upload task doesn't have anything to do, it's simply a way to add information to the task list""" + pass + + @property + def name(self): + return "Upload" diff --git a/cps/web.py b/cps/web.py index 6064c1e9..e21068fa 100644 --- a/cps/web.py +++ b/cps/web.py @@ -41,6 +41,9 @@ from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError from sqlalchemy.sql.expression import text, func, true, false, not_, and_, or_ from werkzeug.exceptions import default_exceptions, InternalServerError from sqlalchemy.sql.functions import coalesce + +from .services.worker import WorkerThread + try: from werkzeug.exceptions import FailedDependency except ImportError: @@ -48,7 +51,7 @@ except ImportError: from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash, check_password_hash -from . import constants, logger, isoLanguages, services, worker, worker2, cli +from . import constants, logger, isoLanguages, services, worker, cli from . import searched_ids, lm, babel, db, ub, config, get_locale, app from . import calibre_db from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download @@ -383,7 +386,8 @@ def import_ldap_users(): @web.route("/ajax/emailstat") @login_required def get_email_status_json(): - tasks = worker2._worker2.tasks + + tasks = WorkerThread.getInstance().tasks return jsonify(render_task_status(tasks)) From 414043ded178c75bef92d708df91e72a2982b208 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sat, 22 Aug 2020 23:35:48 -0400 Subject: [PATCH 03/18] Remove references to old worker, turn off calibre_db task queue (for now until I can determine if it's needed still), and attempt to re-implement email progress tracking (not working at the moment) --- cps/db.py | 2 +- cps/editbooks.py | 2 +- cps/helper.py | 7 ++----- cps/tasks/email.py | 29 +++++++++++++++++++---------- cps/tasks/upload.py | 1 + cps/web.py | 7 +++---- 6 files changed, 27 insertions(+), 21 deletions(-) diff --git a/cps/db.py b/cps/db.py index 95b5d366..a3075529 100644 --- a/cps/db.py +++ b/cps/db.py @@ -347,7 +347,7 @@ class CalibreDB(threading.Thread): self.log = logger.create() def run(self): - while True: + while False: i = self.queue.get() if i == 'dummy': self.queue.task_done() diff --git a/cps/editbooks.py b/cps/editbooks.py index 903b50e3..103f683c 100644 --- a/cps/editbooks.py +++ b/cps/editbooks.py @@ -33,7 +33,7 @@ from flask_login import current_user, login_required from sqlalchemy.exc import OperationalError from . import constants, logger, isoLanguages, gdriveutils, uploader, helper -from . import config, get_locale, ub, worker, db +from . import config, get_locale, ub, db from . import calibre_db from .services.worker import WorkerThread from .tasks.upload import TaskUpload diff --git a/cps/helper.py b/cps/helper.py index cd6133dd..3530d864 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -60,16 +60,13 @@ try: except ImportError: use_PIL = False -from . import logger, config, get_locale, db, ub, isoLanguages, worker +from . import logger, config, get_locale, db, ub, isoLanguages from . import gdriveutils as gd from .constants import STATIC_DIR as _STATIC_DIR from .pagination import Pagination from .subproc_wrapper import process_wait -from .worker import STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS -from .worker import TASK_EMAIL, TASK_CONVERT, TASK_UPLOAD, TASK_CONVERT_ANY -from .services.worker import WorkerThread +from .services.worker import WorkerThread, STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS from .tasks.email import TaskEmail -from . import tasks log = logger.create() diff --git a/cps/tasks/email.py b/cps/tasks/email.py index fbce103e..7a85b56a 100644 --- a/cps/tasks/email.py +++ b/cps/tasks/email.py @@ -27,11 +27,11 @@ from cps import gdriveutils log = logger.create() -chunksize = 8192 +CHUNKSIZE = 8192 # Class for sending email with ability to get current progress -class EmailBase(): +class EmailBase: transferSize = 0 progress = 0 @@ -52,11 +52,11 @@ class EmailBase(): lock.acquire() self.transferSize = len(strg) lock.release() - for i in range(0, self.transferSize, chunksize): + for i in range(0, self.transferSize, CHUNKSIZE): if isinstance(strg, bytes): - self.sock.send((strg[i:i+chunksize])) + self.sock.send((strg[i:i + CHUNKSIZE])) else: - self.sock.send((strg[i:i + chunksize]).encode('utf-8')) + self.sock.send((strg[i:i + CHUNKSIZE]).encode('utf-8')) lock.acquire() self.progress = i lock.release() @@ -69,7 +69,7 @@ class EmailBase(): raise smtplib.SMTPServerDisconnected('please run connect() first') @classmethod - def _print_debug(self, *args): + def _print_debug(cls, *args): log.debug(args) def getTransferStatus(self): @@ -78,9 +78,9 @@ class EmailBase(): lock2.acquire() value = int((float(self.progress) / float(self.transferSize))*100) lock2.release() - return str(value) + ' %' + return value / 100 else: - return "100 %" + return 1 # Class for sending email with ability to get current progress, derived from emailbase class @@ -106,6 +106,7 @@ class TaskEmail(CalibreTask): self.filepath = filepath self.recipent = recipient self.text = text + self.asyncSMTP = None self.results = dict() @@ -118,7 +119,7 @@ class TaskEmail(CalibreTask): text = self.text msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8')) if self.attachment: - result = self.get_attachment(self.filepath, self.attachment) + result = self._get_attachment(self.filepath, self.attachment) if result: msg.attach(result) else: @@ -183,7 +184,15 @@ class TaskEmail(CalibreTask): self._handleError(u'Socket Error sending email: ' + e.strerror) return None - def _get_attachment(bookpath, filename): + @property + def progress(self): + if self.asyncSMTP is not None: + return self.asyncSMTP.getTransferStatus() + else: + return 0 + + @classmethod + def _get_attachment(cls, bookpath, filename): """Get file as MIMEBase message""" calibrepath = config.config_calibre_dir if config.config_use_google_drive: diff --git a/cps/tasks/upload.py b/cps/tasks/upload.py index 0e678b51..5f797c50 100644 --- a/cps/tasks/upload.py +++ b/cps/tasks/upload.py @@ -8,6 +8,7 @@ class TaskUpload(CalibreTask): super().__init__(taskMessage) self.start_time = self.end_time = datetime.now() self.stat = STAT_FINISH_SUCCESS + self.progress = 1 def run(self, worker_thread): """Upload task doesn't have anything to do, it's simply a way to add information to the task list""" diff --git a/cps/web.py b/cps/web.py index e21068fa..8f533d15 100644 --- a/cps/web.py +++ b/cps/web.py @@ -51,11 +51,11 @@ except ImportError: from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash, check_password_hash -from . import constants, logger, isoLanguages, services, worker, cli +from . import constants, logger, isoLanguages, services from . import searched_ids, lm, babel, db, ub, config, get_locale, app from . import calibre_db from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download -from .helper import check_valid_domain, render_task_status, json_serial, \ +from .helper import check_valid_domain, render_task_status, \ get_cc_columns, get_book_cover, get_download_link, send_mail, generate_random_password, \ send_registration_mail, check_send_to_kindle, check_read_formats, tags_filters, reset_password from .pagination import Pagination @@ -386,7 +386,6 @@ def import_ldap_users(): @web.route("/ajax/emailstat") @login_required def get_email_status_json(): - tasks = WorkerThread.getInstance().tasks return jsonify(render_task_status(tasks)) @@ -976,7 +975,7 @@ def category_list(): @login_required def get_tasks_status(): # if current user admin, show all email, otherwise only own emails - tasks = worker.get_taskstatus() + tasks = WorkerThread.getInstance().tasks answer = render_task_status(tasks) return render_title_template('tasks.html', entries=answer, title=_(u"Tasks"), page="tasks") From 9ce2e8ea53ba2568373adfc8b1ebfa7b81282daf Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 23 Aug 2020 12:58:24 -0400 Subject: [PATCH 04/18] Fix progress indication for emails that have been completed. --- cps/tasks/email.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cps/tasks/email.py b/cps/tasks/email.py index 7a85b56a..be951c6b 100644 --- a/cps/tasks/email.py +++ b/cps/tasks/email.py @@ -189,7 +189,15 @@ class TaskEmail(CalibreTask): if self.asyncSMTP is not None: return self.asyncSMTP.getTransferStatus() else: - return 0 + return self._progress + + @progress.setter + def progress(self, x): + """This gets explicitly set when handle(Success|Error) are called. In this case, remove the SMTP connection""" + if x == 1: + self.asyncSMTP = None + self._progress = x + @classmethod def _get_attachment(cls, bookpath, filename): From bf41b04cfa34efc209e307aaffb1f1a93729dac7 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 23 Aug 2020 13:07:24 -0400 Subject: [PATCH 05/18] Remove convert task from db.py - with the fixed from #1565, this no longer seems to be needed --- cps/db.py | 12 ------------ cps/tasks/convert.py | 14 +++++++++++--- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/cps/db.py b/cps/db.py index a3075529..46e54706 100644 --- a/cps/db.py +++ b/cps/db.py @@ -352,20 +352,8 @@ class CalibreDB(threading.Thread): if i == 'dummy': self.queue.task_done() break - if i['task'] == 'add_format': - cur_book = self.session.query(Books).filter(Books.id == i['id']).first() - cur_book.data.append(i['format']) - try: - # db.session.merge(cur_book) - self.session.commit() - except OperationalError as e: - self.session.rollback() - self.log.error("Database error: %s", e) - # self._handleError(_(u"Database error: %(error)s.", error=e)) - # return self.queue.task_done() - def stop(self): self.queue.put('dummy') diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py index 58b45d33..cb6d9d81 100644 --- a/cps/tasks/convert.py +++ b/cps/tasks/convert.py @@ -6,6 +6,8 @@ import re from glob import glob from shutil import copyfile +from sqlalchemy.exc import SQLAlchemyError + from cps.services.worker import CalibreTask from cps import calibre_db, db from cps import logger, config @@ -84,9 +86,15 @@ class TaskConvert(CalibreTask): book=book_id, uncompressed_size=os.path.getsize(file_path + format_new_ext)) # todo: this may not be needed anymore, might be able to access the DB directly now. See #1565 - task = {'task':'add_format','id': book_id, 'format': new_format} - self.worker_thread.db_queue.put(task) - # To Do how to handle error? + cur_book = calibre_db.session.query(db.Books).filter(db.Books.id == book_id).first() + cur_book.data.append(new_format) + + try: + # db.session.merge(cur_book) + calibre_db.session.commit() + except SQLAlchemyError as e: + calibre_db.session.rollback() + log.error("Database error: %s", e) '''cur_book.data.append(new_format) try: From a000de02708cfd51c3d8951b58837fee96bc1839 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 23 Aug 2020 13:33:57 -0400 Subject: [PATCH 06/18] Some clean up --- cps/tasks/convert.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py index cb6d9d81..0039539a 100644 --- a/cps/tasks/convert.py +++ b/cps/tasks/convert.py @@ -85,8 +85,6 @@ class TaskConvert(CalibreTask): book_format=self.settings['new_book_format'].upper(), book=book_id, uncompressed_size=os.path.getsize(file_path + format_new_ext)) - # todo: this may not be needed anymore, might be able to access the DB directly now. See #1565 - cur_book = calibre_db.session.query(db.Books).filter(db.Books.id == book_id).first() cur_book.data.append(new_format) try: @@ -95,16 +93,7 @@ class TaskConvert(CalibreTask): except SQLAlchemyError as e: calibre_db.session.rollback() log.error("Database error: %s", e) - - '''cur_book.data.append(new_format) - try: - # db.session.merge(cur_book) - calibre_db.session.commit() - except OperationalError as e: - calibre_db.session.rollback() - log.error("Database error: %s", e) - self._handleError(_(u"Database error: %(error)s.", error=e)) - return''' + return self.results['path'] = cur_book.path self.results['title'] = cur_book.title From 59d56d5c83266cc154c43dde4b3c3fffacc5ed90 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 23 Aug 2020 21:21:55 -0400 Subject: [PATCH 07/18] py27 support (I think) and some clean up --- cps/services/worker.py | 46 +++++------------------------------------- cps/tasks/convert.py | 22 +++++++++++--------- cps/tasks/email.py | 2 +- cps/tasks/upload.py | 2 +- 4 files changed, 20 insertions(+), 52 deletions(-) diff --git a/cps/services/worker.py b/cps/services/worker.py index 61a4bae2..7056314c 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -1,42 +1,15 @@ from __future__ import division, print_function, unicode_literals -import sys -import os -import re -import smtplib -import socket -import time import threading + try: import queue except ImportError: import Queue as queue -from glob import glob -from shutil import copyfile from datetime import datetime -try: - from StringIO import StringIO - from email.MIMEBase import MIMEBase - from email.MIMEMultipart import MIMEMultipart - from email.MIMEText import MIMEText -except ImportError: - from io import StringIO - from email.mime.base import MIMEBase - from email.mime.multipart import MIMEMultipart - from email.mime.text import MIMEText - -from email import encoders -from email.utils import formatdate -from email.utils import make_msgid -from email.generator import Generator -from flask_babel import gettext as _ - -from cps import calibre_db, db -from cps import logger, config -from cps.subproc_wrapper import process_open -from cps import gdriveutils -from flask_babel import gettext as _ +from cps import calibre_db +from cps import logger import abc log = logger.create() @@ -86,9 +59,6 @@ class WorkerThread(threading.Thread): self.doLock = threading.Lock() self.queue = ImprovedQueue() - # todo: figure this stuff out and where it should goes - self.asyncSMTP = None - self.start() @classmethod @@ -121,13 +91,6 @@ class WorkerThread(threading.Thread): self.queue.task_done() - def get_send_status(self): - raise NotImplementedError - # if self.asyncSMTP: - # return self.asyncSMTP.getTransferStatus() - # else: - # return "0 %" - def _delete_completed_tasks(self): raise NotImplementedError() # for index, task in reversed(list(enumerate(self.UIqueue))): @@ -140,7 +103,8 @@ class WorkerThread(threading.Thread): # self.current -= 1 # self.last = len(self.queue) -class CalibreTask(metaclass=abc.ABCMeta): +class CalibreTask: + __metaclass__ = abc.ABCMeta def __init__(self, message): self._progress = 0 diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py index 0039539a..2b6c6db4 100644 --- a/cps/tasks/convert.py +++ b/cps/tasks/convert.py @@ -14,12 +14,14 @@ from cps import logger, config from cps.subproc_wrapper import process_open from flask_babel import gettext as _ +from cps.tasks.email import TaskEmail +from cps import gdriveutils log = logger.create() class TaskConvert(CalibreTask): def __init__(self, file_path, bookid, taskMessage, settings, kindle_mail): - super().__init__(taskMessage) + super(TaskConvert, self).__init__(taskMessage) self.file_path = file_path self.bookid = bookid self.settings = settings @@ -31,14 +33,16 @@ class TaskConvert(CalibreTask): self.worker_thread = worker_thread filename = self._convert_ebook_format() # todo: re-enable this (need to set up gdrive to test with) - # if filename: - # if config.config_use_google_drive: - # gdriveutils.updateGdriveCalibreFromLocal() - # if curr_task == TASK_CONVERT: - # self.add_email(self.queue[index]['settings']['subject'], self.queue[index]['path'], - # filename, self.queue[index]['settings'], self.queue[index]['kindle'], - # self.UIqueue[index]['user'], self.queue[index]['title'], - # self.queue[index]['settings']['body'], internal=True) + if filename: + if config.config_use_google_drive: + gdriveutils.updateGdriveCalibreFromLocal() + if self.kindle_mail: + # if we're sending to kindle after converting, create a one-off task and run it immediately + # todo: figure out how to incorporate this into the progress + task = TaskEmail(self.settings['subject'], self.results["path"], + filename, self.settings, self.kindle_mail, + self.settings['subject'], self.settings['body'], internal=True) + task.start() self._handleSuccess() pass diff --git a/cps/tasks/email.py b/cps/tasks/email.py index be951c6b..3b2db6eb 100644 --- a/cps/tasks/email.py +++ b/cps/tasks/email.py @@ -99,7 +99,7 @@ class EmailSSL(EmailBase, smtplib.SMTP_SSL): class TaskEmail(CalibreTask): def __init__(self, subject, filepath, attachment, settings, recipient, taskMessage, text, internal=False): - super().__init__(taskMessage) + super(TaskEmail, self).__init__(taskMessage) self.subject = subject self.attachment = attachment self.settings = settings diff --git a/cps/tasks/upload.py b/cps/tasks/upload.py index 5f797c50..ce2cb07b 100644 --- a/cps/tasks/upload.py +++ b/cps/tasks/upload.py @@ -5,7 +5,7 @@ from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS class TaskUpload(CalibreTask): def __init__(self, taskMessage): - super().__init__(taskMessage) + super(TaskUpload, self).__init__(taskMessage) self.start_time = self.end_time = datetime.now() self.stat = STAT_FINISH_SUCCESS self.progress = 1 From ac22483f9841966fa7a23fe196ee5925d201e827 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 23 Aug 2020 21:35:05 -0400 Subject: [PATCH 08/18] Add error handling to the email task within the convert task --- cps/tasks/convert.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py index 2b6c6db4..381ae6b8 100644 --- a/cps/tasks/convert.py +++ b/cps/tasks/convert.py @@ -32,17 +32,20 @@ class TaskConvert(CalibreTask): def run(self, worker_thread): self.worker_thread = worker_thread filename = self._convert_ebook_format() - # todo: re-enable this (need to set up gdrive to test with) + if filename: if config.config_use_google_drive: gdriveutils.updateGdriveCalibreFromLocal() if self.kindle_mail: # if we're sending to kindle after converting, create a one-off task and run it immediately # todo: figure out how to incorporate this into the progress - task = TaskEmail(self.settings['subject'], self.results["path"], + try: + task = TaskEmail(self.settings['subject'], self.results["path"], filename, self.settings, self.kindle_mail, self.settings['subject'], self.settings['body'], internal=True) - task.start() + task.start() + except Exception as e: + return self._handleError(str(e)) self._handleSuccess() pass From bec280c6b1b0e8e0a3032c4460516b6fa52c8232 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 23 Aug 2020 21:51:44 -0400 Subject: [PATCH 09/18] Add some error handling --- cps/services/worker.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/cps/services/worker.py b/cps/services/worker.py index 7056314c..e4ccf988 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -84,10 +84,8 @@ class WorkerThread(threading.Thread): # sometimes tasks (like Upload) don't actually have work to do and are created as already finished if item.stat is STAT_WAITING: - try: - item.start(self) - except Exception as e: - log.exception(e) + # CalibreTask.start() should wrap all exceptions in it's own error handling + item.start(self) self.queue.task_done() @@ -127,7 +125,14 @@ class CalibreTask: def start(self, *args): self.start_time = datetime.now() self.stat = STAT_STARTED - self.run(*args) + + # catch any unhandled exceptions in a task and automatically fail it + try: + self.run(*args) + except Exception as e: + self._handleError(str(e)) + log.exception(e) + self.end_time = datetime.now() @property @@ -144,7 +149,8 @@ class CalibreTask: @progress.setter def progress(self, x): - # todo: throw error if outside of [0,1] + if not 0 <= x <= 1: + raise ValueError("Task progress should within [0, 1] range") self._progress = x @property From 508f49df18cef291ec576c96407444f1738b3e90 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 23 Aug 2020 23:00:23 -0400 Subject: [PATCH 10/18] Remove completed tasks and sort the tasks by date added when calling `.tasks` --- cps/helper.py | 2 +- cps/services/worker.py | 51 ++++++++++++++++++++++-------------------- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/cps/helper.py b/cps/helper.py index e3a0e5b1..2b2a0adb 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -723,7 +723,7 @@ def format_runtime(runtime): # helper function to apply localize status information in tasklist entries def render_task_status(tasklist): renderedtasklist = list() - for user, task in tasklist: + for user, added, task in tasklist: if user == current_user.nickname or current_user.role_admin(): ret = {} if task.start_time: diff --git a/cps/services/worker.py b/cps/services/worker.py index e4ccf988..bb3e3b07 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -1,16 +1,18 @@ from __future__ import division, print_function, unicode_literals import threading +import abc +import uuid try: import queue except ImportError: import Queue as queue from datetime import datetime +from collections import namedtuple from cps import calibre_db from cps import logger -import abc log = logger.create() @@ -20,6 +22,11 @@ STAT_FAIL = 1 STAT_STARTED = 2 STAT_FINISH_SUCCESS = 3 +# Only retain this many tasks in dequeued list +TASK_CLEANUP_TRIGGER = 20 + +QueuedTask = namedtuple('QueuedTask', 'user, added, task') + def _get_main_thread(): for t in threading.enumerate(): @@ -51,10 +58,7 @@ class WorkerThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) - self.finished = list() - - self.db_queue = queue.Queue() - calibre_db.add_queue(self.db_queue) + self.dequeued = list() self.doLock = threading.Lock() self.queue = ImprovedQueue() @@ -64,43 +68,41 @@ class WorkerThread(threading.Thread): @classmethod def add(cls, user, task): ins = cls.getInstance() - ins.queue.put((user, task)) + ins.queue.put(QueuedTask( + user=user, + added=datetime.now(), + task=task, + )) @property def tasks(self): with self.doLock: - tasks = list(self.queue.to_list()) + self.finished - return tasks # todo: order by data added + tasks = list(self.queue.to_list()) + self.dequeued + return sorted(tasks, key=lambda x: x.added) # Main thread loop starting the different tasks def run(self): main_thread = _get_main_thread() while main_thread.is_alive(): - user, item = self.queue.get() + item = self.queue.get() # add to list so that in-progress tasks show up with self.doLock: - self.finished.append((user, item)) + # Remove completed tasks if needed + if len(self.dequeued) > TASK_CLEANUP_TRIGGER: + # sort first (just to be certain), then lob off the extra + self.dequeued = sorted(self.dequeued, key=lambda x: x.added)[-1 * TASK_CLEANUP_TRIGGER:] + self.dequeued.append(item) + + user, added, task = item # sometimes tasks (like Upload) don't actually have work to do and are created as already finished - if item.stat is STAT_WAITING: + if task.stat is STAT_WAITING: # CalibreTask.start() should wrap all exceptions in it's own error handling - item.start(self) + task.start(self) self.queue.task_done() - def _delete_completed_tasks(self): - raise NotImplementedError() - # for index, task in reversed(list(enumerate(self.UIqueue))): - # if task['progress'] == "100 %": - # # delete tasks - # self.queue.pop(index) - # self.UIqueue.pop(index) - # # if we are deleting entries before the current index, adjust the index - # if index <= self.current and self.current: - # self.current -= 1 - # self.last = len(self.queue) - class CalibreTask: __metaclass__ = abc.ABCMeta @@ -111,6 +113,7 @@ class CalibreTask: self.start_time = None self.end_time = None self.message = message + self.id = uuid.uuid4() @abc.abstractmethod def run(self, worker_thread): From 0f28dc5e5506970ff92cff5f83f4bb12fa6d7e55 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 23 Aug 2020 23:17:07 -0400 Subject: [PATCH 11/18] Remove the queue stuff for CalibreDB (is no longer used) --- cps/db.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/cps/db.py b/cps/db.py index 46e54706..7aba9b78 100644 --- a/cps/db.py +++ b/cps/db.py @@ -338,25 +338,9 @@ class CalibreDB(threading.Thread): threading.Thread.__init__(self) self.engine = None self.session = None - self.queue = None self.log = None self.config = None - def add_queue(self,queue): - self.queue = queue - self.log = logger.create() - - def run(self): - while False: - i = self.queue.get() - if i == 'dummy': - self.queue.task_done() - break - self.queue.task_done() - - def stop(self): - self.queue.put('dummy') - def setup_db(self, config, app_db_path): self.config = config self.dispose() From 04081f62c4400967cfafb8dd7cbb22743de38084 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Sun, 23 Aug 2020 23:34:41 -0400 Subject: [PATCH 12/18] Delete old worker thread class / logic --- cps/worker.py | 602 -------------------------------------------------- 1 file changed, 602 deletions(-) delete mode 100644 cps/worker.py diff --git a/cps/worker.py b/cps/worker.py deleted file mode 100644 index 6d17f470..00000000 --- a/cps/worker.py +++ /dev/null @@ -1,602 +0,0 @@ -# -*- coding: utf-8 -*- - -# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web) -# Copyright (C) 2018-2019 OzzieIsaacs, bodybybuddha, janeczku -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -from __future__ import division, print_function, unicode_literals -import sys -import os -import re -import smtplib -import socket -import time -import threading -try: - import queue -except ImportError: - import Queue as queue -from glob import glob -from shutil import copyfile -from datetime import datetime - -try: - from StringIO import StringIO - from email.MIMEBase import MIMEBase - from email.MIMEMultipart import MIMEMultipart - from email.MIMEText import MIMEText -except ImportError: - from io import StringIO - from email.mime.base import MIMEBase - from email.mime.multipart import MIMEMultipart - from email.mime.text import MIMEText - -from email import encoders -from email.utils import formatdate -from email.utils import make_msgid -from email.generator import Generator -from flask_babel import gettext as _ - -from . import calibre_db, db -from . import logger, config -from .subproc_wrapper import process_open -from . import gdriveutils - -log = logger.create() - -chunksize = 8192 -# task 'status' consts -STAT_WAITING = 0 -STAT_FAIL = 1 -STAT_STARTED = 2 -STAT_FINISH_SUCCESS = 3 -#taskType consts -TASK_EMAIL = 1 -TASK_CONVERT = 2 -TASK_UPLOAD = 3 -TASK_CONVERT_ANY = 4 - -RET_FAIL = 0 -RET_SUCCESS = 1 - - -def _get_main_thread(): - for t in threading.enumerate(): - if t.__class__.__name__ == '_MainThread': - return t - raise Exception("main thread not found?!") - - -# For gdrive download book from gdrive to calibredir (temp dir for books), read contents in both cases and append -# it in MIME Base64 encoded to -def get_attachment(bookpath, filename): - """Get file as MIMEBase message""" - calibrepath = config.config_calibre_dir - if config.config_use_google_drive: - df = gdriveutils.getFileFromEbooksFolder(bookpath, filename) - if df: - datafile = os.path.join(calibrepath, bookpath, filename) - if not os.path.exists(os.path.join(calibrepath, bookpath)): - os.makedirs(os.path.join(calibrepath, bookpath)) - df.GetContentFile(datafile) - else: - return None - file_ = open(datafile, 'rb') - data = file_.read() - file_.close() - os.remove(datafile) - else: - try: - file_ = open(os.path.join(calibrepath, bookpath, filename), 'rb') - data = file_.read() - file_.close() - except IOError as e: - log.exception(e) - log.error(u'The requested file could not be read. Maybe wrong permissions?') - return None - - attachment = MIMEBase('application', 'octet-stream') - attachment.set_payload(data) - encoders.encode_base64(attachment) - attachment.add_header('Content-Disposition', 'attachment', - filename=filename) - return attachment - - -# Class for sending email with ability to get current progress -class emailbase(): - - transferSize = 0 - progress = 0 - - def data(self, msg): - self.transferSize = len(msg) - (code, resp) = smtplib.SMTP.data(self, msg) - self.progress = 0 - return (code, resp) - - def send(self, strg): - """Send `strg' to the server.""" - log.debug('send: %r', strg[:300]) - if hasattr(self, 'sock') and self.sock: - try: - if self.transferSize: - lock=threading.Lock() - lock.acquire() - self.transferSize = len(strg) - lock.release() - for i in range(0, self.transferSize, chunksize): - if isinstance(strg, bytes): - self.sock.send((strg[i:i+chunksize])) - else: - self.sock.send((strg[i:i + chunksize]).encode('utf-8')) - lock.acquire() - self.progress = i - lock.release() - else: - self.sock.sendall(strg.encode('utf-8')) - except socket.error: - self.close() - raise smtplib.SMTPServerDisconnected('Server not connected') - else: - raise smtplib.SMTPServerDisconnected('please run connect() first') - - @classmethod - def _print_debug(self, *args): - log.debug(args) - - def getTransferStatus(self): - if self.transferSize: - lock2 = threading.Lock() - lock2.acquire() - value = int((float(self.progress) / float(self.transferSize))*100) - lock2.release() - return str(value) + ' %' - else: - return "100 %" - - -# Class for sending email with ability to get current progress, derived from emailbase class -class email(emailbase, smtplib.SMTP): - - def __init__(self, *args, **kwargs): - smtplib.SMTP.__init__(self, *args, **kwargs) - - -# Class for sending ssl encrypted email with ability to get current progress, , derived from emailbase class -class email_SSL(emailbase, smtplib.SMTP_SSL): - - def __init__(self, *args, **kwargs): - smtplib.SMTP_SSL.__init__(self, *args, **kwargs) - - -#Class for all worker tasks in the background -class WorkerThread(threading.Thread): - - def __init__(self): - threading.Thread.__init__(self) - self.status = 0 - self.current = 0 - self.last = 0 - self.queue = list() - self.UIqueue = list() - self.asyncSMTP = None - self.id = 0 - self.db_queue = queue.Queue() - calibre_db.add_queue(self.db_queue) - self.doLock = threading.Lock() - - # Main thread loop starting the different tasks - def run(self): - main_thread = _get_main_thread() - while main_thread.is_alive(): - try: - self.doLock.acquire() - if self.current != self.last: - index = self.current - log.info(index) - log.info(len(self.queue)) - self.doLock.release() - if self.queue[index]['taskType'] == TASK_EMAIL: - self._send_raw_email() - elif self.queue[index]['taskType'] in (TASK_CONVERT, TASK_CONVERT_ANY): - self._convert_any_format() - # TASK_UPLOAD is handled implicitly - self.doLock.acquire() - self.current += 1 - if self.current > self.last: - self.current = self.last - self.doLock.release() - else: - self.doLock.release() - except Exception as e: - log.exception(e) - self.doLock.release() - if main_thread.is_alive(): - time.sleep(1) - - def get_send_status(self): - if self.asyncSMTP: - return self.asyncSMTP.getTransferStatus() - else: - return "0 %" - - def _delete_completed_tasks(self): - for index, task in reversed(list(enumerate(self.UIqueue))): - if task['progress'] == "100 %": - # delete tasks - self.queue.pop(index) - self.UIqueue.pop(index) - # if we are deleting entries before the current index, adjust the index - if index <= self.current and self.current: - self.current -= 1 - self.last = len(self.queue) - - def get_taskstatus(self): - self.doLock.acquire() - if self.current < len(self.queue): - if self.UIqueue[self.current]['stat'] == STAT_STARTED: - if self.queue[self.current]['taskType'] == TASK_EMAIL: - self.UIqueue[self.current]['progress'] = self.get_send_status() - self.UIqueue[self.current]['formRuntime'] = datetime.now() - self.queue[self.current]['starttime'] - self.UIqueue[self.current]['rt'] = self.UIqueue[self.current]['formRuntime'].days*24*60 \ - + self.UIqueue[self.current]['formRuntime'].seconds \ - + self.UIqueue[self.current]['formRuntime'].microseconds - self.doLock.release() - return self.UIqueue - - def _convert_any_format(self): - # convert book, and upload in case of google drive - self.doLock.acquire() - index = self.current - self.doLock.release() - self.UIqueue[index]['stat'] = STAT_STARTED - self.queue[index]['starttime'] = datetime.now() - self.UIqueue[index]['formStarttime'] = self.queue[index]['starttime'] - curr_task = self.queue[index]['taskType'] - filename = self._convert_ebook_format() - if filename: - if config.config_use_google_drive: - gdriveutils.updateGdriveCalibreFromLocal() - if curr_task == TASK_CONVERT: - self.add_email(self.queue[index]['settings']['subject'], self.queue[index]['path'], - filename, self.queue[index]['settings'], self.queue[index]['kindle'], - self.UIqueue[index]['user'], self.queue[index]['title'], - self.queue[index]['settings']['body'], internal=True) - - - def _convert_ebook_format(self): - error_message = None - self.doLock.acquire() - index = self.current - self.doLock.release() - file_path = self.queue[index]['file_path'] - book_id = self.queue[index]['bookid'] - format_old_ext = u'.' + self.queue[index]['settings']['old_book_format'].lower() - format_new_ext = u'.' + self.queue[index]['settings']['new_book_format'].lower() - - # check to see if destination format already exists - - # if it does - mark the conversion task as complete and return a success - # this will allow send to kindle workflow to continue to work - if os.path.isfile(file_path + format_new_ext): - log.info("Book id %d already converted to %s", book_id, format_new_ext) - cur_book = calibre_db.get_book(book_id) - self.queue[index]['path'] = file_path - self.queue[index]['title'] = cur_book.title - self._handleSuccess() - return file_path + format_new_ext - else: - log.info("Book id %d - target format of %s does not exist. Moving forward with convert.", - book_id, - format_new_ext) - - if config.config_kepubifypath and format_old_ext == '.epub' and format_new_ext == '.kepub': - check, error_message = self._convert_kepubify(file_path, - format_old_ext, - format_new_ext, - index) - else: - # check if calibre converter-executable is existing - if not os.path.exists(config.config_converterpath): - # ToDo Text is not translated - self._handleError(_(u"Calibre ebook-convert %(tool)s not found", tool=config.config_converterpath)) - return - check, error_message = self._convert_calibre(file_path, format_old_ext, format_new_ext, index) - - if check == 0: - cur_book = calibre_db.get_book(book_id) - if os.path.isfile(file_path + format_new_ext): - # self.db_queue.join() - new_format = db.Data(name=cur_book.data[0].name, - book_format=self.queue[index]['settings']['new_book_format'].upper(), - book=book_id, uncompressed_size=os.path.getsize(file_path + format_new_ext)) - task = {'task':'add_format','id': book_id, 'format': new_format} - self.db_queue.put(task) - # To Do how to handle error? - - '''cur_book.data.append(new_format) - try: - # db.session.merge(cur_book) - calibre_db.session.commit() - except OperationalError as e: - calibre_db.session.rollback() - log.error("Database error: %s", e) - self._handleError(_(u"Database error: %(error)s.", error=e)) - return''' - - self.queue[index]['path'] = cur_book.path - self.queue[index]['title'] = cur_book.title - if config.config_use_google_drive: - os.remove(file_path + format_old_ext) - self._handleSuccess() - return file_path + format_new_ext - else: - error_message = format_new_ext.upper() + ' format not found on disk' - log.info("ebook converter failed with error while converting book") - if not error_message: - error_message = 'Ebook converter failed with unknown error' - self._handleError(error_message) - return - - - def _convert_calibre(self, file_path, format_old_ext, format_new_ext, index): - try: - # Linux py2.7 encode as list without quotes no empty element for parameters - # linux py3.x no encode and as list without quotes no empty element for parameters - # windows py2.7 encode as string with quotes empty element for parameters is okay - # windows py 3.x no encode and as string with quotes empty element for parameters is okay - # separate handling for windows and linux - quotes = [1, 2] - command = [config.config_converterpath, (file_path + format_old_ext), - (file_path + format_new_ext)] - quotes_index = 3 - if config.config_calibre: - parameters = config.config_calibre.split(" ") - for param in parameters: - command.append(param) - quotes.append(quotes_index) - quotes_index += 1 - - p = process_open(command, quotes) - except OSError as e: - return 1, _(u"Ebook-converter failed: %(error)s", error=e) - - while p.poll() is None: - nextline = p.stdout.readline() - if os.name == 'nt' and sys.version_info < (3, 0): - nextline = nextline.decode('windows-1252') - elif os.name == 'posix' and sys.version_info < (3, 0): - nextline = nextline.decode('utf-8') - log.debug(nextline.strip('\r\n')) - # parse progress string from calibre-converter - progress = re.search(r"(\d+)%\s.*", nextline) - if progress: - self.UIqueue[index]['progress'] = progress.group(1) + ' %' - - # process returncode - check = p.returncode - calibre_traceback = p.stderr.readlines() - error_message = "" - for ele in calibre_traceback: - if sys.version_info < (3, 0): - ele = ele.decode('utf-8') - log.debug(ele.strip('\n')) - if not ele.startswith('Traceback') and not ele.startswith(' File'): - error_message = "Calibre failed with error: %s" % ele.strip('\n') - return check, error_message - - - def _convert_kepubify(self, file_path, format_old_ext, format_new_ext, index): - quotes = [1, 3] - command = [config.config_kepubifypath, (file_path + format_old_ext), '-o', os.path.dirname(file_path)] - try: - p = process_open(command, quotes) - except OSError as e: - return 1, _(u"Kepubify-converter failed: %(error)s", error=e) - self.UIqueue[index]['progress'] = '1 %' - while True: - nextline = p.stdout.readlines() - nextline = [x.strip('\n') for x in nextline if x != '\n'] - if sys.version_info < (3, 0): - nextline = [x.decode('utf-8') for x in nextline] - for line in nextline: - log.debug(line) - if p.poll() is not None: - break - - # ToD Handle - # process returncode - check = p.returncode - - # move file - if check == 0: - converted_file = glob(os.path.join(os.path.dirname(file_path), "*.kepub.epub")) - if len(converted_file) == 1: - copyfile(converted_file[0], (file_path + format_new_ext)) - os.unlink(converted_file[0]) - else: - return 1, _(u"Converted file not found or more than one file in folder %(folder)s", - folder=os.path.dirname(file_path)) - return check, None - - - def add_convert(self, file_path, bookid, user_name, taskMessage, settings, kindle_mail=None): - self.doLock.acquire() - if self.last >= 20: - self._delete_completed_tasks() - # progress, runtime, and status = 0 - self.id += 1 - task = TASK_CONVERT_ANY - if kindle_mail: - task = TASK_CONVERT - self.queue.append({'file_path':file_path, 'bookid':bookid, 'starttime': 0, 'kindle': kindle_mail, - 'taskType': task, 'settings':settings}) - self.UIqueue.append({'user': user_name, 'formStarttime': '', 'progress': " 0 %", 'taskMess': taskMessage, - 'runtime': '0 s', 'stat': STAT_WAITING,'id': self.id, 'taskType': task } ) - - self.last=len(self.queue) - self.doLock.release() - - def add_email(self, subject, filepath, attachment, settings, recipient, user_name, taskMessage, - text, internal=False): - # if more than 20 entries in the list, clean the list - self.doLock.acquire() - if self.last >= 20: - self._delete_completed_tasks() - if internal: - self.current-= 1 - # progress, runtime, and status = 0 - self.id += 1 - self.queue.append({'subject':subject, 'attachment':attachment, 'filepath':filepath, - 'settings':settings, 'recipent':recipient, 'starttime': 0, - 'taskType': TASK_EMAIL, 'text':text}) - self.UIqueue.append({'user': user_name, 'formStarttime': '', 'progress': " 0 %", 'taskMess': taskMessage, - 'runtime': '0 s', 'stat': STAT_WAITING,'id': self.id, 'taskType': TASK_EMAIL }) - self.last=len(self.queue) - self.doLock.release() - - def add_upload(self, user_name, taskMessage): - # if more than 20 entries in the list, clean the list - self.doLock.acquire() - - - if self.last >= 20: - self._delete_completed_tasks() - # progress=100%, runtime=0, and status finished - self.id += 1 - starttime = datetime.now() - self.queue.append({'starttime': starttime, 'taskType': TASK_UPLOAD}) - self.UIqueue.append({'user': user_name, 'formStarttime': starttime, 'progress': "100 %", 'taskMess': taskMessage, - 'runtime': '0 s', 'stat': STAT_FINISH_SUCCESS,'id': self.id, 'taskType': TASK_UPLOAD}) - self.last=len(self.queue) - self.doLock.release() - - def _send_raw_email(self): - self.doLock.acquire() - index = self.current - self.doLock.release() - self.queue[index]['starttime'] = datetime.now() - self.UIqueue[index]['formStarttime'] = self.queue[index]['starttime'] - self.UIqueue[index]['stat'] = STAT_STARTED - obj=self.queue[index] - # create MIME message - msg = MIMEMultipart() - msg['Subject'] = self.queue[index]['subject'] - msg['Message-Id'] = make_msgid('calibre-web') - msg['Date'] = formatdate(localtime=True) - text = self.queue[index]['text'] - msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8')) - if obj['attachment']: - result = get_attachment(obj['filepath'], obj['attachment']) - if result: - msg.attach(result) - else: - self._handleError(u"Attachment not found") - return - - msg['From'] = obj['settings']["mail_from"] - msg['To'] = obj['recipent'] - - use_ssl = int(obj['settings'].get('mail_use_ssl', 0)) - try: - # convert MIME message to string - fp = StringIO() - gen = Generator(fp, mangle_from_=False) - gen.flatten(msg) - msg = fp.getvalue() - - # send email - timeout = 600 # set timeout to 5mins - - # redirect output to logfile on python2 pn python3 debugoutput is caught with overwritten - # _print_debug function - if sys.version_info < (3, 0): - org_smtpstderr = smtplib.stderr - smtplib.stderr = logger.StderrLogger('worker.smtp') - - if use_ssl == 2: - self.asyncSMTP = email_SSL(obj['settings']["mail_server"], obj['settings']["mail_port"], timeout=timeout) - else: - self.asyncSMTP = email(obj['settings']["mail_server"], obj['settings']["mail_port"], timeout=timeout) - - # link to logginglevel - if logger.is_debug_enabled(): - self.asyncSMTP.set_debuglevel(1) - if use_ssl == 1: - self.asyncSMTP.starttls() - if obj['settings']["mail_password"]: - self.asyncSMTP.login(str(obj['settings']["mail_login"]), str(obj['settings']["mail_password"])) - self.asyncSMTP.sendmail(obj['settings']["mail_from"], obj['recipent'], msg) - self.asyncSMTP.quit() - self._handleSuccess() - - if sys.version_info < (3, 0): - smtplib.stderr = org_smtpstderr - - except (MemoryError) as e: - log.exception(e) - self._handleError(u'MemoryError sending email: ' + str(e)) - return None - except (smtplib.SMTPException, smtplib.SMTPAuthenticationError) as e: - if hasattr(e, "smtp_error"): - text = e.smtp_error.decode('utf-8').replace("\n",'. ') - elif hasattr(e, "message"): - text = e.message - else: - log.exception(e) - text = '' - self._handleError(u'Smtplib Error sending email: ' + text) - return None - except (socket.error) as e: - self._handleError(u'Socket Error sending email: ' + e.strerror) - return None - - def _handleError(self, error_message): - log.error(error_message) - self.doLock.acquire() - index = self.current - self.doLock.release() - self.UIqueue[index]['stat'] = STAT_FAIL - self.UIqueue[index]['progress'] = "100 %" - self.UIqueue[index]['formRuntime'] = datetime.now() - self.queue[index]['starttime'] - self.UIqueue[index]['message'] = error_message - - def _handleSuccess(self): - self.doLock.acquire() - index = self.current - self.doLock.release() - self.UIqueue[index]['stat'] = STAT_FINISH_SUCCESS - self.UIqueue[index]['progress'] = "100 %" - self.UIqueue[index]['formRuntime'] = datetime.now() - self.queue[index]['starttime'] - - -def get_taskstatus(): - return _worker.get_taskstatus() - - -def add_email(subject, filepath, attachment, settings, recipient, user_name, taskMessage, text): - return _worker.add_email(subject, filepath, attachment, settings, recipient, user_name, taskMessage, text) - - -def add_upload(user_name, taskMessage): - return _worker.add_upload(user_name, taskMessage) - - -def add_convert(file_path, bookid, user_name, taskMessage, settings, kindle_mail=None): - return _worker.add_convert(file_path, bookid, user_name, taskMessage, settings, kindle_mail) - - -_worker = WorkerThread() -_worker.start() From 8634b0c6f07ebc7d2da1f4be21bbd2f87f542aa2 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Mon, 24 Aug 2020 21:00:15 -0400 Subject: [PATCH 13/18] Remove left over placeholder code --- cps/tasks/convert.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py index 381ae6b8..56229fd1 100644 --- a/cps/tasks/convert.py +++ b/cps/tasks/convert.py @@ -47,9 +47,6 @@ class TaskConvert(CalibreTask): except Exception as e: return self._handleError(str(e)) - self._handleSuccess() - pass - def _convert_ebook_format(self): error_message = None file_path = self.file_path From 5ec1283bb10ccb618ad2ed92fbe5662ef93628d2 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Mon, 24 Aug 2020 21:03:59 -0400 Subject: [PATCH 14/18] Remove threading for the calibre DB class --- cps/__init__.py | 1 - cps/db.py | 5 +---- cps/server.py | 3 --- 3 files changed, 1 insertion(+), 8 deletions(-) diff --git a/cps/__init__.py b/cps/__init__.py index d557649c..a879da0a 100644 --- a/cps/__init__.py +++ b/cps/__init__.py @@ -102,7 +102,6 @@ def create_app(): web_server.init_app(app, config) calibre_db.setup_db(config, cli.settingspath) - calibre_db.start() babel.init_app(app) _BABEL_TRANSLATIONS.update(str(item) for item in babel.list_translations()) diff --git a/cps/db.py b/cps/db.py index 7aba9b78..9eb8985d 100644 --- a/cps/db.py +++ b/cps/db.py @@ -24,14 +24,12 @@ import re import ast import json from datetime import datetime -import threading from sqlalchemy import create_engine from sqlalchemy import Table, Column, ForeignKey, CheckConstraint from sqlalchemy import String, Integer, Boolean, TIMESTAMP, Float from sqlalchemy.orm import relationship, sessionmaker, scoped_session from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.exc import OperationalError from sqlalchemy.pool import StaticPool from flask_login import current_user from sqlalchemy.sql.expression import and_, true, false, text, func, or_ @@ -332,10 +330,9 @@ class Custom_Columns(Base): return display_dict -class CalibreDB(threading.Thread): +class CalibreDB(): def __init__(self): - threading.Thread.__init__(self) self.engine = None self.session = None self.log = None diff --git a/cps/server.py b/cps/server.py index 7c2d321d..d27fe239 100644 --- a/cps/server.py +++ b/cps/server.py @@ -200,9 +200,6 @@ class WebServer(object): def stop(self, restart=False): from . import updater_thread updater_thread.stop() - from . import calibre_db - calibre_db.stop() - log.info("webserver stop (restart=%s)", restart) self.restart = restart From f3a3797850e04f3c3e6ab573828125995e5f2b11 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Mon, 24 Aug 2020 21:28:55 -0400 Subject: [PATCH 15/18] Fail the convert task if the email subtask fails --- cps/tasks/convert.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py index 56229fd1..5060c29e 100644 --- a/cps/tasks/convert.py +++ b/cps/tasks/convert.py @@ -8,7 +8,7 @@ from shutil import copyfile from sqlalchemy.exc import SQLAlchemyError -from cps.services.worker import CalibreTask +from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS from cps import calibre_db, db from cps import logger, config from cps.subproc_wrapper import process_open @@ -43,7 +43,11 @@ class TaskConvert(CalibreTask): task = TaskEmail(self.settings['subject'], self.results["path"], filename, self.settings, self.kindle_mail, self.settings['subject'], self.settings['body'], internal=True) - task.start() + task.start(worker_thread) + + # even though the convert task might be finished, if this task fails, fail the whole thing + if task.stat != STAT_FINISH_SUCCESS: + raise Exception(task.error) except Exception as e: return self._handleError(str(e)) From b81b8a1dea43401e0d1cd2b76a1f55e89888b993 Mon Sep 17 00:00:00 2001 From: blitzmann Date: Tue, 25 Aug 2020 00:05:20 -0400 Subject: [PATCH 16/18] Fix registration email --- cps/helper.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/cps/helper.py b/cps/helper.py index 2b2a0adb..d11af912 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -128,9 +128,16 @@ def send_registration_mail(e_mail, user_name, default_password, resend=False): text += "Don't forget to change your password after first login.\r\n" text += "Sincerely\r\n\r\n" text += "Your Calibre-Web team" - WorkerThread.add(None, TaskEmail(_(u'Get Started with Calibre-Web'), None, None, - config.get_mail_settings(), e_mail, None, - _(u"Registration e-mail for user: %(name)s", name=user_name), text)) + WorkerThread.add(None, TaskEmail( + subject=(u'Get Started with Calibre-Web'), + filepath=None, + attachment=None, + settings=config.get_mail_settings(), + recipient=e_mail, + taskMessage=_(u"Registration e-mail for user: %(name)s", name=user_name), + text=text + )) + return From 572ac4a17be8ad59f3d9c0e6bfc465b2dac9c88e Mon Sep 17 00:00:00 2001 From: blitzmann Date: Thu, 27 Aug 2020 21:44:28 -0400 Subject: [PATCH 17/18] Fix for deleting old, completed tasks (cherry picked from commit e066d7a4b0f47d3327696a11795cdc923ff0f6f3) --- cps/helper.py | 2 +- cps/services/worker.py | 51 ++++++++++++++++++++++++++++++------------ 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/cps/helper.py b/cps/helper.py index 4f05c48a..624a3b54 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -730,7 +730,7 @@ def format_runtime(runtime): # helper function to apply localize status information in tasklist entries def render_task_status(tasklist): renderedtasklist = list() - for user, added, task in tasklist: + for num, user, added, task in tasklist: if user == current_user.nickname or current_user.role_admin(): ret = {} if task.start_time: diff --git a/cps/services/worker.py b/cps/services/worker.py index bb3e3b07..a22b9a71 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -8,7 +8,7 @@ try: import queue except ImportError: import Queue as queue -from datetime import datetime +from datetime import datetime, timedelta from collections import namedtuple from cps import calibre_db @@ -23,9 +23,9 @@ STAT_STARTED = 2 STAT_FINISH_SUCCESS = 3 # Only retain this many tasks in dequeued list -TASK_CLEANUP_TRIGGER = 20 +TASK_CLEANUP_TRIGGER = 5 -QueuedTask = namedtuple('QueuedTask', 'user, added, task') +QueuedTask = namedtuple('QueuedTask', 'num, user, added, task') def _get_main_thread(): @@ -62,13 +62,15 @@ class WorkerThread(threading.Thread): self.doLock = threading.Lock() self.queue = ImprovedQueue() - + self.num = 0 self.start() @classmethod def add(cls, user, task): ins = cls.getInstance() + ins.num += 1 ins.queue.put(QueuedTask( + num=ins.num, user=user, added=datetime.now(), task=task, @@ -77,8 +79,9 @@ class WorkerThread(threading.Thread): @property def tasks(self): with self.doLock: - tasks = list(self.queue.to_list()) + self.dequeued - return sorted(tasks, key=lambda x: x.added) + tasks = self.queue.to_list() + self.dequeued + return sorted(tasks, key=lambda x: x.num) + # Main thread loop starting the different tasks def run(self): @@ -86,23 +89,34 @@ class WorkerThread(threading.Thread): while main_thread.is_alive(): item = self.queue.get() - # add to list so that in-progress tasks show up with self.doLock: - # Remove completed tasks if needed + # once we hit our trigger, start cleaning up dead tasks if len(self.dequeued) > TASK_CLEANUP_TRIGGER: - # sort first (just to be certain), then lob off the extra - self.dequeued = sorted(self.dequeued, key=lambda x: x.added)[-1 * TASK_CLEANUP_TRIGGER:] + dead = [] + alive = [] + for x in self.dequeued: + (dead if x.task.dead else alive).append(x) + + # if the ones that we need to keep are within the trigger, do nothing else + delta = len(self.dequeued) - len(dead) + if delta > TASK_CLEANUP_TRIGGER: + ret = alive + else: + # otherwise, lop off the oldest dead tasks until we hit the target trigger + ret = sorted(dead, key=lambda x: x.task.end_time)[-TASK_CLEANUP_TRIGGER:] + alive + + self.dequeued = sorted(ret, key=lambda x: x.num) + # add to list so that in-progress tasks show up self.dequeued.append(item) - user, added, task = item - # sometimes tasks (like Upload) don't actually have work to do and are created as already finished - if task.stat is STAT_WAITING: + if item.task.stat is STAT_WAITING: # CalibreTask.start() should wrap all exceptions in it's own error handling - task.start(self) + item.task.start(self) self.queue.task_done() + class CalibreTask: __metaclass__ = abc.ABCMeta @@ -168,6 +182,15 @@ class CalibreTask: def runtime(self): return (self.end_time or datetime.now()) - self.start_time + @property + def dead(self): + """Determines whether or not this task can be garbage collected + + We have a separate dictating this because there may be certain tasks that want to override this + """ + # By default, we're good to clean a task if it's "Done" + return self.stat in (STAT_FINISH_SUCCESS, STAT_FAIL) + @progress.setter def progress(self, x): # todo: throw error if outside of [0,1] From 4cb82ea9bd736f41d0f3a4eee530d8d1b2cdd70d Mon Sep 17 00:00:00 2001 From: blitzmann Date: Thu, 27 Aug 2020 21:49:55 -0400 Subject: [PATCH 18/18] Revert testing condition that decreased number of tasks kept --- cps/services/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cps/services/worker.py b/cps/services/worker.py index a22b9a71..05307b52 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -23,7 +23,7 @@ STAT_STARTED = 2 STAT_FINISH_SUCCESS = 3 # Only retain this many tasks in dequeued list -TASK_CLEANUP_TRIGGER = 5 +TASK_CLEANUP_TRIGGER = 20 QueuedTask = namedtuple('QueuedTask', 'num, user, added, task')