Code refactoring and improved error handling for edit user list
Update teststatus
This commit is contained in:
parent
665210e506
commit
8acd1f1fe4
174
cps/admin.py
174
cps/admin.py
|
@ -42,7 +42,8 @@ from sqlalchemy.sql.expression import func, or_
|
|||
from . import constants, logger, helper, services
|
||||
from .cli import filepicker
|
||||
from . import db, calibre_db, ub, web_server, get_locale, config, updater_thread, babel, gdriveutils
|
||||
from .helper import check_valid_domain, send_test_mail, reset_password, generate_password_hash
|
||||
from .helper import check_valid_domain, send_test_mail, reset_password, generate_password_hash, check_email, \
|
||||
valid_email, check_username
|
||||
from .gdriveutils import is_gdrive_ready, gdrive_support
|
||||
from .render_template import render_title_template, get_sidebar_config
|
||||
from . import debug_info
|
||||
|
@ -151,7 +152,7 @@ def shutdown():
|
|||
else:
|
||||
showtext['text'] = _(u'Performing shutdown of server, please close window')
|
||||
# stop gevent/tornado server
|
||||
web_server.stop(task == 0)
|
||||
web_server.stop(task==0)
|
||||
return json.dumps(showtext)
|
||||
|
||||
if task == 2:
|
||||
|
@ -241,9 +242,8 @@ def edit_user_table():
|
|||
@admin_required
|
||||
def list_users():
|
||||
off = request.args.get("offset") or 0
|
||||
limit = request.args.get("limit") or 40
|
||||
limit = request.args.get("limit") or 10
|
||||
search = request.args.get("search")
|
||||
|
||||
all_user = ub.session.query(ub.User)
|
||||
if not config.config_anonbrowse:
|
||||
all_user = all_user.filter(ub.User.role.op('&')(constants.ROLE_ANONYMOUS) != constants.ROLE_ANONYMOUS)
|
||||
|
@ -259,22 +259,13 @@ def list_users():
|
|||
filtered_count = total_count
|
||||
|
||||
for user in users:
|
||||
# set readable locale
|
||||
#try:
|
||||
# user.local = LC.parse(user.locale).get_language_name(get_locale())
|
||||
#except UnknownLocaleError:
|
||||
# # This should not happen
|
||||
# user.local = _(isoLanguages.get(part1=user.locale).name)
|
||||
# Set default language
|
||||
if user.default_language == "all":
|
||||
user.default = _("all")
|
||||
else:
|
||||
user.default = LC.parse(user.default_language).get_language_name(get_locale())
|
||||
|
||||
table_entries = {'totalNotFiltered': total_count, 'total': filtered_count, "rows": users}
|
||||
|
||||
js_list = json.dumps(table_entries, cls=db.AlchemyEncoder)
|
||||
|
||||
response = make_response(js_list)
|
||||
response.headers["Content-Type"] = "application/json; charset=utf-8"
|
||||
return response
|
||||
|
@ -294,7 +285,7 @@ def table_get_locale():
|
|||
ret = list()
|
||||
current_locale = get_locale()
|
||||
for loc in locale:
|
||||
ret.append({'value':str(loc),'text':loc.get_language_name(current_locale)})
|
||||
ret.append({'value': str(loc), 'text': loc.get_language_name(current_locale)})
|
||||
return json.dumps(ret)
|
||||
|
||||
|
||||
|
@ -306,7 +297,7 @@ def table_get_default_lang():
|
|||
ret = list()
|
||||
ret.append({'value':'all','text':_('Show All')})
|
||||
for lang in languages:
|
||||
ret.append({'value':lang.lang_code,'text': lang.name})
|
||||
ret.append({'value': lang.lang_code, 'text': lang.name})
|
||||
return json.dumps(ret)
|
||||
|
||||
|
||||
|
@ -333,25 +324,25 @@ def edit_list_user(param):
|
|||
else:
|
||||
return ""
|
||||
for user in users:
|
||||
if param =='name':
|
||||
if not ub.session.query(ub.User).filter(ub.User.name == vals['value']).scalar():
|
||||
user.name = vals['value']
|
||||
else:
|
||||
log.error(u"This username is already taken")
|
||||
return _(u"This username is already taken"), 400
|
||||
try:
|
||||
vals['value'] = vals['value'].strip()
|
||||
if param == 'name':
|
||||
if user.name == "Guest":
|
||||
raise Exception(_("Guest Name can't be changed"))
|
||||
user.name = check_username(vals['value'])
|
||||
elif param =='email':
|
||||
existing_email = ub.session.query(ub.User).filter(ub.User.email == vals['value'].lower()).first()
|
||||
if not existing_email:
|
||||
user.email = vals['value']
|
||||
else:
|
||||
log.error(u"Found an existing account for this e-mail address.")
|
||||
return _(u"Found an existing account for this e-mail address."), 400
|
||||
user.email = check_email(vals['value'])
|
||||
elif param == 'kindle_mail':
|
||||
user.kindle_mail = vals['value']
|
||||
user.kindle_mail = valid_email(vals['value']) if vals['value'] else ""
|
||||
elif param == 'role':
|
||||
if vals['value'] == 'true':
|
||||
user.role |= int(vals['field_index'])
|
||||
else:
|
||||
if int(vals['field_index']) == constants.ROLE_ADMIN:
|
||||
if not ub.session.query(ub.User).\
|
||||
filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
|
||||
ub.User.id != user.id).count():
|
||||
return _(u"No admin user remaining, can't remove admin role", nick=user.name), 400
|
||||
user.role &= ~int(vals['field_index'])
|
||||
elif param == 'sidebar_view':
|
||||
if vals['value'] == 'true':
|
||||
|
@ -370,6 +361,8 @@ def edit_list_user(param):
|
|||
user.locale = vals['value']
|
||||
elif param == 'default_language':
|
||||
user.default_language = vals['value']
|
||||
except Exception as ex:
|
||||
return str(ex), 400
|
||||
ub.session_commit()
|
||||
return ""
|
||||
|
||||
|
@ -378,7 +371,6 @@ def edit_list_user(param):
|
|||
@login_required
|
||||
@admin_required
|
||||
def update_table_settings():
|
||||
# ToDo: Save table settings
|
||||
current_user.view_settings['useredit'] = json.loads(request.data)
|
||||
try:
|
||||
try:
|
||||
|
@ -387,7 +379,7 @@ def update_table_settings():
|
|||
pass
|
||||
ub.session.commit()
|
||||
except (InvalidRequestError, OperationalError):
|
||||
log.error("Invalid request received: %r ", request, )
|
||||
log.error("Invalid request received: {}".format(request))
|
||||
return "Invalid request", 400
|
||||
return ""
|
||||
|
||||
|
@ -787,7 +779,6 @@ def pathchooser():
|
|||
folders = []
|
||||
|
||||
files = []
|
||||
# locale = get_locale()
|
||||
for f in folders:
|
||||
try:
|
||||
data = {"name": f, "fullpath": os.path.join(cwd, f)}
|
||||
|
@ -932,7 +923,7 @@ def _configuration_ldap_helper(to_save, gdrive_error):
|
|||
reboot_required |= _config_string(to_save, "config_ldap_cert_path")
|
||||
reboot_required |= _config_string(to_save, "config_ldap_key_path")
|
||||
_config_string(to_save, "config_ldap_group_name")
|
||||
if "config_ldap_serv_password" in to_save and to_save["config_ldap_serv_password"] != "":
|
||||
if to_save.get("config_ldap_serv_password", "") != "":
|
||||
reboot_required |= 1
|
||||
config.set_from_dictionary(to_save, "config_ldap_serv_password", base64.b64encode, encode='UTF-8')
|
||||
config.save()
|
||||
|
@ -970,7 +961,7 @@ def _configuration_ldap_helper(to_save, gdrive_error):
|
|||
return reboot_required, _configuration_result(_('LDAP User Object Filter Has Unmatched Parenthesis'),
|
||||
gdrive_error)
|
||||
|
||||
if "ldap_import_user_filter" in to_save and to_save["ldap_import_user_filter"] == '0':
|
||||
if to_save.get("ldap_import_user_filter") == '0':
|
||||
config.config_ldap_member_user_object = ""
|
||||
else:
|
||||
if config.config_ldap_member_user_object.count("%s") != 1:
|
||||
|
@ -1095,8 +1086,8 @@ def _configuration_update_helper(configured):
|
|||
if config.config_use_google_drive and is_gdrive_ready() and not os.path.exists(metadata_db):
|
||||
gdriveutils.downloadFile(None, "metadata.db", metadata_db)
|
||||
db_change = True
|
||||
except Exception as e:
|
||||
return _configuration_result('%s' % e, gdrive_error, configured)
|
||||
except Exception as ex:
|
||||
return _configuration_result('%s' % ex, gdrive_error, configured)
|
||||
|
||||
if db_change:
|
||||
if not calibre_db.setup_db(config, ub.app_DB_path):
|
||||
|
@ -1148,7 +1139,6 @@ def _configuration_result(error_flash=None, gdrive_error=None, configured=True):
|
|||
|
||||
def _handle_new_user(to_save, content, languages, translations, kobo_support):
|
||||
content.default_language = to_save["default_language"]
|
||||
# content.mature_content = "Show_mature_content" in to_save
|
||||
content.locale = to_save.get("locale", content.locale)
|
||||
|
||||
content.sidebar_view = sum(int(key[5:]) for key in to_save if key.startswith('show_'))
|
||||
|
@ -1156,28 +1146,21 @@ def _handle_new_user(to_save, content, languages, translations, kobo_support):
|
|||
content.sidebar_view |= constants.DETAIL_RANDOM
|
||||
|
||||
content.role = constants.selected_roles(to_save)
|
||||
|
||||
if not to_save["name"] or not to_save["email"] or not to_save["password"]:
|
||||
flash(_(u"Please fill out all fields!"), category="error")
|
||||
return render_title_template("user_edit.html", new_user=1, content=content, translations=translations,
|
||||
registered_oauth=oauth_check, kobo_support=kobo_support,
|
||||
title=_(u"Add new user"))
|
||||
content.password = generate_password_hash(to_save["password"])
|
||||
existing_user = ub.session.query(ub.User).filter(func.lower(ub.User.name) == to_save["name"].lower()) \
|
||||
.first()
|
||||
existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \
|
||||
.first()
|
||||
if not existing_user and not existing_email:
|
||||
content.name = to_save["name"]
|
||||
if config.config_public_reg and not check_valid_domain(to_save["email"]):
|
||||
flash(_(u"E-mail is not from valid domain"), category="error")
|
||||
return render_title_template("user_edit.html", new_user=1, content=content, translations=translations,
|
||||
registered_oauth=oauth_check, kobo_support=kobo_support,
|
||||
title=_(u"Add new user"))
|
||||
else:
|
||||
content.email = to_save["email"]
|
||||
else:
|
||||
flash(_(u"Found an existing account for this e-mail address or name."), category="error")
|
||||
try:
|
||||
if not to_save["name"] or not to_save["email"] or not to_save["password"]:
|
||||
log.info("Missing entries on new user")
|
||||
raise Exception(_(u"Please fill out all fields!"))
|
||||
content.email = check_email(to_save["email"])
|
||||
# Query User name, if not existing, change
|
||||
content.name = check_username(to_save["name"])
|
||||
if to_save.get("kindle_mail"):
|
||||
content.kindle_mail = valid_email(to_save["kindle_mail"])
|
||||
if config.config_public_reg and not check_valid_domain(content.email):
|
||||
log.info("E-mail: {} for new user is not from valid domain".format(content.email))
|
||||
raise Exception(_(u"E-mail is not from valid domain"))
|
||||
except Exception as ex:
|
||||
flash(str(ex), category="error")
|
||||
return render_title_template("user_edit.html", new_user=1, content=content, translations=translations,
|
||||
languages=languages, title=_(u"Add new user"), page="newuser",
|
||||
kobo_support=kobo_support, registered_oauth=oauth_check)
|
||||
|
@ -1199,7 +1182,7 @@ def _handle_new_user(to_save, content, languages, translations, kobo_support):
|
|||
|
||||
|
||||
def _handle_edit_user(to_save, content, languages, translations, kobo_support):
|
||||
if "delete" in to_save:
|
||||
if to_save.get("delete"):
|
||||
if ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
|
||||
ub.User.id != content.id).count():
|
||||
ub.session.query(ub.User).filter(ub.User.id == content.id).delete()
|
||||
|
@ -1214,8 +1197,7 @@ def _handle_edit_user(to_save, content, languages, translations, kobo_support):
|
|||
ub.User.id != content.id).count() and 'admin_role' not in to_save:
|
||||
flash(_(u"No admin user remaining, can't remove admin role", nick=content.name), category="error")
|
||||
return redirect(url_for('admin.admin'))
|
||||
|
||||
if "password" in to_save and to_save["password"]:
|
||||
if to_save.get("password"):
|
||||
content.password = generate_password_hash(to_save["password"])
|
||||
anonymous = content.is_anonymous
|
||||
content.role = constants.selected_roles(to_save)
|
||||
|
@ -1233,22 +1215,27 @@ def _handle_edit_user(to_save, content, languages, translations, kobo_support):
|
|||
elif value not in val and content.check_visibility(value):
|
||||
content.sidebar_view &= ~value
|
||||
|
||||
if "Show_detail_random" in to_save:
|
||||
if to_save.get("Show_detail_random"):
|
||||
content.sidebar_view |= constants.DETAIL_RANDOM
|
||||
else:
|
||||
content.sidebar_view &= ~constants.DETAIL_RANDOM
|
||||
|
||||
if "default_language" in to_save:
|
||||
if to_save.get("default_language"):
|
||||
content.default_language = to_save["default_language"]
|
||||
if "locale" in to_save and to_save["locale"]:
|
||||
if to_save.get("locale"):
|
||||
content.locale = to_save["locale"]
|
||||
if to_save["email"] and to_save["email"] != content.email:
|
||||
existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \
|
||||
.first()
|
||||
if not existing_email:
|
||||
content.email = to_save["email"]
|
||||
else:
|
||||
flash(_(u"Found an existing account for this e-mail address."), category="error")
|
||||
try:
|
||||
if to_save.get("email", content.email) != content.email:
|
||||
content.email = check_email(to_save["email"])
|
||||
# Query User name, if not existing, change
|
||||
if to_save.get("name", content.name) != content.name:
|
||||
if to_save.get("name") == "Guest":
|
||||
raise Exception(_("Guest Name can't be changed"))
|
||||
content.name = check_username(to_save["name"])
|
||||
if to_save.get("kindle_mail") != content.kindle_mail:
|
||||
content.kindle_mail = valid_email(to_save["kindle_mail"]) if to_save["kindle_mail"] else ""
|
||||
except Exception as ex:
|
||||
flash(str(ex), category="error")
|
||||
return render_title_template("user_edit.html",
|
||||
translations=translations,
|
||||
languages=languages,
|
||||
|
@ -1257,25 +1244,8 @@ def _handle_edit_user(to_save, content, languages, translations, kobo_support):
|
|||
new_user=0,
|
||||
content=content,
|
||||
registered_oauth=oauth_check,
|
||||
title=_(u"Edit User %(nick)s", nick=content.name), page="edituser")
|
||||
if "name" in to_save and to_save["name"] != content.name:
|
||||
# Query User name, if not existing, change
|
||||
if not ub.session.query(ub.User).filter(ub.User.name == to_save["name"]).scalar():
|
||||
content.name = to_save["name"]
|
||||
else:
|
||||
flash(_(u"This username is already taken"), category="error")
|
||||
return render_title_template("user_edit.html",
|
||||
translations=translations,
|
||||
languages=languages,
|
||||
mail_configured=config.get_mail_server_configured(),
|
||||
new_user=0, content=content,
|
||||
registered_oauth=oauth_check,
|
||||
kobo_support=kobo_support,
|
||||
title=_(u"Edit User %(nick)s", nick=content.name),
|
||||
page="edituser")
|
||||
|
||||
if "kindle_mail" in to_save and to_save["kindle_mail"] != content.kindle_mail:
|
||||
content.kindle_mail = to_save["kindle_mail"]
|
||||
try:
|
||||
ub.session_commit()
|
||||
flash(_(u"User '%(nick)s' updated", nick=content.name), category="success")
|
||||
|
@ -1330,10 +1300,10 @@ def update_mailsettings():
|
|||
elif to_save.get("gmail"):
|
||||
try:
|
||||
config.mail_gmail_token = services.gmail.setup_gmail(config.mail_gmail_token)
|
||||
flash(_(u"G-Mail Account Verification Successfull"), category="success")
|
||||
except Exception as e:
|
||||
flash(e, category="error")
|
||||
log.error(e)
|
||||
flash(_(u"G-Mail Account Verification Successful"), category="success")
|
||||
except Exception as ex:
|
||||
flash(str(ex), category="error")
|
||||
log.error(ex)
|
||||
return edit_mailsettings()
|
||||
|
||||
else:
|
||||
|
@ -1389,7 +1359,8 @@ def edit_user(user_id):
|
|||
registered_oauth=oauth_check,
|
||||
mail_configured=config.get_mail_server_configured(),
|
||||
kobo_support=kobo_support,
|
||||
title=_(u"Edit User %(nick)s", nick=content.name), page="edituser")
|
||||
title=_(u"Edit User %(nick)s", nick=content.name),
|
||||
page="edituser")
|
||||
|
||||
|
||||
@admi.route("/admin/resetpassword/<int:user_id>")
|
||||
|
@ -1530,9 +1501,12 @@ def ldap_import_create_user(user, user_data):
|
|||
else:
|
||||
log.debug('No Mail Field Found in LDAP Response')
|
||||
useremail = username + '@email.com'
|
||||
|
||||
try:
|
||||
# check for duplicate email
|
||||
if ub.session.query(ub.User).filter(func.lower(ub.User.email) == useremail.lower()).first():
|
||||
log.warning("LDAP Email %s Already in Database", user_data)
|
||||
useremail = check_email(useremail)
|
||||
except Exception as ex:
|
||||
log.warning("LDAP Email Error: {}, {}".format(user_data, ex))
|
||||
return 0, None
|
||||
content = ub.User()
|
||||
content.name = username
|
||||
|
@ -1549,8 +1523,8 @@ def ldap_import_create_user(user, user_data):
|
|||
try:
|
||||
ub.session.commit()
|
||||
return 1, None # increase no of users
|
||||
except Exception as e:
|
||||
log.warning("Failed to create LDAP user: %s - %s", user, e)
|
||||
except Exception as ex:
|
||||
log.warning("Failed to create LDAP user: %s - %s", user, ex)
|
||||
ub.session.rollback()
|
||||
message = _(u'Failed to Create at Least One LDAP User')
|
||||
return 0, message
|
||||
|
@ -1583,16 +1557,16 @@ def import_ldap_users():
|
|||
query_filter = config.config_ldap_user_object
|
||||
try:
|
||||
user_identifier = extract_user_identifier(user, query_filter)
|
||||
except Exception as e:
|
||||
log.warning(e)
|
||||
except Exception as ex:
|
||||
log.warning(ex)
|
||||
continue
|
||||
else:
|
||||
user_identifier = user
|
||||
query_filter = None
|
||||
try:
|
||||
user_data = services.ldap.get_object_details(user=user_identifier, query_filter=query_filter)
|
||||
except AttributeError as e:
|
||||
log.debug_or_exception(e)
|
||||
except AttributeError as ex:
|
||||
log.debug_or_exception(ex)
|
||||
continue
|
||||
if user_data:
|
||||
user_count, message = ldap_import_create_user(user, user_data)
|
||||
|
|
|
@ -105,8 +105,8 @@ def _extract_Cover_from_archive(original_file_extension, tmp_file_name, rarExecu
|
|||
if extension in COVER_EXTENSIONS:
|
||||
cover_data = cf.read(name)
|
||||
break
|
||||
except Exception as e:
|
||||
log.debug('Rarfile failed with error: %s', e)
|
||||
except Exception as ex:
|
||||
log.debug('Rarfile failed with error: %s', ex)
|
||||
return cover_data
|
||||
|
||||
|
||||
|
|
12
cps/db.py
12
cps/db.py
|
@ -544,8 +544,8 @@ class CalibreDB():
|
|||
|
||||
conn = cls.engine.connect()
|
||||
# conn.text_factory = lambda b: b.decode(errors = 'ignore') possible fix for #1302
|
||||
except Exception as e:
|
||||
config.invalidate(e)
|
||||
except Exception as ex:
|
||||
config.invalidate(ex)
|
||||
return False
|
||||
|
||||
config.db_configured = True
|
||||
|
@ -646,8 +646,8 @@ class CalibreDB():
|
|||
pagination = Pagination(page, pagesize,
|
||||
len(query.all()))
|
||||
entries = query.order_by(*order).offset(off).limit(pagesize).all()
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
except Exception as ex:
|
||||
log.debug_or_exception(ex)
|
||||
#for book in entries:
|
||||
# book = self.order_authors(book)
|
||||
return entries, randm, pagination
|
||||
|
@ -791,7 +791,7 @@ class CalibreDB():
|
|||
def lcase(s):
|
||||
try:
|
||||
return unidecode.unidecode(s.lower())
|
||||
except Exception as e:
|
||||
except Exception as ex:
|
||||
log = logger.create()
|
||||
log.debug_or_exception(e)
|
||||
log.debug_or_exception(ex)
|
||||
return s.lower()
|
||||
|
|
|
@ -336,8 +336,8 @@ def delete_book(book_id, book_format, jsonResponse):
|
|||
calibre_db.session.query(db.Data).filter(db.Data.book == book.id).\
|
||||
filter(db.Data.format == book_format).delete()
|
||||
calibre_db.session.commit()
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
except Exception as ex:
|
||||
log.debug_or_exception(ex)
|
||||
calibre_db.session.rollback()
|
||||
else:
|
||||
# book not found
|
||||
|
@ -726,10 +726,10 @@ def edit_book(book_id):
|
|||
edited_books_id = None
|
||||
|
||||
# handle book title
|
||||
modif_date |= handle_title_on_edit(book, to_save["book_title"])
|
||||
title_change = handle_title_on_edit(book, to_save["book_title"])
|
||||
|
||||
input_authors, change = handle_author_on_edit(book, to_save["author_name"])
|
||||
if change:
|
||||
input_authors, authorchange = handle_author_on_edit(book, to_save["author_name"])
|
||||
if authorchange or title_change:
|
||||
edited_books_id = book.id
|
||||
modif_date = True
|
||||
|
||||
|
@ -801,8 +801,8 @@ def edit_book(book_id):
|
|||
calibre_db.session.rollback()
|
||||
flash(error, category="error")
|
||||
return render_edit_book(book_id)
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
except Exception as ex:
|
||||
log.debug_or_exception(ex)
|
||||
calibre_db.session.rollback()
|
||||
flash(_("Error editing book, please check logfile for details"), category="error")
|
||||
return redirect(url_for('web.show_book', book_id=book.id))
|
||||
|
|
|
@ -155,6 +155,6 @@ def on_received_watch_confirmation():
|
|||
# prevent error on windows, as os.rename does on existing files, also allow cross hdd move
|
||||
move(os.path.join(tmp_dir, "tmp_metadata.db"), dbpath)
|
||||
calibre_db.reconnect_db(config, ub.app_DB_path)
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
except Exception as ex:
|
||||
log.debug_or_exception(ex)
|
||||
return ''
|
||||
|
|
|
@ -202,8 +202,8 @@ def getDrive(drive=None, gauth=None):
|
|||
gauth.Refresh()
|
||||
except RefreshError as e:
|
||||
log.error("Google Drive error: %s", e)
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
except Exception as ex:
|
||||
log.debug_or_exception(ex)
|
||||
else:
|
||||
# Initialize the saved creds
|
||||
gauth.Authorize()
|
||||
|
@ -497,8 +497,8 @@ def getChangeById (drive, change_id):
|
|||
except (errors.HttpError) as error:
|
||||
log.error(error)
|
||||
return None
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
except Exception as ex:
|
||||
log.error(ex)
|
||||
return None
|
||||
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ from babel.units import format_unit
|
|||
from flask import send_from_directory, make_response, redirect, abort, url_for
|
||||
from flask_babel import gettext as _
|
||||
from flask_login import current_user
|
||||
from sqlalchemy.sql.expression import true, false, and_, text
|
||||
from sqlalchemy.sql.expression import true, false, and_, text, func
|
||||
from werkzeug.datastructures import Headers
|
||||
from werkzeug.security import generate_password_hash
|
||||
|
||||
|
@ -504,6 +504,31 @@ def uniq(inpt):
|
|||
output.append(x)
|
||||
return output
|
||||
|
||||
def check_email(email):
|
||||
email = valid_email(email)
|
||||
if ub.session.query(ub.User).filter(func.lower(ub.User.email) == email.lower()).first():
|
||||
log.error(u"Found an existing account for this e-mail address")
|
||||
raise Exception(_(u"Found an existing account for this e-mail address"))
|
||||
return email
|
||||
|
||||
|
||||
def check_username(username):
|
||||
username = username.strip()
|
||||
if ub.session.query(ub.User).filter(func.lower(ub.User.name) == username.lower()).scalar():
|
||||
log.error(u"This username is already taken")
|
||||
raise Exception (_(u"This username is already taken"))
|
||||
return username
|
||||
|
||||
|
||||
def valid_email(email):
|
||||
email = email.strip()
|
||||
# Regex according to https://developer.mozilla.org/en-US/docs/Web/HTML/Element/input/email#validation
|
||||
if not re.search(r"^[\w.!#$%&'*+\\/=?^_`{|}~-]+@[\w](?:[\w-]{0,61}[\w])?(?:\.[\w](?:[\w-]{0,61}[\w])?)*$",
|
||||
email):
|
||||
log.error(u"Invalid e-mail address format")
|
||||
raise Exception(_(u"Invalid e-mail address format"))
|
||||
return email
|
||||
|
||||
# ################################# External interface #################################
|
||||
|
||||
|
||||
|
@ -551,8 +576,8 @@ def get_book_cover_internal(book, use_generic_cover_on_failure):
|
|||
else:
|
||||
log.error('%s/cover.jpg not found on Google Drive', book.path)
|
||||
return get_cover_on_failure(use_generic_cover_on_failure)
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
except Exception as ex:
|
||||
log.debug_or_exception(ex)
|
||||
return get_cover_on_failure(use_generic_cover_on_failure)
|
||||
else:
|
||||
cover_file_path = os.path.join(config.config_calibre_dir, book.path)
|
||||
|
|
|
@ -262,8 +262,8 @@ def generate_sync_response(sync_token, sync_results, set_cont=False):
|
|||
extra_headers["x-kobo-sync-mode"] = store_response.headers.get("x-kobo-sync-mode")
|
||||
extra_headers["x-kobo-recent-reads"] = store_response.headers.get("x-kobo-recent-reads")
|
||||
|
||||
except Exception as e:
|
||||
log.error("Failed to receive or parse response from Kobo's sync endpoint: " + str(e))
|
||||
except Exception as ex:
|
||||
log.error("Failed to receive or parse response from Kobo's sync endpoint: {}".format(ex))
|
||||
if set_cont:
|
||||
extra_headers["x-kobo-sync"] = "continue"
|
||||
sync_token.to_headers(extra_headers)
|
||||
|
|
|
@ -147,8 +147,8 @@ def bind_oauth_or_register(provider_id, provider_user_id, redirect_url, provider
|
|||
ub.session.commit()
|
||||
flash(_(u"Link to %(oauth)s Succeeded", oauth=provider_name), category="success")
|
||||
return redirect(url_for('web.profile'))
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
except Exception as ex:
|
||||
log.debug_or_exception(ex)
|
||||
ub.session.rollback()
|
||||
else:
|
||||
flash(_(u"Login failed, No User Linked With OAuth Account"), category="error")
|
||||
|
@ -194,8 +194,8 @@ def unlink_oauth(provider):
|
|||
ub.session.commit()
|
||||
logout_oauth_user()
|
||||
flash(_(u"Unlink to %(oauth)s Succeeded", oauth=oauth_check[provider]), category="success")
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
except Exception as ex:
|
||||
log.debug_or_exception(ex)
|
||||
ub.session.rollback()
|
||||
flash(_(u"Unlink to %(oauth)s Failed", oauth=oauth_check[provider]), category="error")
|
||||
except NoResultFound:
|
||||
|
|
|
@ -159,9 +159,9 @@ class CalibreTask:
|
|||
# catch any unhandled exceptions in a task and automatically fail it
|
||||
try:
|
||||
self.run(*args)
|
||||
except Exception as e:
|
||||
self._handleError(str(e))
|
||||
log.debug_or_exception(e)
|
||||
except Exception as ex:
|
||||
self._handleError(str(ex))
|
||||
log.debug_or_exception(ex)
|
||||
|
||||
self.end_time = datetime.now()
|
||||
|
||||
|
|
|
@ -255,13 +255,13 @@ def create_edit_shelf(shelf, title, page, shelf_id=False):
|
|||
log.info(u"Shelf {} {}".format(to_save["title"], shelf_action))
|
||||
flash(flash_text, category="success")
|
||||
return redirect(url_for('shelf.show_shelf', shelf_id=shelf.id))
|
||||
except (OperationalError, InvalidRequestError) as e:
|
||||
except (OperationalError, InvalidRequestError) as ex:
|
||||
ub.session.rollback()
|
||||
log.debug_or_exception(e)
|
||||
log.debug_or_exception(ex)
|
||||
flash(_(u"Settings DB is not Writeable"), category="error")
|
||||
except Exception as e:
|
||||
except Exception as ex:
|
||||
ub.session.rollback()
|
||||
log.debug_or_exception(e)
|
||||
log.debug_or_exception(ex)
|
||||
flash(_(u"There was an error"), category="error")
|
||||
return render_title_template('shelf_edit.html', shelf=shelf, title=title, page=page)
|
||||
|
||||
|
|
|
@ -452,6 +452,17 @@ $(function() {
|
|||
$(this).next().text(elText);
|
||||
});
|
||||
},
|
||||
onLoadSuccess: function () {
|
||||
var guest = $(".editable[data-name='name'][data-value='Guest']");
|
||||
guest.editable("disable");
|
||||
$(".editable[data-name='locale'][data-pk='"+guest.data("pk")+"']").editable("disable");
|
||||
$("input[data-name='admin_role'][data-pk='"+guest.data("pk")+"']").prop("disabled", true);
|
||||
$("input[data-name='passwd_role'][data-pk='"+guest.data("pk")+"']").prop("disabled", true);
|
||||
$("input[data-name='edit_shelf_role'][data-pk='"+guest.data("pk")+"']").prop("disabled", true);
|
||||
// ToDo: Disable delete
|
||||
|
||||
},
|
||||
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
/*onEditableSave: function (field, row, oldvalue, $el) {
|
||||
if (field === "title" || field === "authors") {
|
||||
|
@ -612,9 +623,9 @@ function singleUserFormatter(value, row) {
|
|||
|
||||
function checkboxFormatter(value, row, index){
|
||||
if(value & this.column)
|
||||
return '<input type="checkbox" class="chk" checked onchange="checkboxChange(this, ' + row.id + ', \'' + this.field + '\', ' + this.column + ')">';
|
||||
return '<input type="checkbox" class="chk" data-pk="' + row.id + '" data-name="' + this.name + '" checked onchange="checkboxChange(this, ' + row.id + ', \'' + this.field + '\', ' + this.column + ')">';
|
||||
else
|
||||
return '<input type="checkbox" class="chk" onchange="checkboxChange(this, ' + row.id + ', \'' + this.field + '\', ' + this.column + ')">';
|
||||
return '<input type="checkbox" class="chk" data-pk="' + row.id + '" data-name="' + this.name + '" onchange="checkboxChange(this, ' + row.id + ', \'' + this.field + '\', ' + this.column + ')">';
|
||||
}
|
||||
|
||||
function checkboxChange(checkbox, userId, field, field_index) {
|
||||
|
@ -622,6 +633,11 @@ function checkboxChange(checkbox, userId, field, field_index) {
|
|||
method:"post",
|
||||
url: window.location.pathname + "/../../ajax/editlistusers/" + field,
|
||||
data: {"pk":userId, "field_index":field_index, "value": checkbox.checked}
|
||||
/*<div className="editable-buttons">
|
||||
<button type="button" className="btn btn-default btn-sm editable-cancel"><i
|
||||
className="glyphicon glyphicon-remove"></i></button>
|
||||
</div>*/
|
||||
/*<div className="editable-error-block help-block" style="">Text to show</div>*/
|
||||
});
|
||||
$.ajax({
|
||||
method:"get",
|
||||
|
|
|
@ -75,8 +75,8 @@ class TaskConvert(CalibreTask):
|
|||
self.settings['body'],
|
||||
internal=True)
|
||||
)
|
||||
except Exception as e:
|
||||
return self._handleError(str(e))
|
||||
except Exception as ex:
|
||||
return self._handleError(str(ex))
|
||||
|
||||
def _convert_ebook_format(self):
|
||||
error_message = None
|
||||
|
|
|
@ -158,9 +158,9 @@ class TaskEmail(CalibreTask):
|
|||
except socket.error as e:
|
||||
log.debug_or_exception(e)
|
||||
self._handleError(u'Socket Error sending email: {}'.format(e.strerror))
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
self._handleError(u'Error sending email: {}'.format(e))
|
||||
except Exception as ex:
|
||||
log.debug_or_exception(ex)
|
||||
self._handleError(u'Error sending email: {}'.format(ex))
|
||||
|
||||
|
||||
def send_standard_email(self, msg):
|
||||
|
|
|
@ -43,6 +43,7 @@
|
|||
data-visible="{{visiblility.get(parameter)}}"
|
||||
data-editable-type="select"
|
||||
data-edit="true"
|
||||
data-sortable="true"
|
||||
data-editable-url="{{ url_for('admin.edit_list_user', param=parameter)}}"
|
||||
data-editable-source={{url}}
|
||||
{% if validate %}data-edit-validate="{{ _('This Field is Required') }}"{% endif %}>
|
||||
|
@ -65,6 +66,7 @@
|
|||
data-visible="{{visiblility.get(parameter)}}"
|
||||
data-editable-type="select"
|
||||
data-edit="true"
|
||||
data-sortable="true"
|
||||
data-editable-url="{{ url_for('admin.edit_list_user', param=parameter)}}"
|
||||
data-editable-source={{url}}
|
||||
{% if validate %}data-edit-validate="{{ _('This Field is Required') }}"{% endif %}>
|
||||
|
@ -101,7 +103,7 @@
|
|||
<th data-name="id" data-field="id" id="id" data-visible="false" data-switchable="false"></th>
|
||||
{{ user_table_row('name', _('Enter Username'), _('Username'), true) }}
|
||||
{{ user_table_row('email', _('Enter E-mail Address'), _('E-mail Address'), true) }}
|
||||
{{ user_table_row('kindle_mail', _('Enter Kindle E-mail Address'), _('Kindle E-mail'), true) }}
|
||||
{{ user_table_row('kindle_mail', _('Enter Kindle E-mail Address'), _('Kindle E-mail'), false) }}
|
||||
{{ user_select_translations('locale', url_for('admin.table_get_locale'), _('Locale'), true) }}
|
||||
{{ user_select_languages('default_language', url_for('admin.table_get_default_lang'), _('Visible Book Languages'), true) }}
|
||||
{{ user_table_row('denied_tags', _("Edit Denied Tags"), _("Denied Tags"), false, true, 0) }}
|
||||
|
|
|
@ -117,8 +117,8 @@ def parse_xmp(pdf_file):
|
|||
"""
|
||||
try:
|
||||
xmp_info = pdf_file.getXmpMetadata()
|
||||
except Exception as e:
|
||||
log.debug('Can not read XMP metadata %e', e)
|
||||
except Exception as ex:
|
||||
log.debug('Can not read XMP metadata {}'.format(ex))
|
||||
return None
|
||||
|
||||
if xmp_info:
|
||||
|
@ -162,8 +162,8 @@ def parse_xmp(pdf_file):
|
|||
"""
|
||||
try:
|
||||
xmp_info = pdf_file.getXmpMetadata()
|
||||
except Exception as e:
|
||||
log.debug('Can not read XMP metadata', e)
|
||||
except Exception as ex:
|
||||
log.debug('Can not read XMP metadata {}'.format(ex))
|
||||
return None
|
||||
|
||||
if xmp_info:
|
||||
|
|
116
cps/web.py
116
cps/web.py
|
@ -24,7 +24,6 @@ from __future__ import division, print_function, unicode_literals
|
|||
import os
|
||||
from datetime import datetime
|
||||
import json
|
||||
import re
|
||||
import mimetypes
|
||||
import chardet # dependency of requests
|
||||
|
||||
|
@ -50,9 +49,9 @@ from . import constants, logger, isoLanguages, services
|
|||
from . import babel, db, ub, config, get_locale, app
|
||||
from . import calibre_db
|
||||
from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download
|
||||
from .helper import check_valid_domain, render_task_status, \
|
||||
from .helper import check_valid_domain, render_task_status, check_email, check_username, \
|
||||
get_cc_columns, get_book_cover, get_download_link, send_mail, generate_random_password, \
|
||||
send_registration_mail, check_send_to_kindle, check_read_formats, tags_filters, reset_password
|
||||
send_registration_mail, check_send_to_kindle, check_read_formats, tags_filters, reset_password, valid_email
|
||||
from .pagination import Pagination
|
||||
from .redirect import redirect_back
|
||||
from .usermanagement import login_required_if_no_ano
|
||||
|
@ -215,8 +214,8 @@ def update_view():
|
|||
for element in to_save:
|
||||
for param in to_save[element]:
|
||||
current_user.set_view_property(element, param, to_save[element][param])
|
||||
except Exception as e:
|
||||
log.error("Could not save view_settings: %r %r: %e", request, to_save, e)
|
||||
except Exception as ex:
|
||||
log.error("Could not save view_settings: %r %r: %e", request, to_save, ex)
|
||||
return "Invalid request", 400
|
||||
return "1", 200
|
||||
|
||||
|
@ -1164,14 +1163,6 @@ def extend_search_term(searchterm,
|
|||
searchterm.extend(tag.name for tag in tag_names)
|
||||
tag_names = calibre_db.session.query(db_element).filter(db.Tags.id.in_(tags['exclude_' + key])).all()
|
||||
searchterm.extend(tag.name for tag in tag_names)
|
||||
#serie_names = calibre_db.session.query(db.Series).filter(db.Series.id.in_(tags['include_serie'])).all()
|
||||
#searchterm.extend(serie.name for serie in serie_names)
|
||||
#serie_names = calibre_db.session.query(db.Series).filter(db.Series.id.in_(tags['include_serie'])).all()
|
||||
#searchterm.extend(serie.name for serie in serie_names)
|
||||
#shelf_names = ub.session.query(ub.Shelf).filter(ub.Shelf.id.in_(tags['include_shelf'])).all()
|
||||
#searchterm.extend(shelf.name for shelf in shelf_names)
|
||||
#shelf_names = ub.session.query(ub.Shelf).filter(ub.Shelf.id.in_(tags['include_shelf'])).all()
|
||||
#searchterm.extend(shelf.name for shelf in shelf_names)
|
||||
language_names = calibre_db.session.query(db.Languages). \
|
||||
filter(db.Languages.id.in_(tags['include_language'])).all()
|
||||
if language_names:
|
||||
|
@ -1345,11 +1336,7 @@ def serve_book(book_id, book_format, anyname):
|
|||
@login_required_if_no_ano
|
||||
@download_required
|
||||
def download_link(book_id, book_format, anyname):
|
||||
if "Kobo" in request.headers.get('User-Agent'):
|
||||
client = "kobo"
|
||||
else:
|
||||
client=""
|
||||
|
||||
client = "kobo" if "Kobo" in request.headers.get('User-Agent') else ""
|
||||
return get_download_link(book_id, book_format, client)
|
||||
|
||||
|
||||
|
@ -1391,29 +1378,21 @@ def register():
|
|||
|
||||
if request.method == "POST":
|
||||
to_save = request.form.to_dict()
|
||||
if config.config_register_email:
|
||||
nickname = to_save["email"]
|
||||
else:
|
||||
nickname = to_save.get('name', None)
|
||||
if not nickname or not to_save.get("email", None):
|
||||
nickname = to_save["email"].strip() if config.config_register_email else to_save.get('name')
|
||||
if not nickname or not to_save.get("email"):
|
||||
flash(_(u"Please fill out all fields!"), category="error")
|
||||
return render_title_template('register.html', title=_(u"register"), page="register")
|
||||
#if to_save["email"].count("@") != 1 or not \
|
||||
# Regex according to https://developer.mozilla.org/en-US/docs/Web/HTML/Element/input/email#validation
|
||||
if not re.search(r"^[\w.!#$%&'*+\\/=?^_`{|}~-]+@[\w](?:[\w-]{0,61}[\w])?(?:\.[\w](?:[\w-]{0,61}[\w])?)*$",
|
||||
to_save["email"]):
|
||||
flash(_(u"Invalid e-mail address format"), category="error")
|
||||
log.warning('Registering failed for user "%s" e-mail address: %s', nickname, to_save["email"])
|
||||
try:
|
||||
nickname = check_username(nickname)
|
||||
email = check_email(to_save["email"])
|
||||
except Exception as ex:
|
||||
flash(str(ex), category="error")
|
||||
return render_title_template('register.html', title=_(u"register"), page="register")
|
||||
|
||||
existing_user = ub.session.query(ub.User).filter(func.lower(ub.User.name) == nickname
|
||||
.lower()).first()
|
||||
existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()).first()
|
||||
if not existing_user and not existing_email:
|
||||
content = ub.User()
|
||||
if check_valid_domain(to_save["email"]):
|
||||
if check_valid_domain(email):
|
||||
content.name = nickname
|
||||
content.email = to_save["email"]
|
||||
content.email = email
|
||||
password = generate_random_password()
|
||||
content.password = generate_password_hash(password)
|
||||
content.role = config.config_default_role
|
||||
|
@ -1423,7 +1402,7 @@ def register():
|
|||
ub.session.commit()
|
||||
if feature_support['oauth']:
|
||||
register_user_with_oauth(content)
|
||||
send_registration_mail(to_save["email"], nickname, password)
|
||||
send_registration_mail(to_save["email"].strip(), nickname, password)
|
||||
except Exception:
|
||||
ub.session.rollback()
|
||||
flash(_(u"An unknown error occurred. Please try again later."), category="error")
|
||||
|
@ -1434,9 +1413,6 @@ def register():
|
|||
return render_title_template('register.html', title=_(u"register"), page="register")
|
||||
flash(_(u"Confirmation e-mail was send to your e-mail account."), category="success")
|
||||
return redirect(url_for('web.login'))
|
||||
else:
|
||||
flash(_(u"This username or e-mail address is already in use."), category="error")
|
||||
return render_title_template('register.html', title=_(u"register"), page="register")
|
||||
|
||||
if feature_support['oauth']:
|
||||
register_user_with_oauth()
|
||||
|
@ -1527,63 +1503,41 @@ def logout():
|
|||
return redirect(url_for('web.login'))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# ################################### Users own configuration #########################################################
|
||||
def change_profile_email(to_save, kobo_support, local_oauth_check, oauth_status):
|
||||
if "email" in to_save and to_save["email"] != current_user.email:
|
||||
if config.config_public_reg and not check_valid_domain(to_save["email"]):
|
||||
flash(_(u"E-mail is not from valid domain"), category="error")
|
||||
return render_title_template("user_edit.html", content=current_user,
|
||||
title=_(u"%(name)s's profile", name=current_user.name), page="me",
|
||||
kobo_support=kobo_support,
|
||||
registered_oauth=local_oauth_check, oauth_status=oauth_status)
|
||||
current_user.email = to_save["email"]
|
||||
|
||||
def change_profile_nickname(to_save, kobo_support, local_oauth_check, translations, languages):
|
||||
if "name" in to_save and to_save["name"] != current_user.name:
|
||||
# Query User name, if not existing, change
|
||||
if not ub.session.query(ub.User).filter(ub.User.name == to_save["name"]).scalar():
|
||||
current_user.name = to_save["name"]
|
||||
else:
|
||||
flash(_(u"This username is already taken"), category="error")
|
||||
return render_title_template("user_edit.html",
|
||||
translations=translations,
|
||||
languages=languages,
|
||||
kobo_support=kobo_support,
|
||||
new_user=0, content=current_user,
|
||||
registered_oauth=local_oauth_check,
|
||||
title=_(u"Edit User %(nick)s",
|
||||
nick=current_user.name),
|
||||
page="edituser")
|
||||
|
||||
|
||||
def change_profile(kobo_support, local_oauth_check, oauth_status, translations, languages):
|
||||
to_save = request.form.to_dict()
|
||||
current_user.random_books = 0
|
||||
if current_user.role_passwd() or current_user.role_admin():
|
||||
if "password" in to_save and to_save["password"]:
|
||||
if to_save.get("password"):
|
||||
current_user.password = generate_password_hash(to_save["password"])
|
||||
if "kindle_mail" in to_save and to_save["kindle_mail"] != current_user.kindle_mail:
|
||||
current_user.kindle_mail = to_save["kindle_mail"]
|
||||
if "allowed_tags" in to_save and to_save["allowed_tags"] != current_user.allowed_tags:
|
||||
try:
|
||||
if to_save.get("allowed_tags", current_user.allowed_tags) != current_user.allowed_tags:
|
||||
current_user.allowed_tags = to_save["allowed_tags"].strip()
|
||||
change_profile_email(to_save, kobo_support, local_oauth_check, oauth_status)
|
||||
change_profile_nickname(to_save, kobo_support, local_oauth_check, translations, languages)
|
||||
if "show_random" in to_save and to_save["show_random"] == "on":
|
||||
current_user.random_books = 1
|
||||
if "default_language" in to_save:
|
||||
if to_save.get("kindle_mail", current_user.kindle_mail) != current_user.kindle_mail:
|
||||
current_user.kindle_mail = valid_email(to_save["kindle_mail"])
|
||||
if to_save.get("email", current_user.email) != current_user.email:
|
||||
current_user.email = check_email(to_save["email"])
|
||||
if to_save.get("name", current_user.name) != current_user.name:
|
||||
# Query User name, if not existing, change
|
||||
current_user.name = check_username(to_save["name"])
|
||||
current_user.random_books = 1 if to_save.get("show_random") == "on" else 0
|
||||
if to_save.get("default_language"):
|
||||
current_user.default_language = to_save["default_language"]
|
||||
if "locale" in to_save:
|
||||
if to_save.get("locale"):
|
||||
current_user.locale = to_save["locale"]
|
||||
except Exception as ex:
|
||||
flash(str(ex), category="error")
|
||||
return render_title_template("user_edit.html", content=current_user,
|
||||
title=_(u"%(name)s's profile", name=current_user.name), page="me",
|
||||
kobo_support=kobo_support,
|
||||
registered_oauth=local_oauth_check, oauth_status=oauth_status)
|
||||
|
||||
val = 0
|
||||
for key, __ in to_save.items():
|
||||
if key.startswith('show'):
|
||||
val += int(key[5:])
|
||||
current_user.sidebar_view = val
|
||||
if "Show_detail_random" in to_save:
|
||||
if to_save.get("Show_detail_random"):
|
||||
current_user.sidebar_view += constants.DETAIL_RANDOM
|
||||
|
||||
try:
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user