Merge new worker thread
This commit is contained in:
		
						commit
						9e5cad0dc8
					
				|  | @ -102,7 +102,6 @@ def create_app(): | |||
| 
 | ||||
|     web_server.init_app(app, config) | ||||
|     calibre_db.setup_db(config, cli.settingspath) | ||||
|     calibre_db.start() | ||||
| 
 | ||||
|     babel.init_app(app) | ||||
|     _BABEL_TRANSLATIONS.update(str(item) for item in babel.list_translations()) | ||||
|  |  | |||
							
								
								
									
										33
									
								
								cps/db.py
									
									
									
									
									
								
							
							
						
						
									
										33
									
								
								cps/db.py
									
									
									
									
									
								
							|  | @ -24,7 +24,6 @@ import re | |||
| import ast | ||||
| import json | ||||
| from datetime import datetime | ||||
| import threading | ||||
| 
 | ||||
| from sqlalchemy import create_engine | ||||
| from sqlalchemy import Table, Column, ForeignKey, CheckConstraint | ||||
|  | @ -32,7 +31,6 @@ from sqlalchemy import String, Integer, Boolean, TIMESTAMP, Float | |||
| from sqlalchemy.orm import relationship, sessionmaker, scoped_session | ||||
| from sqlalchemy.orm.collections import InstrumentedList | ||||
| from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta | ||||
| from sqlalchemy.exc import OperationalError | ||||
| from sqlalchemy.pool import StaticPool | ||||
| from flask_login import current_user | ||||
| from sqlalchemy.sql.expression import and_, true, false, text, func, or_ | ||||
|  | @ -400,43 +398,14 @@ class AlchemyEncoder(json.JSONEncoder): | |||
|         return json.JSONEncoder.default(self, obj) | ||||
| 
 | ||||
| 
 | ||||
| class CalibreDB(threading.Thread): | ||||
| class CalibreDB(): | ||||
| 
 | ||||
|     def __init__(self): | ||||
|         threading.Thread.__init__(self) | ||||
|         self.engine = None | ||||
|         self.session = None | ||||
|         self.queue = None | ||||
|         self.log = None | ||||
|         self.config = None | ||||
| 
 | ||||
|     def add_queue(self,queue): | ||||
|         self.queue = queue | ||||
|         self.log = logger.create() | ||||
| 
 | ||||
|     def run(self): | ||||
|         while True: | ||||
|             i = self.queue.get() | ||||
|             if i == 'dummy': | ||||
|                 self.queue.task_done() | ||||
|                 break | ||||
|             if i['task'] == 'add_format': | ||||
|                 cur_book = self.session.query(Books).filter(Books.id == i['id']).first() | ||||
|                 cur_book.data.append(i['format']) | ||||
|                 try: | ||||
|                     # db.session.merge(cur_book) | ||||
|                     self.session.commit() | ||||
|                 except OperationalError as e: | ||||
|                     self.session.rollback() | ||||
|                     self.log.error("Database error: %s", e) | ||||
|                     # self._handleError(_(u"Database error: %(error)s.", error=e)) | ||||
|                     # return | ||||
|             self.queue.task_done() | ||||
| 
 | ||||
| 
 | ||||
|     def stop(self): | ||||
|         self.queue.put('dummy') | ||||
| 
 | ||||
|     def setup_db(self, config, app_db_path): | ||||
|         self.config = config | ||||
|         self.dispose() | ||||
|  |  | |||
|  | @ -34,8 +34,10 @@ from flask_login import current_user, login_required | |||
| from sqlalchemy.exc import OperationalError | ||||
| 
 | ||||
| from . import constants, logger, isoLanguages, gdriveutils, uploader, helper | ||||
| from . import config, get_locale, ub, worker, db | ||||
| from . import config, get_locale, ub, db | ||||
| from . import calibre_db | ||||
| from .services.worker import WorkerThread | ||||
| from .tasks.upload import TaskUpload | ||||
| from .web import login_required_if_no_ano, render_title_template, edit_required, upload_required | ||||
| 
 | ||||
| 
 | ||||
|  | @ -544,8 +546,8 @@ def upload_single_file(request, book, book_id): | |||
| 
 | ||||
|             # Queue uploader info | ||||
|             uploadText=_(u"File format %(ext)s added to %(book)s", ext=file_ext.upper(), book=book.title) | ||||
|             worker.add_upload(current_user.nickname, | ||||
|                 "<a href=\"" + url_for('web.show_book', book_id=book.id) + "\">" + uploadText + "</a>") | ||||
|             WorkerThread.add(current_user.nickname, TaskUpload( | ||||
|                 "<a href=\"" + url_for('web.show_book', book_id=book.id) + "\">" + uploadText + "</a>")) | ||||
| 
 | ||||
|             return uploader.process( | ||||
|                 saved_filename, *os.path.splitext(requested_file.filename), | ||||
|  | @ -889,8 +891,8 @@ def upload(): | |||
|                 if error: | ||||
|                     flash(error, category="error") | ||||
|                 uploadText=_(u"File %(file)s uploaded", file=title) | ||||
|                 worker.add_upload(current_user.nickname, | ||||
|                     "<a href=\"" + url_for('web.show_book', book_id=book_id) + "\">" + uploadText + "</a>") | ||||
|                 WorkerThread.add(current_user.nickname, TaskUpload( | ||||
|                     "<a href=\"" + url_for('web.show_book', book_id=book_id) + "\">" + uploadText + "</a>")) | ||||
| 
 | ||||
|                 if len(request.files.getlist("btn-upload")) < 2: | ||||
|                     if current_user.role_edit() or current_user.role_admin(): | ||||
|  |  | |||
|  | @ -39,6 +39,7 @@ from sqlalchemy.sql.expression import true, false, and_, text, func | |||
| from werkzeug.datastructures import Headers | ||||
| from werkzeug.security import generate_password_hash | ||||
| from . import calibre_db | ||||
| from .tasks.convert import TaskConvert | ||||
| 
 | ||||
| try: | ||||
|     from urllib.parse import quote | ||||
|  | @ -58,12 +59,12 @@ try: | |||
| except ImportError: | ||||
|     use_PIL = False | ||||
| 
 | ||||
| from . import logger, config, get_locale, db, ub, worker | ||||
| from . import logger, config, get_locale, db, ub | ||||
| from . import gdriveutils as gd | ||||
| from .constants import STATIC_DIR as _STATIC_DIR | ||||
| from .subproc_wrapper import process_wait | ||||
| from .worker import STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS | ||||
| from .worker import TASK_EMAIL, TASK_CONVERT, TASK_UPLOAD, TASK_CONVERT_ANY | ||||
| from .services.worker import WorkerThread, STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS | ||||
| from .tasks.email import TaskEmail | ||||
| 
 | ||||
| 
 | ||||
| log = logger.create() | ||||
|  | @ -101,7 +102,7 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, | |||
|         txt = (u"%s -> %s: %s" % (old_book_format, new_book_format, book.title)) | ||||
|         settings['old_book_format'] = old_book_format | ||||
|         settings['new_book_format'] = new_book_format | ||||
|         worker.add_convert(file_path, book.id, user_id, txt, settings, kindle_mail) | ||||
|         WorkerThread.add(user_id, TaskConvert(file_path, book.id, txt, settings, kindle_mail)) | ||||
|         return None | ||||
|     else: | ||||
|         error_message = _(u"%(format)s not found: %(fn)s", | ||||
|  | @ -110,9 +111,9 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, | |||
| 
 | ||||
| 
 | ||||
| def send_test_mail(kindle_mail, user_name): | ||||
|     worker.add_email(_(u'Calibre-Web test e-mail'), None, None, | ||||
|                      config.get_mail_settings(), kindle_mail, user_name, | ||||
|                      _(u"Test e-mail"), _(u'This e-mail has been sent via Calibre-Web.')) | ||||
|     WorkerThread.add(user_name, TaskEmail(_(u'Calibre-Web test e-mail'), None, None, | ||||
|                      config.get_mail_settings(), kindle_mail, _(u"Test e-mail"), | ||||
|                                _(u'This e-mail has been sent via Calibre-Web.'))) | ||||
|     return | ||||
| 
 | ||||
| 
 | ||||
|  | @ -127,9 +128,16 @@ def send_registration_mail(e_mail, user_name, default_password, resend=False): | |||
|     text += "Don't forget to change your password after first login.\r\n" | ||||
|     text += "Sincerely\r\n\r\n" | ||||
|     text += "Your Calibre-Web team" | ||||
|     worker.add_email(_(u'Get Started with Calibre-Web'), None, None, | ||||
|                      config.get_mail_settings(), e_mail, None, | ||||
|                      _(u"Registration e-mail for user: %(name)s", name=user_name), text) | ||||
|     WorkerThread.add(None, TaskEmail( | ||||
|         subject=_(u'Get Started with Calibre-Web'), | ||||
|         filepath=None, | ||||
|         attachment=None, | ||||
|         settings=config.get_mail_settings(), | ||||
|         recipient=e_mail, | ||||
|         taskMessage=_(u"Registration e-mail for user: %(name)s", name=user_name), | ||||
|         text=text | ||||
|     )) | ||||
| 
 | ||||
|     return | ||||
| 
 | ||||
| 
 | ||||
|  | @ -221,9 +229,9 @@ def send_mail(book_id, book_format, convert, kindle_mail, calibrepath, user_id): | |||
|     for entry in iter(book.data): | ||||
|         if entry.format.upper() == book_format.upper(): | ||||
|             converted_file_name = entry.name + '.' + book_format.lower() | ||||
|             worker.add_email(_(u"Send to Kindle"), book.path, converted_file_name, | ||||
|                              config.get_mail_settings(), kindle_mail, user_id, | ||||
|                              _(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.')) | ||||
|             WorkerThread.add(user_id, TaskEmail(_(u"Send to Kindle"), book.path, converted_file_name, | ||||
|                              config.get_mail_settings(), kindle_mail, | ||||
|                              _(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.'))) | ||||
|             return | ||||
|     return _(u"The requested file could not be read. Maybe wrong permissions?") | ||||
| 
 | ||||
|  | @ -722,47 +730,30 @@ def format_runtime(runtime): | |||
| # helper function to apply localize status information in tasklist entries | ||||
| def render_task_status(tasklist): | ||||
|     renderedtasklist = list() | ||||
|     for task in tasklist: | ||||
|         if task['user'] == current_user.nickname or current_user.role_admin(): | ||||
|             if task['formStarttime']: | ||||
|                 task['starttime'] = format_datetime(task['formStarttime'], format='short', locale=get_locale()) | ||||
|             # task2['formStarttime'] = "" | ||||
|             else: | ||||
|                 if 'starttime' not in task: | ||||
|                     task['starttime'] = "" | ||||
| 
 | ||||
|             if 'formRuntime' not in task: | ||||
|                 task['runtime'] = "" | ||||
|             else: | ||||
|                 task['runtime'] = format_runtime(task['formRuntime']) | ||||
|     for num, user, added, task in tasklist: | ||||
|         if user == current_user.nickname or current_user.role_admin(): | ||||
|             ret = {} | ||||
|             if task.start_time: | ||||
|                 ret['starttime'] = format_datetime(task.start_time, format='short', locale=get_locale()) | ||||
|                 ret['runtime'] = format_runtime(task.runtime) | ||||
| 
 | ||||
|             # localize the task status | ||||
|             if isinstance(task['stat'], int): | ||||
|                 if task['stat'] == STAT_WAITING: | ||||
|                     task['status'] = _(u'Waiting') | ||||
|                 elif task['stat'] == STAT_FAIL: | ||||
|                     task['status'] = _(u'Failed') | ||||
|                 elif task['stat'] == STAT_STARTED: | ||||
|                     task['status'] = _(u'Started') | ||||
|                 elif task['stat'] == STAT_FINISH_SUCCESS: | ||||
|                     task['status'] = _(u'Finished') | ||||
|             if isinstance(task.stat, int): | ||||
|                 if task.stat == STAT_WAITING: | ||||
|                     ret['status'] = _(u'Waiting') | ||||
|                 elif task.stat == STAT_FAIL: | ||||
|                     ret['status'] = _(u'Failed') | ||||
|                 elif task.stat == STAT_STARTED: | ||||
|                     ret['status'] = _(u'Started') | ||||
|                 elif task.stat == STAT_FINISH_SUCCESS: | ||||
|                     ret['status'] = _(u'Finished') | ||||
|                 else: | ||||
|                     task['status'] = _(u'Unknown Status') | ||||
|                     ret['status'] = _(u'Unknown Status') | ||||
| 
 | ||||
|             # localize the task type | ||||
|             if isinstance(task['taskType'], int): | ||||
|                 if task['taskType'] == TASK_EMAIL: | ||||
|                     task['taskMessage'] = _(u'E-mail: ') + task['taskMess'] | ||||
|                 elif task['taskType'] == TASK_CONVERT: | ||||
|                     task['taskMessage'] = _(u'Convert: ') + task['taskMess'] | ||||
|                 elif task['taskType'] == TASK_UPLOAD: | ||||
|                     task['taskMessage'] = _(u'Upload: ') + task['taskMess'] | ||||
|                 elif task['taskType'] == TASK_CONVERT_ANY: | ||||
|                     task['taskMessage'] = _(u'Convert: ') + task['taskMess'] | ||||
|                 else: | ||||
|                     task['taskMessage'] = _(u'Unknown Task: ') + task['taskMess'] | ||||
| 
 | ||||
|             renderedtasklist.append(task) | ||||
|             ret['taskMessage'] = "{}: {}".format(_(task.name), task.message) | ||||
|             ret['progress'] = "{} %".format(int(task.progress * 100)) | ||||
|             ret['user'] = user | ||||
|             renderedtasklist.append(ret) | ||||
| 
 | ||||
|     return renderedtasklist | ||||
| 
 | ||||
|  |  | |||
|  | @ -200,9 +200,6 @@ class WebServer(object): | |||
|     def stop(self, restart=False): | ||||
|         from . import updater_thread | ||||
|         updater_thread.stop() | ||||
|         from . import calibre_db | ||||
|         calibre_db.stop() | ||||
| 
 | ||||
| 
 | ||||
|         log.info("webserver stop (restart=%s)", restart) | ||||
|         self.restart = restart | ||||
|  |  | |||
							
								
								
									
										207
									
								
								cps/services/worker.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										207
									
								
								cps/services/worker.py
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,207 @@ | |||
| 
 | ||||
| from __future__ import division, print_function, unicode_literals | ||||
| import threading | ||||
| import abc | ||||
| import uuid | ||||
| 
 | ||||
| try: | ||||
|     import queue | ||||
| except ImportError: | ||||
|     import Queue as queue | ||||
| from datetime import datetime, timedelta | ||||
| from collections import namedtuple | ||||
| 
 | ||||
| from cps import calibre_db | ||||
| from cps import logger | ||||
| 
 | ||||
| log = logger.create() | ||||
| 
 | ||||
| # task 'status' consts | ||||
| STAT_WAITING = 0 | ||||
| STAT_FAIL = 1 | ||||
| STAT_STARTED = 2 | ||||
| STAT_FINISH_SUCCESS = 3 | ||||
| 
 | ||||
| # Only retain this many tasks in dequeued list | ||||
| TASK_CLEANUP_TRIGGER = 20 | ||||
| 
 | ||||
| QueuedTask = namedtuple('QueuedTask', 'num, user, added, task') | ||||
| 
 | ||||
| 
 | ||||
| def _get_main_thread(): | ||||
|     for t in threading.enumerate(): | ||||
|         if t.__class__.__name__ == '_MainThread': | ||||
|             return t | ||||
|     raise Exception("main thread not found?!") | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| class ImprovedQueue(queue.Queue): | ||||
|     def to_list(self): | ||||
|         """ | ||||
|         Returns a copy of all items in the queue without removing them. | ||||
|         """ | ||||
| 
 | ||||
|         with self.mutex: | ||||
|             return list(self.queue) | ||||
| 
 | ||||
| #Class for all worker tasks in the background | ||||
| class WorkerThread(threading.Thread): | ||||
|     _instance = None | ||||
| 
 | ||||
|     @classmethod | ||||
|     def getInstance(cls): | ||||
|         if cls._instance is None: | ||||
|             cls._instance = WorkerThread() | ||||
|         return cls._instance | ||||
| 
 | ||||
|     def __init__(self): | ||||
|         threading.Thread.__init__(self) | ||||
| 
 | ||||
|         self.dequeued = list() | ||||
| 
 | ||||
|         self.doLock = threading.Lock() | ||||
|         self.queue = ImprovedQueue() | ||||
|         self.num = 0 | ||||
|         self.start() | ||||
| 
 | ||||
|     @classmethod | ||||
|     def add(cls, user, task): | ||||
|         ins = cls.getInstance() | ||||
|         ins.num += 1 | ||||
|         ins.queue.put(QueuedTask( | ||||
|             num=ins.num, | ||||
|             user=user, | ||||
|             added=datetime.now(), | ||||
|             task=task, | ||||
|         )) | ||||
| 
 | ||||
|     @property | ||||
|     def tasks(self): | ||||
|         with self.doLock: | ||||
|             tasks = self.queue.to_list() + self.dequeued | ||||
|             return sorted(tasks, key=lambda x: x.num) | ||||
| 
 | ||||
| 
 | ||||
|     # Main thread loop starting the different tasks | ||||
|     def run(self): | ||||
|         main_thread = _get_main_thread() | ||||
|         while main_thread.is_alive(): | ||||
|             item = self.queue.get() | ||||
| 
 | ||||
|             with self.doLock: | ||||
|                 # once we hit our trigger, start cleaning up dead tasks | ||||
|                 if len(self.dequeued) > TASK_CLEANUP_TRIGGER: | ||||
|                     dead = [] | ||||
|                     alive = [] | ||||
|                     for x in self.dequeued: | ||||
|                         (dead if x.task.dead else alive).append(x) | ||||
| 
 | ||||
|                     # if the ones that we need to keep are within the trigger, do nothing else | ||||
|                     delta = len(self.dequeued) - len(dead) | ||||
|                     if delta > TASK_CLEANUP_TRIGGER: | ||||
|                         ret = alive | ||||
|                     else: | ||||
|                         # otherwise, lop off the oldest dead tasks until we hit the target trigger | ||||
|                         ret = sorted(dead, key=lambda x: x.task.end_time)[-TASK_CLEANUP_TRIGGER:] + alive | ||||
| 
 | ||||
|                     self.dequeued = sorted(ret, key=lambda x: x.num) | ||||
|                 # add to list so that in-progress tasks show up | ||||
|                 self.dequeued.append(item) | ||||
| 
 | ||||
|             # sometimes tasks (like Upload) don't actually have work to do and are created as already finished | ||||
|             if item.task.stat is STAT_WAITING: | ||||
|                 # CalibreTask.start() should wrap all exceptions in it's own error handling | ||||
|                 item.task.start(self) | ||||
| 
 | ||||
|             self.queue.task_done() | ||||
| 
 | ||||
| 
 | ||||
| class CalibreTask: | ||||
|     __metaclass__ = abc.ABCMeta | ||||
| 
 | ||||
|     def __init__(self, message): | ||||
|         self._progress = 0 | ||||
|         self.stat = STAT_WAITING | ||||
|         self.error = None | ||||
|         self.start_time = None | ||||
|         self.end_time = None | ||||
|         self.message = message | ||||
|         self.id = uuid.uuid4() | ||||
| 
 | ||||
|     @abc.abstractmethod | ||||
|     def run(self, worker_thread): | ||||
|         """Provides the caller some human-readable name for this class""" | ||||
|         raise NotImplementedError | ||||
| 
 | ||||
|     @abc.abstractmethod | ||||
|     def name(self): | ||||
|         """Provides the caller some human-readable name for this class""" | ||||
|         raise NotImplementedError | ||||
| 
 | ||||
|     def start(self, *args): | ||||
|         self.start_time = datetime.now() | ||||
|         self.stat = STAT_STARTED | ||||
| 
 | ||||
|         # catch any unhandled exceptions in a task and automatically fail it | ||||
|         try: | ||||
|             self.run(*args) | ||||
|         except Exception as e: | ||||
|             self._handleError(str(e)) | ||||
|             log.exception(e) | ||||
| 
 | ||||
|         self.end_time = datetime.now() | ||||
| 
 | ||||
|     @property | ||||
|     def stat(self): | ||||
|         return self._stat | ||||
| 
 | ||||
|     @stat.setter | ||||
|     def stat(self, x): | ||||
|         self._stat = x | ||||
| 
 | ||||
|     @property | ||||
|     def progress(self): | ||||
|         return self._progress | ||||
| 
 | ||||
|     @progress.setter | ||||
|     def progress(self, x): | ||||
|         if not 0 <= x <= 1: | ||||
|             raise ValueError("Task progress should within [0, 1] range") | ||||
|         self._progress = x | ||||
| 
 | ||||
|     @property | ||||
|     def error(self): | ||||
|         return self._error | ||||
| 
 | ||||
|     @error.setter | ||||
|     def error(self, x): | ||||
|         self._error = x | ||||
| 
 | ||||
|     @property | ||||
|     def runtime(self): | ||||
|         return (self.end_time or datetime.now()) - self.start_time | ||||
| 
 | ||||
|     @property | ||||
|     def dead(self): | ||||
|         """Determines whether or not this task can be garbage collected | ||||
| 
 | ||||
|         We have a separate dictating this because there may be certain tasks that want to override this | ||||
|         """ | ||||
|         # By default, we're good to clean a task if it's "Done" | ||||
|         return self.stat in (STAT_FINISH_SUCCESS, STAT_FAIL) | ||||
| 
 | ||||
|     @progress.setter | ||||
|     def progress(self, x): | ||||
|         # todo: throw error if outside of [0,1] | ||||
|         self._progress = x | ||||
| 
 | ||||
|     def _handleError(self, error_message): | ||||
|         log.error(error_message) | ||||
|         self.stat = STAT_FAIL | ||||
|         self.progress = 1 | ||||
|         self.error = error_message | ||||
| 
 | ||||
|     def _handleSuccess(self): | ||||
|         self.stat = STAT_FINISH_SUCCESS | ||||
|         self.progress = 1 | ||||
							
								
								
									
										0
									
								
								cps/tasks/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								cps/tasks/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										201
									
								
								cps/tasks/convert.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										201
									
								
								cps/tasks/convert.py
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,201 @@ | |||
| from __future__ import division, print_function, unicode_literals | ||||
| import sys | ||||
| import os | ||||
| import re | ||||
| 
 | ||||
| from glob import glob | ||||
| from shutil import copyfile | ||||
| 
 | ||||
| from sqlalchemy.exc import SQLAlchemyError | ||||
| 
 | ||||
| from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS | ||||
| from cps import calibre_db, db | ||||
| from cps import logger, config | ||||
| from cps.subproc_wrapper import process_open | ||||
| from flask_babel import gettext as _ | ||||
| 
 | ||||
| from cps.tasks.email import TaskEmail | ||||
| from cps import gdriveutils | ||||
| log = logger.create() | ||||
| 
 | ||||
| 
 | ||||
| class TaskConvert(CalibreTask): | ||||
|     def __init__(self, file_path, bookid, taskMessage, settings, kindle_mail): | ||||
|         super(TaskConvert, self).__init__(taskMessage) | ||||
|         self.file_path = file_path | ||||
|         self.bookid = bookid | ||||
|         self.settings = settings | ||||
|         self.kindle_mail = kindle_mail | ||||
| 
 | ||||
|         self.results = dict() | ||||
| 
 | ||||
|     def run(self, worker_thread): | ||||
|         self.worker_thread = worker_thread | ||||
|         filename = self._convert_ebook_format() | ||||
| 
 | ||||
|         if filename: | ||||
|             if config.config_use_google_drive: | ||||
|                 gdriveutils.updateGdriveCalibreFromLocal() | ||||
|             if self.kindle_mail: | ||||
|                 # if we're sending to kindle after converting, create a one-off task and run it immediately | ||||
|                 # todo: figure out how to incorporate this into the progress | ||||
|                 try: | ||||
|                     task = TaskEmail(self.settings['subject'], self.results["path"], | ||||
|                                filename, self.settings, self.kindle_mail, | ||||
|                                self.settings['subject'], self.settings['body'], internal=True) | ||||
|                     task.start(worker_thread) | ||||
| 
 | ||||
|                     # even though the convert task might be finished, if this task fails, fail the whole thing | ||||
|                     if task.stat != STAT_FINISH_SUCCESS: | ||||
|                         raise Exception(task.error) | ||||
|                 except Exception as e: | ||||
|                     return self._handleError(str(e)) | ||||
| 
 | ||||
|     def _convert_ebook_format(self): | ||||
|         error_message = None | ||||
|         file_path = self.file_path | ||||
|         book_id = self.bookid | ||||
|         format_old_ext = u'.' + self.settings['old_book_format'].lower() | ||||
|         format_new_ext = u'.' + self.settings['new_book_format'].lower() | ||||
| 
 | ||||
|         # check to see if destination format already exists - | ||||
|         # if it does - mark the conversion task as complete and return a success | ||||
|         # this will allow send to kindle workflow to continue to work | ||||
|         if os.path.isfile(file_path + format_new_ext): | ||||
|             log.info("Book id %d already converted to %s", book_id, format_new_ext) | ||||
|             cur_book = calibre_db.get_book(book_id) | ||||
|             self.results['path'] = file_path | ||||
|             self.results['title'] = cur_book.title | ||||
|             self._handleSuccess() | ||||
|             return file_path + format_new_ext | ||||
|         else: | ||||
|             log.info("Book id %d - target format of %s does not exist. Moving forward with convert.", | ||||
|                      book_id, | ||||
|                      format_new_ext) | ||||
| 
 | ||||
|         if config.config_kepubifypath and format_old_ext == '.epub' and format_new_ext == '.kepub': | ||||
|             check, error_message = self._convert_kepubify(file_path, | ||||
|                                                           format_old_ext, | ||||
|                                                           format_new_ext) | ||||
|         else: | ||||
|             # check if calibre converter-executable is existing | ||||
|             if not os.path.exists(config.config_converterpath): | ||||
|                 # ToDo Text is not translated | ||||
|                 self._handleError(_(u"Calibre ebook-convert %(tool)s not found", tool=config.config_converterpath)) | ||||
|                 return | ||||
|             check, error_message = self._convert_calibre(file_path, format_old_ext, format_new_ext) | ||||
| 
 | ||||
|         if check == 0: | ||||
|             cur_book = calibre_db.get_book(book_id) | ||||
|             if os.path.isfile(file_path + format_new_ext): | ||||
|                 # self.db_queue.join() | ||||
|                 new_format = db.Data(name=cur_book.data[0].name, | ||||
|                                          book_format=self.settings['new_book_format'].upper(), | ||||
|                                          book=book_id, uncompressed_size=os.path.getsize(file_path + format_new_ext)) | ||||
| 
 | ||||
|                 cur_book.data.append(new_format) | ||||
| 
 | ||||
|                 try: | ||||
|                     # db.session.merge(cur_book) | ||||
|                     calibre_db.session.commit() | ||||
|                 except SQLAlchemyError as e: | ||||
|                     calibre_db.session.rollback() | ||||
|                     log.error("Database error: %s", e) | ||||
|                     return | ||||
| 
 | ||||
|                 self.results['path'] = cur_book.path | ||||
|                 self.results['title'] = cur_book.title | ||||
|                 if config.config_use_google_drive: | ||||
|                     os.remove(file_path + format_old_ext) | ||||
|                 self._handleSuccess() | ||||
|                 return file_path + format_new_ext | ||||
|             else: | ||||
|                 error_message = _('%(format)s format not found on disk', format=format_new_ext.upper()) | ||||
|         log.info("ebook converter failed with error while converting book") | ||||
|         if not error_message: | ||||
|             error_message = _('Ebook converter failed with unknown error') | ||||
|         self._handleError(error_message) | ||||
|         return | ||||
| 
 | ||||
|     def _convert_kepubify(self, file_path, format_old_ext, format_new_ext): | ||||
|         quotes = [1, 3] | ||||
|         command = [config.config_kepubifypath, (file_path + format_old_ext), '-o', os.path.dirname(file_path)] | ||||
|         try: | ||||
|             p = process_open(command, quotes) | ||||
|         except OSError as e: | ||||
|             return 1, _(u"Kepubify-converter failed: %(error)s", error=e) | ||||
|         self.progress = 0.01 | ||||
|         while True: | ||||
|             nextline = p.stdout.readlines() | ||||
|             nextline = [x.strip('\n') for x in nextline if x != '\n'] | ||||
|             if sys.version_info < (3, 0): | ||||
|                 nextline = [x.decode('utf-8') for x in nextline] | ||||
|             for line in nextline: | ||||
|                 log.debug(line) | ||||
|             if p.poll() is not None: | ||||
|                 break | ||||
| 
 | ||||
|         # ToD Handle | ||||
|         # process returncode | ||||
|         check = p.returncode | ||||
| 
 | ||||
|         # move file | ||||
|         if check == 0: | ||||
|             converted_file = glob(os.path.join(os.path.dirname(file_path), "*.kepub.epub")) | ||||
|             if len(converted_file) == 1: | ||||
|                 copyfile(converted_file[0], (file_path + format_new_ext)) | ||||
|                 os.unlink(converted_file[0]) | ||||
|             else: | ||||
|                 return 1, _(u"Converted file not found or more than one file in folder %(folder)s", | ||||
|                             folder=os.path.dirname(file_path)) | ||||
|         return check, None | ||||
| 
 | ||||
|     def _convert_calibre(self, file_path, format_old_ext, format_new_ext): | ||||
|         try: | ||||
|             # Linux py2.7 encode as list without quotes no empty element for parameters | ||||
|             # linux py3.x no encode and as list without quotes no empty element for parameters | ||||
|             # windows py2.7 encode as string with quotes empty element for parameters is okay | ||||
|             # windows py 3.x no encode and as string with quotes empty element for parameters is okay | ||||
|             # separate handling for windows and linux | ||||
|             quotes = [1, 2] | ||||
|             command = [config.config_converterpath, (file_path + format_old_ext), | ||||
|                        (file_path + format_new_ext)] | ||||
|             quotes_index = 3 | ||||
|             if config.config_calibre: | ||||
|                 parameters = config.config_calibre.split(" ") | ||||
|                 for param in parameters: | ||||
|                     command.append(param) | ||||
|                     quotes.append(quotes_index) | ||||
|                     quotes_index += 1 | ||||
| 
 | ||||
|             p = process_open(command, quotes) | ||||
|         except OSError as e: | ||||
|             return 1, _(u"Ebook-converter failed: %(error)s", error=e) | ||||
| 
 | ||||
|         while p.poll() is None: | ||||
|             nextline = p.stdout.readline() | ||||
|             if os.name == 'nt' and sys.version_info < (3, 0): | ||||
|                 nextline = nextline.decode('windows-1252') | ||||
|             elif os.name == 'posix' and sys.version_info < (3, 0): | ||||
|                 nextline = nextline.decode('utf-8') | ||||
|             log.debug(nextline.strip('\r\n')) | ||||
|             # parse progress string from calibre-converter | ||||
|             progress = re.search(r"(\d+)%\s.*", nextline) | ||||
|             if progress: | ||||
|                 self.progress = int(progress.group(1)) / 100 | ||||
| 
 | ||||
|         # process returncode | ||||
|         check = p.returncode | ||||
|         calibre_traceback = p.stderr.readlines() | ||||
|         error_message = "" | ||||
|         for ele in calibre_traceback: | ||||
|             if sys.version_info < (3, 0): | ||||
|                 ele = ele.decode('utf-8') | ||||
|             log.debug(ele.strip('\n')) | ||||
|             if not ele.startswith('Traceback') and not ele.startswith('  File'): | ||||
|                 error_message = _("Calibre failed with error: %(error)s", ele.strip('\n')) | ||||
|         return check, error_message | ||||
| 
 | ||||
|     @property | ||||
|     def name(self): | ||||
|         return "Convert" | ||||
							
								
								
									
										238
									
								
								cps/tasks/email.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										238
									
								
								cps/tasks/email.py
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,238 @@ | |||
| from __future__ import division, print_function, unicode_literals | ||||
| import sys | ||||
| import os | ||||
| import smtplib | ||||
| import threading | ||||
| import socket | ||||
| 
 | ||||
| try: | ||||
|     from StringIO import StringIO | ||||
|     from email.MIMEBase import MIMEBase | ||||
|     from email.MIMEMultipart import MIMEMultipart | ||||
|     from email.MIMEText import MIMEText | ||||
| except ImportError: | ||||
|     from io import StringIO | ||||
|     from email.mime.base import MIMEBase | ||||
|     from email.mime.multipart import MIMEMultipart | ||||
|     from email.mime.text import MIMEText | ||||
| 
 | ||||
| from email import encoders | ||||
| from email.utils import formatdate, make_msgid | ||||
| from email.generator import Generator | ||||
| 
 | ||||
| from cps.services.worker import CalibreTask | ||||
| from cps import logger, config | ||||
| 
 | ||||
| from cps import gdriveutils | ||||
| 
 | ||||
| log = logger.create() | ||||
| 
 | ||||
| CHUNKSIZE = 8192 | ||||
| 
 | ||||
| 
 | ||||
| # Class for sending email with ability to get current progress | ||||
| class EmailBase: | ||||
| 
 | ||||
|     transferSize = 0 | ||||
|     progress = 0 | ||||
| 
 | ||||
|     def data(self, msg): | ||||
|         self.transferSize = len(msg) | ||||
|         (code, resp) = smtplib.SMTP.data(self, msg) | ||||
|         self.progress = 0 | ||||
|         return (code, resp) | ||||
| 
 | ||||
|     def send(self, strg): | ||||
|         """Send `strg' to the server.""" | ||||
|         log.debug('send: %r', strg[:300]) | ||||
|         if hasattr(self, 'sock') and self.sock: | ||||
|             try: | ||||
|                 if self.transferSize: | ||||
|                     lock=threading.Lock() | ||||
|                     lock.acquire() | ||||
|                     self.transferSize = len(strg) | ||||
|                     lock.release() | ||||
|                     for i in range(0, self.transferSize, CHUNKSIZE): | ||||
|                         if isinstance(strg, bytes): | ||||
|                             self.sock.send((strg[i:i + CHUNKSIZE])) | ||||
|                         else: | ||||
|                             self.sock.send((strg[i:i + CHUNKSIZE]).encode('utf-8')) | ||||
|                         lock.acquire() | ||||
|                         self.progress = i | ||||
|                         lock.release() | ||||
|                 else: | ||||
|                     self.sock.sendall(strg.encode('utf-8')) | ||||
|             except socket.error: | ||||
|                 self.close() | ||||
|                 raise smtplib.SMTPServerDisconnected('Server not connected') | ||||
|         else: | ||||
|             raise smtplib.SMTPServerDisconnected('please run connect() first') | ||||
| 
 | ||||
|     @classmethod | ||||
|     def _print_debug(cls, *args): | ||||
|         log.debug(args) | ||||
| 
 | ||||
|     def getTransferStatus(self): | ||||
|         if self.transferSize: | ||||
|             lock2 = threading.Lock() | ||||
|             lock2.acquire() | ||||
|             value = int((float(self.progress) / float(self.transferSize))*100) | ||||
|             lock2.release() | ||||
|             return value / 100 | ||||
|         else: | ||||
|             return 1 | ||||
| 
 | ||||
| 
 | ||||
| # Class for sending email with ability to get current progress, derived from emailbase class | ||||
| class Email(EmailBase, smtplib.SMTP): | ||||
| 
 | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         smtplib.SMTP.__init__(self, *args, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| # Class for sending ssl encrypted email with ability to get current progress, , derived from emailbase class | ||||
| class EmailSSL(EmailBase, smtplib.SMTP_SSL): | ||||
| 
 | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         smtplib.SMTP_SSL.__init__(self, *args, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| class TaskEmail(CalibreTask): | ||||
|     def __init__(self, subject, filepath, attachment, settings, recipient, taskMessage, text, internal=False): | ||||
|         super(TaskEmail, self).__init__(taskMessage) | ||||
|         self.subject = subject | ||||
|         self.attachment = attachment | ||||
|         self.settings = settings | ||||
|         self.filepath = filepath | ||||
|         self.recipent = recipient | ||||
|         self.text = text | ||||
|         self.asyncSMTP = None | ||||
| 
 | ||||
|         self.results = dict() | ||||
| 
 | ||||
|     def run(self, worker_thread): | ||||
|         # create MIME message | ||||
|         msg = MIMEMultipart() | ||||
|         msg['Subject'] = self.subject | ||||
|         msg['Message-Id'] = make_msgid('calibre-web') | ||||
|         msg['Date'] = formatdate(localtime=True) | ||||
|         text = self.text | ||||
|         msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8')) | ||||
|         if self.attachment: | ||||
|             result = self._get_attachment(self.filepath, self.attachment) | ||||
|             if result: | ||||
|                 msg.attach(result) | ||||
|             else: | ||||
|                 self._handleError(u"Attachment not found") | ||||
|                 return | ||||
| 
 | ||||
|         msg['From'] = self.settings["mail_from"] | ||||
|         msg['To'] = self.recipent | ||||
| 
 | ||||
|         use_ssl = int(self.settings.get('mail_use_ssl', 0)) | ||||
|         try: | ||||
|             # convert MIME message to string | ||||
|             fp = StringIO() | ||||
|             gen = Generator(fp, mangle_from_=False) | ||||
|             gen.flatten(msg) | ||||
|             msg = fp.getvalue() | ||||
| 
 | ||||
|             # send email | ||||
|             timeout = 600  # set timeout to 5mins | ||||
| 
 | ||||
|             # redirect output to logfile on python2 pn python3 debugoutput is caught with overwritten | ||||
|             # _print_debug function | ||||
|             if sys.version_info < (3, 0): | ||||
|                 org_smtpstderr = smtplib.stderr | ||||
|                 smtplib.stderr = logger.StderrLogger('worker.smtp') | ||||
| 
 | ||||
|             if use_ssl == 2: | ||||
|                 self.asyncSMTP = EmailSSL(self.settings["mail_server"], self.settings["mail_port"], | ||||
|                                            timeout=timeout) | ||||
|             else: | ||||
|                 self.asyncSMTP = Email(self.settings["mail_server"], self.settings["mail_port"], timeout=timeout) | ||||
| 
 | ||||
|             # link to logginglevel | ||||
|             if logger.is_debug_enabled(): | ||||
|                 self.asyncSMTP.set_debuglevel(1) | ||||
|             if use_ssl == 1: | ||||
|                 self.asyncSMTP.starttls() | ||||
|             if self.settings["mail_password"]: | ||||
|                 self.asyncSMTP.login(str(self.settings["mail_login"]), str(self.settings["mail_password"])) | ||||
|             self.asyncSMTP.sendmail(self.settings["mail_from"], self.recipent, msg) | ||||
|             self.asyncSMTP.quit() | ||||
|             self._handleSuccess() | ||||
| 
 | ||||
|             if sys.version_info < (3, 0): | ||||
|                 smtplib.stderr = org_smtpstderr | ||||
| 
 | ||||
|         except (MemoryError) as e: | ||||
|             log.exception(e) | ||||
|             self._handleError(u'MemoryError sending email: ' + str(e)) | ||||
|             return None | ||||
|         except (smtplib.SMTPException, smtplib.SMTPAuthenticationError) as e: | ||||
|             if hasattr(e, "smtp_error"): | ||||
|                 text = e.smtp_error.decode('utf-8').replace("\n", '. ') | ||||
|             elif hasattr(e, "message"): | ||||
|                 text = e.message | ||||
|             else: | ||||
|                 log.exception(e) | ||||
|                 text = '' | ||||
|             self._handleError(u'Smtplib Error sending email: ' + text) | ||||
|             return None | ||||
|         except (socket.error) as e: | ||||
|             self._handleError(u'Socket Error sending email: ' + e.strerror) | ||||
|             return None | ||||
| 
 | ||||
|     @property | ||||
|     def progress(self): | ||||
|         if self.asyncSMTP is not None: | ||||
|             return self.asyncSMTP.getTransferStatus() | ||||
|         else: | ||||
|             return self._progress | ||||
| 
 | ||||
|     @progress.setter | ||||
|     def progress(self, x): | ||||
|         """This gets explicitly set when handle(Success|Error) are called. In this case, remove the SMTP connection""" | ||||
|         if x == 1: | ||||
|             self.asyncSMTP = None | ||||
|             self._progress = x | ||||
| 
 | ||||
| 
 | ||||
|     @classmethod | ||||
|     def _get_attachment(cls, bookpath, filename): | ||||
|         """Get file as MIMEBase message""" | ||||
|         calibrepath = config.config_calibre_dir | ||||
|         if config.config_use_google_drive: | ||||
|             df = gdriveutils.getFileFromEbooksFolder(bookpath, filename) | ||||
|             if df: | ||||
|                 datafile = os.path.join(calibrepath, bookpath, filename) | ||||
|                 if not os.path.exists(os.path.join(calibrepath, bookpath)): | ||||
|                     os.makedirs(os.path.join(calibrepath, bookpath)) | ||||
|                 df.GetContentFile(datafile) | ||||
|             else: | ||||
|                 return None | ||||
|             file_ = open(datafile, 'rb') | ||||
|             data = file_.read() | ||||
|             file_.close() | ||||
|             os.remove(datafile) | ||||
|         else: | ||||
|             try: | ||||
|                 file_ = open(os.path.join(calibrepath, bookpath, filename), 'rb') | ||||
|                 data = file_.read() | ||||
|                 file_.close() | ||||
|             except IOError as e: | ||||
|                 log.exception(e) | ||||
|                 log.error(u'The requested file could not be read. Maybe wrong permissions?') | ||||
|                 return None | ||||
| 
 | ||||
|         attachment = MIMEBase('application', 'octet-stream') | ||||
|         attachment.set_payload(data) | ||||
|         encoders.encode_base64(attachment) | ||||
|         attachment.add_header('Content-Disposition', 'attachment', | ||||
|                               filename=filename) | ||||
|         return attachment | ||||
| 
 | ||||
|     @property | ||||
|     def name(self): | ||||
|         return "Email" | ||||
							
								
								
									
										19
									
								
								cps/tasks/upload.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								cps/tasks/upload.py
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,19 @@ | |||
| from __future__ import division, print_function, unicode_literals | ||||
| 
 | ||||
| from datetime import datetime | ||||
| from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS | ||||
| 
 | ||||
| class TaskUpload(CalibreTask): | ||||
|     def __init__(self, taskMessage): | ||||
|         super(TaskUpload, self).__init__(taskMessage) | ||||
|         self.start_time = self.end_time = datetime.now() | ||||
|         self.stat = STAT_FINISH_SUCCESS | ||||
|         self.progress = 1 | ||||
| 
 | ||||
|     def run(self, worker_thread): | ||||
|         """Upload task doesn't have anything to do, it's simply a way to add information to the task list""" | ||||
|         pass | ||||
| 
 | ||||
|     @property | ||||
|     def name(self): | ||||
|         return "Upload" | ||||
							
								
								
									
										21
									
								
								cps/web.py
									
									
									
									
									
								
							
							
						
						
									
										21
									
								
								cps/web.py
									
									
									
									
									
								
							|  | @ -33,7 +33,7 @@ import re | |||
| from babel.dates import format_date | ||||
| from babel import Locale as LC | ||||
| from babel.core import UnknownLocaleError | ||||
| from flask import Blueprint | ||||
| from flask import Blueprint, jsonify | ||||
| from flask import render_template, request, redirect, send_from_directory, make_response, g, flash, abort, url_for | ||||
| from flask_babel import gettext as _ | ||||
| from flask_login import login_user, logout_user, login_required, current_user, confirm_login | ||||
|  | @ -42,6 +42,9 @@ from sqlalchemy.sql.expression import text, func, true, false, not_, and_, or_ | |||
| from sqlalchemy.orm.attributes import flag_modified | ||||
| from werkzeug.exceptions import default_exceptions, InternalServerError | ||||
| from sqlalchemy.sql.functions import coalesce | ||||
| 
 | ||||
| from .services.worker import WorkerThread | ||||
| 
 | ||||
| try: | ||||
|     from werkzeug.exceptions import FailedDependency | ||||
| except ImportError: | ||||
|  | @ -49,11 +52,11 @@ except ImportError: | |||
| from werkzeug.datastructures import Headers | ||||
| from werkzeug.security import generate_password_hash, check_password_hash | ||||
| 
 | ||||
| from . import constants, logger, isoLanguages, services, worker, cli | ||||
| from . import constants, logger, isoLanguages, services | ||||
| from . import searched_ids, lm, babel, db, ub, config, get_locale, app | ||||
| from . import calibre_db | ||||
| from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download | ||||
| from .helper import check_valid_domain, render_task_status, json_serial, \ | ||||
| from .helper import check_valid_domain, render_task_status, \ | ||||
|     get_cc_columns, get_book_cover, get_download_link, send_mail, generate_random_password, \ | ||||
|     send_registration_mail, check_send_to_kindle, check_read_formats, tags_filters, reset_password | ||||
| from .pagination import Pagination | ||||
|  | @ -376,12 +379,8 @@ def import_ldap_users(): | |||
| @web.route("/ajax/emailstat") | ||||
| @login_required | ||||
| def get_email_status_json(): | ||||
|     tasks = worker.get_taskstatus() | ||||
|     answer = render_task_status(tasks) | ||||
|     js = json.dumps(answer, default=json_serial) | ||||
|     response = make_response(js) | ||||
|     response.headers["Content-Type"] = "application/json; charset=utf-8" | ||||
|     return response | ||||
|     tasks = WorkerThread.getInstance().tasks | ||||
|     return jsonify(render_task_status(tasks)) | ||||
| 
 | ||||
| 
 | ||||
| @web.route("/ajax/bookmark/<int:book_id>/<book_format>", methods=['POST']) | ||||
|  | @ -1173,7 +1172,7 @@ def category_list(): | |||
| @login_required | ||||
| def get_tasks_status(): | ||||
|     # if current user admin, show all email, otherwise only own emails | ||||
|     tasks = worker.get_taskstatus() | ||||
|     tasks = WorkerThread.getInstance().tasks | ||||
|     answer = render_task_status(tasks) | ||||
|     return render_title_template('tasks.html', entries=answer, title=_(u"Tasks"), page="tasks") | ||||
| 
 | ||||
|  | @ -1686,6 +1685,7 @@ def profile(): | |||
|                                              title=_(u"%(name)s's profile", name=current_user.nickname), page="me", | ||||
|                                              kobo_support=kobo_support, | ||||
|                                              registered_oauth=local_oauth_check, oauth_status=oauth_status) | ||||
|             current_user.email = to_save["email"] | ||||
|         if "nickname" in to_save and to_save["nickname"] != current_user.nickname: | ||||
|             # Query User nickname, if not existing, change | ||||
|             if not ub.session.query(ub.User).filter(ub.User.nickname == to_save["nickname"]).scalar(): | ||||
|  | @ -1702,7 +1702,6 @@ def profile(): | |||
|                                              title=_(u"Edit User %(nick)s", | ||||
|                                                      nick=current_user.nickname), | ||||
|                                              page="edituser") | ||||
|             current_user.email = to_save["email"] | ||||
|         if "show_random" in to_save and to_save["show_random"] == "on": | ||||
|             current_user.random_books = 1 | ||||
|         if "default_language" in to_save: | ||||
|  |  | |||
							
								
								
									
										602
									
								
								cps/worker.py
									
									
									
									
									
								
							
							
						
						
									
										602
									
								
								cps/worker.py
									
									
									
									
									
								
							|  | @ -1,602 +0,0 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| 
 | ||||
| #  This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web) | ||||
| #    Copyright (C) 2018-2019 OzzieIsaacs, bodybybuddha, janeczku | ||||
| # | ||||
| #  This program is free software: you can redistribute it and/or modify | ||||
| #  it under the terms of the GNU General Public License as published by | ||||
| #  the Free Software Foundation, either version 3 of the License, or | ||||
| #  (at your option) any later version. | ||||
| # | ||||
| #  This program is distributed in the hope that it will be useful, | ||||
| #  but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| #  GNU General Public License for more details. | ||||
| # | ||||
| #  You should have received a copy of the GNU General Public License | ||||
| #  along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| from __future__ import division, print_function, unicode_literals | ||||
| import sys | ||||
| import os | ||||
| import re | ||||
| import smtplib | ||||
| import socket | ||||
| import time | ||||
| import threading | ||||
| try: | ||||
|     import queue | ||||
| except ImportError: | ||||
|     import Queue as queue | ||||
| from glob import glob | ||||
| from shutil import copyfile | ||||
| from datetime import datetime | ||||
| 
 | ||||
| try: | ||||
|     from StringIO import StringIO | ||||
|     from email.MIMEBase import MIMEBase | ||||
|     from email.MIMEMultipart import MIMEMultipart | ||||
|     from email.MIMEText import MIMEText | ||||
| except ImportError: | ||||
|     from io import StringIO | ||||
|     from email.mime.base import MIMEBase | ||||
|     from email.mime.multipart import MIMEMultipart | ||||
|     from email.mime.text import MIMEText | ||||
| 
 | ||||
| from email import encoders | ||||
| from email.utils import formatdate | ||||
| from email.utils import make_msgid | ||||
| from email.generator import Generator | ||||
| from flask_babel import gettext as _ | ||||
| 
 | ||||
| from . import calibre_db, db | ||||
| from . import logger, config | ||||
| from .subproc_wrapper import process_open | ||||
| from . import gdriveutils | ||||
| 
 | ||||
| log = logger.create() | ||||
| 
 | ||||
| chunksize = 8192 | ||||
| # task 'status' consts | ||||
| STAT_WAITING = 0 | ||||
| STAT_FAIL = 1 | ||||
| STAT_STARTED = 2 | ||||
| STAT_FINISH_SUCCESS = 3 | ||||
| #taskType consts | ||||
| TASK_EMAIL = 1 | ||||
| TASK_CONVERT = 2 | ||||
| TASK_UPLOAD = 3 | ||||
| TASK_CONVERT_ANY = 4 | ||||
| 
 | ||||
| RET_FAIL = 0 | ||||
| RET_SUCCESS = 1 | ||||
| 
 | ||||
| 
 | ||||
| def _get_main_thread(): | ||||
|     for t in threading.enumerate(): | ||||
|         if t.__class__.__name__ == '_MainThread': | ||||
|             return t | ||||
|     raise Exception("main thread not found?!") | ||||
| 
 | ||||
| 
 | ||||
| # For gdrive download book from gdrive to calibredir (temp dir for books), read contents in both cases and append | ||||
| # it in MIME Base64 encoded to | ||||
| def get_attachment(bookpath, filename): | ||||
|     """Get file as MIMEBase message""" | ||||
|     calibrepath = config.config_calibre_dir | ||||
|     if config.config_use_google_drive: | ||||
|         df = gdriveutils.getFileFromEbooksFolder(bookpath, filename) | ||||
|         if df: | ||||
|             datafile = os.path.join(calibrepath, bookpath, filename) | ||||
|             if not os.path.exists(os.path.join(calibrepath, bookpath)): | ||||
|                 os.makedirs(os.path.join(calibrepath, bookpath)) | ||||
|             df.GetContentFile(datafile) | ||||
|         else: | ||||
|             return None | ||||
|         file_ = open(datafile, 'rb') | ||||
|         data = file_.read() | ||||
|         file_.close() | ||||
|         os.remove(datafile) | ||||
|     else: | ||||
|         try: | ||||
|             file_ = open(os.path.join(calibrepath, bookpath, filename), 'rb') | ||||
|             data = file_.read() | ||||
|             file_.close() | ||||
|         except IOError as e: | ||||
|             log.exception(e) | ||||
|             log.error(u'The requested file could not be read. Maybe wrong permissions?') | ||||
|             return None | ||||
| 
 | ||||
|     attachment = MIMEBase('application', 'octet-stream') | ||||
|     attachment.set_payload(data) | ||||
|     encoders.encode_base64(attachment) | ||||
|     attachment.add_header('Content-Disposition', 'attachment', | ||||
|                           filename=filename) | ||||
|     return attachment | ||||
| 
 | ||||
| 
 | ||||
| # Class for sending email with ability to get current progress | ||||
| class emailbase(): | ||||
| 
 | ||||
|     transferSize = 0 | ||||
|     progress = 0 | ||||
| 
 | ||||
|     def data(self, msg): | ||||
|         self.transferSize = len(msg) | ||||
|         (code, resp) = smtplib.SMTP.data(self, msg) | ||||
|         self.progress = 0 | ||||
|         return (code, resp) | ||||
| 
 | ||||
|     def send(self, strg): | ||||
|         """Send `strg' to the server.""" | ||||
|         log.debug('send: %r', strg[:300]) | ||||
|         if hasattr(self, 'sock') and self.sock: | ||||
|             try: | ||||
|                 if self.transferSize: | ||||
|                     lock=threading.Lock() | ||||
|                     lock.acquire() | ||||
|                     self.transferSize = len(strg) | ||||
|                     lock.release() | ||||
|                     for i in range(0, self.transferSize, chunksize): | ||||
|                         if isinstance(strg, bytes): | ||||
|                             self.sock.send((strg[i:i+chunksize])) | ||||
|                         else: | ||||
|                             self.sock.send((strg[i:i + chunksize]).encode('utf-8')) | ||||
|                         lock.acquire() | ||||
|                         self.progress = i | ||||
|                         lock.release() | ||||
|                 else: | ||||
|                     self.sock.sendall(strg.encode('utf-8')) | ||||
|             except socket.error: | ||||
|                 self.close() | ||||
|                 raise smtplib.SMTPServerDisconnected('Server not connected') | ||||
|         else: | ||||
|             raise smtplib.SMTPServerDisconnected('please run connect() first') | ||||
| 
 | ||||
|     @classmethod | ||||
|     def _print_debug(self, *args): | ||||
|         log.debug(args) | ||||
| 
 | ||||
|     def getTransferStatus(self): | ||||
|         if self.transferSize: | ||||
|             lock2 = threading.Lock() | ||||
|             lock2.acquire() | ||||
|             value = int((float(self.progress) / float(self.transferSize))*100) | ||||
|             lock2.release() | ||||
|             return str(value) + ' %' | ||||
|         else: | ||||
|             return "100 %" | ||||
| 
 | ||||
| 
 | ||||
| # Class for sending email with ability to get current progress, derived from emailbase class | ||||
| class email(emailbase, smtplib.SMTP): | ||||
| 
 | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         smtplib.SMTP.__init__(self, *args, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| # Class for sending ssl encrypted email with ability to get current progress, , derived from emailbase class | ||||
| class email_SSL(emailbase, smtplib.SMTP_SSL): | ||||
| 
 | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         smtplib.SMTP_SSL.__init__(self, *args, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| #Class for all worker tasks in the background | ||||
| class WorkerThread(threading.Thread): | ||||
| 
 | ||||
|     def __init__(self): | ||||
|         threading.Thread.__init__(self) | ||||
|         self.status = 0 | ||||
|         self.current = 0 | ||||
|         self.last = 0 | ||||
|         self.queue = list() | ||||
|         self.UIqueue = list() | ||||
|         self.asyncSMTP = None | ||||
|         self.id = 0 | ||||
|         self.db_queue = queue.Queue() | ||||
|         calibre_db.add_queue(self.db_queue) | ||||
|         self.doLock = threading.Lock() | ||||
| 
 | ||||
|     # Main thread loop starting the different tasks | ||||
|     def run(self): | ||||
|         main_thread = _get_main_thread() | ||||
|         while main_thread.is_alive(): | ||||
|             try: | ||||
|                 self.doLock.acquire() | ||||
|                 if self.current != self.last: | ||||
|                     index = self.current | ||||
|                     log.info(index) | ||||
|                     log.info(len(self.queue)) | ||||
|                     self.doLock.release() | ||||
|                     if self.queue[index]['taskType'] == TASK_EMAIL: | ||||
|                         self._send_raw_email() | ||||
|                     elif self.queue[index]['taskType'] in (TASK_CONVERT, TASK_CONVERT_ANY): | ||||
|                         self._convert_any_format() | ||||
|                     # TASK_UPLOAD is handled implicitly | ||||
|                     self.doLock.acquire() | ||||
|                     self.current += 1 | ||||
|                     if self.current > self.last: | ||||
|                         self.current = self.last | ||||
|                     self.doLock.release() | ||||
|                 else: | ||||
|                     self.doLock.release() | ||||
|             except Exception as e: | ||||
|                 log.exception(e) | ||||
|                 self.doLock.release() | ||||
|             if main_thread.is_alive(): | ||||
|                 time.sleep(1) | ||||
| 
 | ||||
|     def get_send_status(self): | ||||
|         if self.asyncSMTP: | ||||
|             return self.asyncSMTP.getTransferStatus() | ||||
|         else: | ||||
|             return "0 %" | ||||
| 
 | ||||
|     def _delete_completed_tasks(self): | ||||
|         for index, task in reversed(list(enumerate(self.UIqueue))): | ||||
|             if task['progress'] == "100 %": | ||||
|                 # delete tasks | ||||
|                 self.queue.pop(index) | ||||
|                 self.UIqueue.pop(index) | ||||
|                 # if we are deleting entries before the current index, adjust the index | ||||
|                 if index <= self.current and self.current: | ||||
|                     self.current -= 1 | ||||
|         self.last = len(self.queue) | ||||
| 
 | ||||
|     def get_taskstatus(self): | ||||
|         self.doLock.acquire() | ||||
|         if self.current  < len(self.queue): | ||||
|             if self.UIqueue[self.current]['stat'] == STAT_STARTED: | ||||
|                 if self.queue[self.current]['taskType'] == TASK_EMAIL: | ||||
|                     self.UIqueue[self.current]['progress'] = self.get_send_status() | ||||
|                 self.UIqueue[self.current]['formRuntime'] = datetime.now() - self.queue[self.current]['starttime'] | ||||
|                 self.UIqueue[self.current]['rt'] = self.UIqueue[self.current]['formRuntime'].days*24*60 \ | ||||
|                                                    + self.UIqueue[self.current]['formRuntime'].seconds \ | ||||
|                                                    + self.UIqueue[self.current]['formRuntime'].microseconds | ||||
|         self.doLock.release() | ||||
|         return self.UIqueue | ||||
| 
 | ||||
|     def _convert_any_format(self): | ||||
|         # convert book, and upload in case of google drive | ||||
|         self.doLock.acquire() | ||||
|         index = self.current | ||||
|         self.doLock.release() | ||||
|         self.UIqueue[index]['stat'] = STAT_STARTED | ||||
|         self.queue[index]['starttime'] = datetime.now() | ||||
|         self.UIqueue[index]['formStarttime'] = self.queue[index]['starttime'] | ||||
|         curr_task = self.queue[index]['taskType'] | ||||
|         filename = self._convert_ebook_format() | ||||
|         if filename: | ||||
|             if config.config_use_google_drive: | ||||
|                 gdriveutils.updateGdriveCalibreFromLocal() | ||||
|             if curr_task == TASK_CONVERT: | ||||
|                 self.add_email(self.queue[index]['settings']['subject'], self.queue[index]['path'], | ||||
|                                 filename, self.queue[index]['settings'], self.queue[index]['kindle'], | ||||
|                                 self.UIqueue[index]['user'], self.queue[index]['title'], | ||||
|                                 self.queue[index]['settings']['body'], internal=True) | ||||
| 
 | ||||
| 
 | ||||
|     def _convert_ebook_format(self): | ||||
|         error_message = None | ||||
|         self.doLock.acquire() | ||||
|         index = self.current | ||||
|         self.doLock.release() | ||||
|         file_path = self.queue[index]['file_path'] | ||||
|         book_id = self.queue[index]['bookid'] | ||||
|         format_old_ext = u'.' + self.queue[index]['settings']['old_book_format'].lower() | ||||
|         format_new_ext = u'.' + self.queue[index]['settings']['new_book_format'].lower() | ||||
| 
 | ||||
|         # check to see if destination format already exists - | ||||
|         # if it does - mark the conversion task as complete and return a success | ||||
|         # this will allow send to kindle workflow to continue to work | ||||
|         if os.path.isfile(file_path + format_new_ext): | ||||
|             log.info("Book id %d already converted to %s", book_id, format_new_ext) | ||||
|             cur_book = calibre_db.get_book(book_id) | ||||
|             self.queue[index]['path'] = file_path | ||||
|             self.queue[index]['title'] = cur_book.title | ||||
|             self._handleSuccess() | ||||
|             return file_path + format_new_ext | ||||
|         else: | ||||
|             log.info("Book id %d - target format of %s does not exist. Moving forward with convert.", | ||||
|                      book_id, | ||||
|                      format_new_ext) | ||||
| 
 | ||||
|         if config.config_kepubifypath and format_old_ext == '.epub' and format_new_ext == '.kepub': | ||||
|             check, error_message = self._convert_kepubify(file_path, | ||||
|                                                           format_old_ext, | ||||
|                                                           format_new_ext, | ||||
|                                                           index) | ||||
|         else: | ||||
|             # check if calibre converter-executable is existing | ||||
|             if not os.path.exists(config.config_converterpath): | ||||
|                 # ToDo Text is not translated | ||||
|                 self._handleError(_(u"Calibre ebook-convert %(tool)s not found", tool=config.config_converterpath)) | ||||
|                 return | ||||
|             check, error_message = self._convert_calibre(file_path, format_old_ext, format_new_ext, index) | ||||
| 
 | ||||
|         if check == 0: | ||||
|             cur_book = calibre_db.get_book(book_id) | ||||
|             if os.path.isfile(file_path + format_new_ext): | ||||
|                 # self.db_queue.join() | ||||
|                 new_format = db.Data(name=cur_book.data[0].name, | ||||
|                                          book_format=self.queue[index]['settings']['new_book_format'].upper(), | ||||
|                                          book=book_id, uncompressed_size=os.path.getsize(file_path + format_new_ext)) | ||||
|                 task = {'task':'add_format','id': book_id, 'format': new_format} | ||||
|                 self.db_queue.put(task) | ||||
|                 # To Do how to handle error? | ||||
| 
 | ||||
|                 '''cur_book.data.append(new_format) | ||||
|                 try: | ||||
|                     # db.session.merge(cur_book) | ||||
|                     calibre_db.session.commit() | ||||
|                 except OperationalError as e: | ||||
|                     calibre_db.session.rollback() | ||||
|                     log.error("Database error: %s", e) | ||||
|                     self._handleError(_(u"Database error: %(error)s.", error=e)) | ||||
|                     return''' | ||||
| 
 | ||||
|                 self.queue[index]['path'] = cur_book.path | ||||
|                 self.queue[index]['title'] = cur_book.title | ||||
|                 if config.config_use_google_drive: | ||||
|                     os.remove(file_path + format_old_ext) | ||||
|                 self._handleSuccess() | ||||
|                 return file_path + format_new_ext | ||||
|             else: | ||||
|                 error_message = format_new_ext.upper() + ' format not found on disk' | ||||
|         log.info("ebook converter failed with error while converting book") | ||||
|         if not error_message: | ||||
|             error_message = 'Ebook converter failed with unknown error' | ||||
|         self._handleError(error_message) | ||||
|         return | ||||
| 
 | ||||
| 
 | ||||
|     def _convert_calibre(self, file_path, format_old_ext, format_new_ext, index): | ||||
|         try: | ||||
|             # Linux py2.7 encode as list without quotes no empty element for parameters | ||||
|             # linux py3.x no encode and as list without quotes no empty element for parameters | ||||
|             # windows py2.7 encode as string with quotes empty element for parameters is okay | ||||
|             # windows py 3.x no encode and as string with quotes empty element for parameters is okay | ||||
|             # separate handling for windows and linux | ||||
|             quotes = [1, 2] | ||||
|             command = [config.config_converterpath, (file_path + format_old_ext), | ||||
|                        (file_path + format_new_ext)] | ||||
|             quotes_index = 3 | ||||
|             if config.config_calibre: | ||||
|                 parameters = config.config_calibre.split(" ") | ||||
|                 for param in parameters: | ||||
|                     command.append(param) | ||||
|                     quotes.append(quotes_index) | ||||
|                     quotes_index += 1 | ||||
| 
 | ||||
|             p = process_open(command, quotes) | ||||
|         except OSError as e: | ||||
|             return 1, _(u"Ebook-converter failed: %(error)s", error=e) | ||||
| 
 | ||||
|         while p.poll() is None: | ||||
|             nextline = p.stdout.readline() | ||||
|             if os.name == 'nt' and sys.version_info < (3, 0): | ||||
|                 nextline = nextline.decode('windows-1252') | ||||
|             elif os.name == 'posix' and sys.version_info < (3, 0): | ||||
|                 nextline = nextline.decode('utf-8') | ||||
|             log.debug(nextline.strip('\r\n')) | ||||
|             # parse progress string from calibre-converter | ||||
|             progress = re.search(r"(\d+)%\s.*", nextline) | ||||
|             if progress: | ||||
|                 self.UIqueue[index]['progress'] = progress.group(1) + ' %' | ||||
| 
 | ||||
|         # process returncode | ||||
|         check = p.returncode | ||||
|         calibre_traceback = p.stderr.readlines() | ||||
|         error_message = "" | ||||
|         for ele in calibre_traceback: | ||||
|             if sys.version_info < (3, 0): | ||||
|                 ele = ele.decode('utf-8') | ||||
|             log.debug(ele.strip('\n')) | ||||
|             if not ele.startswith('Traceback') and not ele.startswith('  File'): | ||||
|                 error_message = "Calibre failed with error: %s" % ele.strip('\n') | ||||
|         return check, error_message | ||||
| 
 | ||||
| 
 | ||||
|     def _convert_kepubify(self, file_path, format_old_ext, format_new_ext, index): | ||||
|         quotes = [1, 3] | ||||
|         command = [config.config_kepubifypath, (file_path + format_old_ext), '-o', os.path.dirname(file_path)] | ||||
|         try: | ||||
|             p = process_open(command, quotes) | ||||
|         except OSError as e: | ||||
|             return 1, _(u"Kepubify-converter failed: %(error)s", error=e) | ||||
|         self.UIqueue[index]['progress'] = '1 %' | ||||
|         while True: | ||||
|             nextline = p.stdout.readlines() | ||||
|             nextline = [x.strip('\n') for x in nextline if x != '\n'] | ||||
|             if sys.version_info < (3, 0): | ||||
|                 nextline = [x.decode('utf-8') for x in nextline] | ||||
|             for line in nextline: | ||||
|                 log.debug(line) | ||||
|             if p.poll() is not None: | ||||
|                 break | ||||
| 
 | ||||
|         # ToD Handle | ||||
|         # process returncode | ||||
|         check = p.returncode | ||||
| 
 | ||||
|         # move file | ||||
|         if check == 0: | ||||
|             converted_file = glob(os.path.join(os.path.dirname(file_path), "*.kepub.epub")) | ||||
|             if len(converted_file) == 1: | ||||
|                 copyfile(converted_file[0], (file_path + format_new_ext)) | ||||
|                 os.unlink(converted_file[0]) | ||||
|             else: | ||||
|                 return 1, _(u"Converted file not found or more than one file in folder %(folder)s", | ||||
|                             folder=os.path.dirname(file_path)) | ||||
|         return check, None | ||||
| 
 | ||||
| 
 | ||||
|     def add_convert(self, file_path, bookid, user_name, taskMessage, settings, kindle_mail=None): | ||||
|         self.doLock.acquire() | ||||
|         if self.last >= 20: | ||||
|             self._delete_completed_tasks() | ||||
|         # progress, runtime, and status = 0 | ||||
|         self.id += 1 | ||||
|         task = TASK_CONVERT_ANY | ||||
|         if kindle_mail: | ||||
|             task = TASK_CONVERT | ||||
|         self.queue.append({'file_path':file_path, 'bookid':bookid, 'starttime': 0, 'kindle': kindle_mail, | ||||
|                            'taskType': task, 'settings':settings}) | ||||
|         self.UIqueue.append({'user': user_name, 'formStarttime': '', 'progress': " 0 %", 'taskMess': taskMessage, | ||||
|                              'runtime': '0 s', 'stat': STAT_WAITING,'id': self.id, 'taskType': task } ) | ||||
| 
 | ||||
|         self.last=len(self.queue) | ||||
|         self.doLock.release() | ||||
| 
 | ||||
|     def add_email(self, subject, filepath, attachment, settings, recipient, user_name, taskMessage, | ||||
|                   text, internal=False): | ||||
|         # if more than 20 entries in the list, clean the list | ||||
|         self.doLock.acquire() | ||||
|         if self.last >= 20: | ||||
|             self._delete_completed_tasks() | ||||
|             if internal: | ||||
|                 self.current-= 1 | ||||
|         # progress, runtime, and status = 0 | ||||
|         self.id += 1 | ||||
|         self.queue.append({'subject':subject, 'attachment':attachment, 'filepath':filepath, | ||||
|                            'settings':settings, 'recipent':recipient, 'starttime': 0, | ||||
|                            'taskType': TASK_EMAIL, 'text':text}) | ||||
|         self.UIqueue.append({'user': user_name, 'formStarttime': '', 'progress': " 0 %", 'taskMess': taskMessage, | ||||
|                              'runtime': '0 s', 'stat': STAT_WAITING,'id': self.id, 'taskType': TASK_EMAIL }) | ||||
|         self.last=len(self.queue) | ||||
|         self.doLock.release() | ||||
| 
 | ||||
|     def add_upload(self, user_name, taskMessage): | ||||
|         # if more than 20 entries in the list, clean the list | ||||
|         self.doLock.acquire() | ||||
| 
 | ||||
| 
 | ||||
|         if self.last >= 20: | ||||
|             self._delete_completed_tasks() | ||||
|         # progress=100%, runtime=0, and status finished | ||||
|         self.id += 1 | ||||
|         starttime = datetime.now() | ||||
|         self.queue.append({'starttime': starttime, 'taskType': TASK_UPLOAD}) | ||||
|         self.UIqueue.append({'user': user_name, 'formStarttime': starttime, 'progress': "100 %", 'taskMess': taskMessage, | ||||
|                              'runtime': '0 s', 'stat': STAT_FINISH_SUCCESS,'id': self.id, 'taskType': TASK_UPLOAD}) | ||||
|         self.last=len(self.queue) | ||||
|         self.doLock.release() | ||||
| 
 | ||||
|     def _send_raw_email(self): | ||||
|         self.doLock.acquire() | ||||
|         index = self.current | ||||
|         self.doLock.release() | ||||
|         self.queue[index]['starttime'] = datetime.now() | ||||
|         self.UIqueue[index]['formStarttime'] = self.queue[index]['starttime'] | ||||
|         self.UIqueue[index]['stat'] = STAT_STARTED | ||||
|         obj=self.queue[index] | ||||
|         # create MIME message | ||||
|         msg = MIMEMultipart() | ||||
|         msg['Subject'] = self.queue[index]['subject'] | ||||
|         msg['Message-Id'] = make_msgid('calibre-web') | ||||
|         msg['Date'] = formatdate(localtime=True) | ||||
|         text = self.queue[index]['text'] | ||||
|         msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8')) | ||||
|         if obj['attachment']: | ||||
|             result = get_attachment(obj['filepath'], obj['attachment']) | ||||
|             if result: | ||||
|                 msg.attach(result) | ||||
|             else: | ||||
|                 self._handleError(u"Attachment not found") | ||||
|                 return | ||||
| 
 | ||||
|         msg['From'] = obj['settings']["mail_from"] | ||||
|         msg['To'] = obj['recipent'] | ||||
| 
 | ||||
|         use_ssl = int(obj['settings'].get('mail_use_ssl', 0)) | ||||
|         try: | ||||
|             # convert MIME message to string | ||||
|             fp = StringIO() | ||||
|             gen = Generator(fp, mangle_from_=False) | ||||
|             gen.flatten(msg) | ||||
|             msg = fp.getvalue() | ||||
| 
 | ||||
|             # send email | ||||
|             timeout = 600  # set timeout to 5mins | ||||
| 
 | ||||
|             # redirect output to logfile on python2 pn python3 debugoutput is caught with overwritten | ||||
|             # _print_debug function | ||||
|             if sys.version_info < (3, 0): | ||||
|                 org_smtpstderr = smtplib.stderr | ||||
|                 smtplib.stderr = logger.StderrLogger('worker.smtp') | ||||
| 
 | ||||
|             if use_ssl == 2: | ||||
|                 self.asyncSMTP = email_SSL(obj['settings']["mail_server"], obj['settings']["mail_port"], timeout=timeout) | ||||
|             else: | ||||
|                 self.asyncSMTP = email(obj['settings']["mail_server"], obj['settings']["mail_port"], timeout=timeout) | ||||
| 
 | ||||
|             # link to logginglevel | ||||
|             if logger.is_debug_enabled(): | ||||
|                 self.asyncSMTP.set_debuglevel(1) | ||||
|             if use_ssl == 1: | ||||
|                 self.asyncSMTP.starttls() | ||||
|             if obj['settings']["mail_password"]: | ||||
|                 self.asyncSMTP.login(str(obj['settings']["mail_login"]), str(obj['settings']["mail_password"])) | ||||
|             self.asyncSMTP.sendmail(obj['settings']["mail_from"], obj['recipent'], msg) | ||||
|             self.asyncSMTP.quit() | ||||
|             self._handleSuccess() | ||||
| 
 | ||||
|             if sys.version_info < (3, 0): | ||||
|                 smtplib.stderr = org_smtpstderr | ||||
| 
 | ||||
|         except (MemoryError) as e: | ||||
|             log.exception(e) | ||||
|             self._handleError(u'MemoryError sending email: ' + str(e)) | ||||
|             return None | ||||
|         except (smtplib.SMTPException, smtplib.SMTPAuthenticationError) as e: | ||||
|             if hasattr(e, "smtp_error"): | ||||
|                 text = e.smtp_error.decode('utf-8').replace("\n",'. ') | ||||
|             elif hasattr(e, "message"): | ||||
|                 text = e.message | ||||
|             else: | ||||
|                 log.exception(e) | ||||
|                 text = '' | ||||
|             self._handleError(u'Smtplib Error sending email: ' + text) | ||||
|             return None | ||||
|         except (socket.error) as e: | ||||
|             self._handleError(u'Socket Error sending email: ' + e.strerror) | ||||
|             return None | ||||
| 
 | ||||
|     def _handleError(self, error_message): | ||||
|         log.error(error_message) | ||||
|         self.doLock.acquire() | ||||
|         index = self.current | ||||
|         self.doLock.release() | ||||
|         self.UIqueue[index]['stat'] = STAT_FAIL | ||||
|         self.UIqueue[index]['progress'] = "100 %" | ||||
|         self.UIqueue[index]['formRuntime'] = datetime.now() - self.queue[index]['starttime'] | ||||
|         self.UIqueue[index]['message'] = error_message | ||||
| 
 | ||||
|     def _handleSuccess(self): | ||||
|         self.doLock.acquire() | ||||
|         index = self.current | ||||
|         self.doLock.release() | ||||
|         self.UIqueue[index]['stat'] = STAT_FINISH_SUCCESS | ||||
|         self.UIqueue[index]['progress'] = "100 %" | ||||
|         self.UIqueue[index]['formRuntime'] = datetime.now() - self.queue[index]['starttime'] | ||||
| 
 | ||||
| 
 | ||||
| def get_taskstatus(): | ||||
|     return _worker.get_taskstatus() | ||||
| 
 | ||||
| 
 | ||||
| def add_email(subject, filepath, attachment, settings, recipient, user_name, taskMessage, text): | ||||
|     return _worker.add_email(subject, filepath, attachment, settings, recipient, user_name, taskMessage, text) | ||||
| 
 | ||||
| 
 | ||||
| def add_upload(user_name, taskMessage): | ||||
|     return _worker.add_upload(user_name, taskMessage) | ||||
| 
 | ||||
| 
 | ||||
| def add_convert(file_path, bookid, user_name, taskMessage, settings, kindle_mail=None): | ||||
|     return _worker.add_convert(file_path, bookid, user_name, taskMessage, settings, kindle_mail) | ||||
| 
 | ||||
| 
 | ||||
| _worker = WorkerThread() | ||||
| _worker.start() | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user