cleaner worker api
the worker thread now stops on its own
This commit is contained in:
parent
a836df9a5a
commit
63634961d4
|
@ -34,8 +34,9 @@ from flask_login import LoginManager
|
||||||
from flask_babel import Babel
|
from flask_babel import Babel
|
||||||
from flask_principal import Principal
|
from flask_principal import Principal
|
||||||
|
|
||||||
from . import logger, cache_buster, cli, config_sql, ub
|
from . import logger, cache_buster, cli, config_sql, ub, db, services
|
||||||
from .reverseproxy import ReverseProxied
|
from .reverseproxy import ReverseProxied
|
||||||
|
from .server import WebServer
|
||||||
|
|
||||||
|
|
||||||
mimetypes.init()
|
mimetypes.init()
|
||||||
|
@ -66,14 +67,8 @@ lm.anonymous_user = ub.Anonymous
|
||||||
ub.init_db(cli.settingspath)
|
ub.init_db(cli.settingspath)
|
||||||
# pylint: disable=no-member
|
# pylint: disable=no-member
|
||||||
config = config_sql.load_configuration(ub.session)
|
config = config_sql.load_configuration(ub.session)
|
||||||
from . import db, services
|
|
||||||
|
|
||||||
searched_ids = {}
|
searched_ids = {}
|
||||||
|
|
||||||
from .worker import WorkerThread
|
|
||||||
global_WorkerThread = WorkerThread()
|
|
||||||
|
|
||||||
from .server import WebServer
|
|
||||||
web_server = WebServer()
|
web_server = WebServer()
|
||||||
|
|
||||||
babel = Babel()
|
babel = Babel()
|
||||||
|
@ -109,7 +104,6 @@ def create_app():
|
||||||
if services.goodreads:
|
if services.goodreads:
|
||||||
services.goodreads.connect(config.config_goodreads_api_key, config.config_goodreads_api_secret, config.config_use_goodreads)
|
services.goodreads.connect(config.config_goodreads_api_key, config.config_goodreads_api_secret, config.config_use_goodreads)
|
||||||
|
|
||||||
global_WorkerThread.start()
|
|
||||||
return app
|
return app
|
||||||
|
|
||||||
@babel.localeselector
|
@babel.localeselector
|
||||||
|
|
|
@ -30,12 +30,12 @@ from uuid import uuid4
|
||||||
|
|
||||||
from flask import Blueprint, request, flash, redirect, url_for, abort, Markup, Response
|
from flask import Blueprint, request, flash, redirect, url_for, abort, Markup, Response
|
||||||
from flask_babel import gettext as _
|
from flask_babel import gettext as _
|
||||||
from flask_login import current_user
|
from flask_login import current_user, login_required
|
||||||
|
|
||||||
from . import constants, logger, isoLanguages, gdriveutils, uploader, helper
|
from . import constants, logger, isoLanguages, gdriveutils, uploader, helper
|
||||||
from . import config, get_locale, db, ub, global_WorkerThread
|
from . import config, get_locale, db, ub, worker
|
||||||
from .helper import order_authors, common_filters
|
from .helper import order_authors, common_filters
|
||||||
from .web import login_required_if_no_ano, render_title_template, edit_required, upload_required, login_required
|
from .web import login_required_if_no_ano, render_title_template, edit_required, upload_required
|
||||||
|
|
||||||
|
|
||||||
editbook = Blueprint('editbook', __name__)
|
editbook = Blueprint('editbook', __name__)
|
||||||
|
@ -358,7 +358,7 @@ def upload_single_file(request, book, book_id):
|
||||||
|
|
||||||
# Queue uploader info
|
# Queue uploader info
|
||||||
uploadText=_(u"File format %(ext)s added to %(book)s", ext=file_ext.upper(), book=book.title)
|
uploadText=_(u"File format %(ext)s added to %(book)s", ext=file_ext.upper(), book=book.title)
|
||||||
global_WorkerThread.add_upload(current_user.nickname,
|
worker.add_upload(current_user.nickname,
|
||||||
"<a href=\"" + url_for('web.show_book', book_id=book.id) + "\">" + uploadText + "</a>")
|
"<a href=\"" + url_for('web.show_book', book_id=book.id) + "\">" + uploadText + "</a>")
|
||||||
|
|
||||||
|
|
||||||
|
@ -667,7 +667,7 @@ def upload():
|
||||||
if error:
|
if error:
|
||||||
flash(error, category="error")
|
flash(error, category="error")
|
||||||
uploadText=_(u"File %(file)s uploaded", file=book.title)
|
uploadText=_(u"File %(file)s uploaded", file=book.title)
|
||||||
global_WorkerThread.add_upload(current_user.nickname,
|
worker.add_upload(current_user.nickname,
|
||||||
"<a href=\"" + url_for('web.show_book', book_id=book.id) + "\">" + uploadText + "</a>")
|
"<a href=\"" + url_for('web.show_book', book_id=book.id) + "\">" + uploadText + "</a>")
|
||||||
|
|
||||||
# create data for displaying display Full language name instead of iso639.part3language
|
# create data for displaying display Full language name instead of iso639.part3language
|
||||||
|
|
|
@ -60,7 +60,7 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
use_PIL = False
|
use_PIL = False
|
||||||
|
|
||||||
from . import logger, config, global_WorkerThread, get_locale, db, ub, isoLanguages
|
from . import logger, config, get_locale, db, ub, isoLanguages, worker
|
||||||
from . import gdriveutils as gd
|
from . import gdriveutils as gd
|
||||||
from .constants import STATIC_DIR as _STATIC_DIR
|
from .constants import STATIC_DIR as _STATIC_DIR
|
||||||
from .pagination import Pagination
|
from .pagination import Pagination
|
||||||
|
@ -112,7 +112,7 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format,
|
||||||
text = (u"%s -> %s: %s" % (old_book_format, new_book_format, book.title))
|
text = (u"%s -> %s: %s" % (old_book_format, new_book_format, book.title))
|
||||||
settings['old_book_format'] = old_book_format
|
settings['old_book_format'] = old_book_format
|
||||||
settings['new_book_format'] = new_book_format
|
settings['new_book_format'] = new_book_format
|
||||||
global_WorkerThread.add_convert(file_path, book.id, user_id, text, settings, kindle_mail)
|
worker.add_convert(file_path, book.id, user_id, text, settings, kindle_mail)
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
error_message = _(u"%(format)s not found: %(fn)s",
|
error_message = _(u"%(format)s not found: %(fn)s",
|
||||||
|
@ -121,9 +121,9 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format,
|
||||||
|
|
||||||
|
|
||||||
def send_test_mail(kindle_mail, user_name):
|
def send_test_mail(kindle_mail, user_name):
|
||||||
global_WorkerThread.add_email(_(u'Calibre-Web test e-mail'),None, None, config.get_mail_settings(),
|
worker.add_email(_(u'Calibre-Web test e-mail'), None, None,
|
||||||
kindle_mail, user_name, _(u"Test e-mail"),
|
config.get_mail_settings(), kindle_mail, user_name,
|
||||||
_(u'This e-mail has been sent via Calibre-Web.'))
|
_(u"Test e-mail"), _(u'This e-mail has been sent via Calibre-Web.'))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
@ -138,8 +138,9 @@ def send_registration_mail(e_mail, user_name, default_password, resend=False):
|
||||||
text += "Don't forget to change your password after first login.\r\n"
|
text += "Don't forget to change your password after first login.\r\n"
|
||||||
text += "Sincerely\r\n\r\n"
|
text += "Sincerely\r\n\r\n"
|
||||||
text += "Your Calibre-Web team"
|
text += "Your Calibre-Web team"
|
||||||
global_WorkerThread.add_email(_(u'Get Started with Calibre-Web'),None, None, config.get_mail_settings(),
|
worker.add_email(_(u'Get Started with Calibre-Web'), None, None,
|
||||||
e_mail, None, _(u"Registration e-mail for user: %(name)s", name=user_name), text)
|
config.get_mail_settings(), e_mail, None,
|
||||||
|
_(u"Registration e-mail for user: %(name)s", name=user_name), text)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
@ -207,13 +208,13 @@ def send_mail(book_id, book_format, convert, kindle_mail, calibrepath, user_id):
|
||||||
if convert:
|
if convert:
|
||||||
# returns None if success, otherwise errormessage
|
# returns None if success, otherwise errormessage
|
||||||
return convert_book_format(book_id, calibrepath, u'epub', book_format.lower(), user_id, kindle_mail)
|
return convert_book_format(book_id, calibrepath, u'epub', book_format.lower(), user_id, kindle_mail)
|
||||||
else:
|
|
||||||
for entry in iter(book.data):
|
for entry in iter(book.data):
|
||||||
if entry.format.upper() == book_format.upper():
|
if entry.format.upper() == book_format.upper():
|
||||||
result = entry.name + '.' + book_format.lower()
|
converted_file_name = entry.name + '.' + book_format.lower()
|
||||||
global_WorkerThread.add_email(_(u"Send to Kindle"), book.path, result, config.get_mail_settings(),
|
worker.add_email(_(u"Send to Kindle"), book.path, converted_file_name,
|
||||||
kindle_mail, user_id, _(u"E-mail: %(book)s", book=book.title),
|
config.get_mail_settings(), kindle_mail, user_id,
|
||||||
_(u'This e-mail has been sent via Calibre-Web.'))
|
_(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.'))
|
||||||
return
|
return
|
||||||
return _(u"The requested file could not be read. Maybe wrong permissions?")
|
return _(u"The requested file could not be read. Maybe wrong permissions?")
|
||||||
|
|
||||||
|
@ -232,7 +233,7 @@ def get_valid_filename(value, replace_whitespace=True):
|
||||||
value = value.replace(u'§', u'SS')
|
value = value.replace(u'§', u'SS')
|
||||||
value = value.replace(u'ß', u'ss')
|
value = value.replace(u'ß', u'ss')
|
||||||
value = unicodedata.normalize('NFKD', value)
|
value = unicodedata.normalize('NFKD', value)
|
||||||
re_slugify = re.compile('[\W\s-]', re.UNICODE)
|
re_slugify = re.compile(r'[\W\s-]', re.UNICODE)
|
||||||
if isinstance(value, str): # Python3 str, Python2 unicode
|
if isinstance(value, str): # Python3 str, Python2 unicode
|
||||||
value = re_slugify.sub('', value).strip()
|
value = re_slugify.sub('', value).strip()
|
||||||
else:
|
else:
|
||||||
|
@ -254,7 +255,7 @@ def get_valid_filename(value, replace_whitespace=True):
|
||||||
def get_sorted_author(value):
|
def get_sorted_author(value):
|
||||||
try:
|
try:
|
||||||
if ',' not in value:
|
if ',' not in value:
|
||||||
regexes = ["^(JR|SR)\.?$", "^I{1,3}\.?$", "^IV\.?$"]
|
regexes = [r"^(JR|SR)\.?$", r"^I{1,3}\.?$", r"^IV\.?$"]
|
||||||
combined = "(" + ")|(".join(regexes) + ")"
|
combined = "(" + ")|(".join(regexes) + ")"
|
||||||
value = value.split(" ")
|
value = value.split(" ")
|
||||||
if re.match(combined, value[-1].upper()):
|
if re.match(combined, value[-1].upper()):
|
||||||
|
|
|
@ -38,7 +38,7 @@ except ImportError:
|
||||||
VERSION = {'Tornado': 'v' + _version}
|
VERSION = {'Tornado': 'v' + _version}
|
||||||
_GEVENT = False
|
_GEVENT = False
|
||||||
|
|
||||||
from . import logger, global_WorkerThread
|
from . import logger
|
||||||
|
|
||||||
|
|
||||||
log = logger.create()
|
log = logger.create()
|
||||||
|
@ -179,7 +179,6 @@ class WebServer:
|
||||||
return False
|
return False
|
||||||
finally:
|
finally:
|
||||||
self.wsgiserver = None
|
self.wsgiserver = None
|
||||||
global_WorkerThread.stop()
|
|
||||||
|
|
||||||
if not self.restart:
|
if not self.restart:
|
||||||
log.info("Performing shutdown of Calibre-Web")
|
log.info("Performing shutdown of Calibre-Web")
|
||||||
|
|
|
@ -41,8 +41,8 @@ from werkzeug.exceptions import default_exceptions
|
||||||
from werkzeug.datastructures import Headers
|
from werkzeug.datastructures import Headers
|
||||||
from werkzeug.security import generate_password_hash, check_password_hash
|
from werkzeug.security import generate_password_hash, check_password_hash
|
||||||
|
|
||||||
from . import constants, logger, isoLanguages, services
|
from . import constants, logger, isoLanguages, services, worker
|
||||||
from . import global_WorkerThread, searched_ids, lm, babel, db, ub, config, negociate_locale, get_locale, app
|
from . import searched_ids, lm, babel, db, ub, config, negociate_locale, get_locale, app
|
||||||
from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download
|
from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download
|
||||||
from .helper import common_filters, get_search_results, fill_indexpage, speaking_language, check_valid_domain, \
|
from .helper import common_filters, get_search_results, fill_indexpage, speaking_language, check_valid_domain, \
|
||||||
order_authors, get_typeahead, render_task_status, json_serial, get_cc_columns, \
|
order_authors, get_typeahead, render_task_status, json_serial, get_cc_columns, \
|
||||||
|
@ -245,7 +245,7 @@ def before_request():
|
||||||
@web.route("/ajax/emailstat")
|
@web.route("/ajax/emailstat")
|
||||||
@login_required
|
@login_required
|
||||||
def get_email_status_json():
|
def get_email_status_json():
|
||||||
tasks = global_WorkerThread.get_taskstatus()
|
tasks = worker.get_taskstatus()
|
||||||
answer = render_task_status(tasks)
|
answer = render_task_status(tasks)
|
||||||
js = json.dumps(answer, default=json_serial)
|
js = json.dumps(answer, default=json_serial)
|
||||||
response = make_response(js)
|
response = make_response(js)
|
||||||
|
@ -760,7 +760,7 @@ def category_list():
|
||||||
@login_required
|
@login_required
|
||||||
def get_tasks_status():
|
def get_tasks_status():
|
||||||
# if current user admin, show all email, otherwise only own emails
|
# if current user admin, show all email, otherwise only own emails
|
||||||
tasks = global_WorkerThread.get_taskstatus()
|
tasks = worker.get_taskstatus()
|
||||||
answer = render_task_status(tasks)
|
answer = render_task_status(tasks)
|
||||||
return render_title_template('tasks.html', entries=answer, title=_(u"Tasks"), page="tasks")
|
return render_title_template('tasks.html', entries=answer, title=_(u"Tasks"), page="tasks")
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ import smtplib
|
||||||
import socket
|
import socket
|
||||||
import time
|
import time
|
||||||
import threading
|
import threading
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from StringIO import StringIO
|
from StringIO import StringIO
|
||||||
|
@ -66,6 +66,13 @@ RET_FAIL = 0
|
||||||
RET_SUCCESS = 1
|
RET_SUCCESS = 1
|
||||||
|
|
||||||
|
|
||||||
|
def _get_main_thread():
|
||||||
|
for t in threading.enumerate():
|
||||||
|
if t.__class__.__name__ == '_MainThread':
|
||||||
|
return t
|
||||||
|
raise Exception("main thread not found?!")
|
||||||
|
|
||||||
|
|
||||||
# For gdrive download book from gdrive to calibredir (temp dir for books), read contents in both cases and append
|
# For gdrive download book from gdrive to calibredir (temp dir for books), read contents in both cases and append
|
||||||
# it in MIME Base64 encoded to
|
# it in MIME Base64 encoded to
|
||||||
def get_attachment(bookpath, filename):
|
def get_attachment(bookpath, filename):
|
||||||
|
@ -173,7 +180,6 @@ class email_SSL(emailbase, smtplib.SMTP_SSL):
|
||||||
class WorkerThread(threading.Thread):
|
class WorkerThread(threading.Thread):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._stopevent = threading.Event()
|
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.status = 0
|
self.status = 0
|
||||||
self.current = 0
|
self.current = 0
|
||||||
|
@ -185,7 +191,8 @@ class WorkerThread(threading.Thread):
|
||||||
|
|
||||||
# Main thread loop starting the different tasks
|
# Main thread loop starting the different tasks
|
||||||
def run(self):
|
def run(self):
|
||||||
while not self._stopevent.isSet():
|
main_thread = _get_main_thread()
|
||||||
|
while main_thread.is_alive():
|
||||||
doLock = threading.Lock()
|
doLock = threading.Lock()
|
||||||
doLock.acquire()
|
doLock.acquire()
|
||||||
if self.current != self.last:
|
if self.current != self.last:
|
||||||
|
@ -200,11 +207,9 @@ class WorkerThread(threading.Thread):
|
||||||
self.current += 1
|
self.current += 1
|
||||||
else:
|
else:
|
||||||
doLock.release()
|
doLock.release()
|
||||||
|
if main_thread.is_alive():
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self._stopevent.set()
|
|
||||||
|
|
||||||
def get_send_status(self):
|
def get_send_status(self):
|
||||||
if self.asyncSMTP:
|
if self.asyncSMTP:
|
||||||
return self.asyncSMTP.getTransferStatus()
|
return self.asyncSMTP.getTransferStatus()
|
||||||
|
@ -317,7 +322,7 @@ class WorkerThread(threading.Thread):
|
||||||
nextline = p.communicate()[0]
|
nextline = p.communicate()[0]
|
||||||
# Format of error message (kindlegen translates its output texts):
|
# Format of error message (kindlegen translates its output texts):
|
||||||
# Error(prcgen):E23006: Language not recognized in metadata.The dc:Language field is mandatory.Aborting.
|
# Error(prcgen):E23006: Language not recognized in metadata.The dc:Language field is mandatory.Aborting.
|
||||||
conv_error = re.search(".*\(.*\):(E\d+):\s(.*)", nextline, re.MULTILINE)
|
conv_error = re.search(r".*\(.*\):(E\d+):\s(.*)", nextline, re.MULTILINE)
|
||||||
# If error occoures, store error message for logfile
|
# If error occoures, store error message for logfile
|
||||||
if conv_error:
|
if conv_error:
|
||||||
error_message = _(u"Kindlegen failed with Error %(error)s. Message: %(message)s",
|
error_message = _(u"Kindlegen failed with Error %(error)s. Message: %(message)s",
|
||||||
|
@ -332,7 +337,7 @@ class WorkerThread(threading.Thread):
|
||||||
nextline = nextline.decode('utf-8')
|
nextline = nextline.decode('utf-8')
|
||||||
log.debug(nextline.strip('\r\n'))
|
log.debug(nextline.strip('\r\n'))
|
||||||
# parse progress string from calibre-converter
|
# parse progress string from calibre-converter
|
||||||
progress = re.search("(\d+)%\s.*", nextline)
|
progress = re.search(r"(\d+)%\s.*", nextline)
|
||||||
if progress:
|
if progress:
|
||||||
self.UIqueue[self.current]['progress'] = progress.group(1) + ' %'
|
self.UIqueue[self.current]['progress'] = progress.group(1) + ' %'
|
||||||
|
|
||||||
|
@ -511,3 +516,23 @@ class WorkerThread(threading.Thread):
|
||||||
self.UIqueue[self.current]['stat'] = STAT_FINISH_SUCCESS
|
self.UIqueue[self.current]['stat'] = STAT_FINISH_SUCCESS
|
||||||
self.UIqueue[self.current]['progress'] = "100 %"
|
self.UIqueue[self.current]['progress'] = "100 %"
|
||||||
self.UIqueue[self.current]['formRuntime'] = datetime.now() - self.queue[self.current]['starttime']
|
self.UIqueue[self.current]['formRuntime'] = datetime.now() - self.queue[self.current]['starttime']
|
||||||
|
|
||||||
|
|
||||||
|
_worker = WorkerThread()
|
||||||
|
_worker.start()
|
||||||
|
|
||||||
|
|
||||||
|
def get_taskstatus():
|
||||||
|
return _worker.get_taskstatus()
|
||||||
|
|
||||||
|
|
||||||
|
def add_email(subject, filepath, attachment, settings, recipient, user_name, taskMessage, text):
|
||||||
|
return _worker.add_email(subject, filepath, attachment, settings, recipient, user_name, taskMessage, text)
|
||||||
|
|
||||||
|
|
||||||
|
def add_upload(user_name, taskMessage):
|
||||||
|
return _worker.add_upload(user_name, taskMessage)
|
||||||
|
|
||||||
|
|
||||||
|
def add_convert(file_path, bookid, user_name, taskMessage, settings, kindle_mail=None):
|
||||||
|
return _worker.add_convert(file_path, bookid, user_name, taskMessage, settings, kindle_mail)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user