Some functions refactored

This commit is contained in:
Ozzie Isaacs 2021-03-14 17:34:47 +01:00
parent 42707a19bd
commit 53ee0aaee1

View File

@ -61,6 +61,7 @@ feature_support = {
} }
try: try:
# pylint: disable=unused-import
import rarfile import rarfile
feature_support['rar'] = True feature_support['rar'] = True
except (ImportError, SyntaxError): except (ImportError, SyntaxError):
@ -184,10 +185,10 @@ def admin():
else: else:
commit = version['version'] commit = version['version']
allUser = ub.session.query(ub.User).all() all_user = ub.session.query(ub.User).all()
email_settings = config.get_mail_settings() email_settings = config.get_mail_settings()
kobo_support = feature_support['kobo'] and config.config_kobo_sync kobo_support = feature_support['kobo'] and config.config_kobo_sync
return render_title_template("admin.html", allUser=allUser, email=email_settings, config=config, commit=commit, return render_title_template("admin.html", allUser=all_user, email=email_settings, config=config, commit=commit,
feature_support=feature_support, kobo_support=kobo_support, feature_support=feature_support, kobo_support=kobo_support,
title=_(u"Admin page"), page="admin") title=_(u"Admin page"), page="admin")
@ -729,35 +730,13 @@ def _configuration_logfile_helper(to_save, gdrive_error):
return reboot_required, None return reboot_required, None
def _configuration_ldap_helper(to_save, gdrive_error): def _configuration_ldap_check(reboot_required, to_save, gdrive_error):
reboot_required = False
reboot_required |= _config_string(to_save, "config_ldap_provider_url")
reboot_required |= _config_int(to_save, "config_ldap_port")
reboot_required |= _config_int(to_save, "config_ldap_authentication")
reboot_required |= _config_string(to_save, "config_ldap_dn")
reboot_required |= _config_string(to_save, "config_ldap_serv_username")
reboot_required |= _config_string(to_save, "config_ldap_user_object")
reboot_required |= _config_string(to_save, "config_ldap_group_object_filter")
reboot_required |= _config_string(to_save, "config_ldap_group_members_field")
reboot_required |= _config_string(to_save, "config_ldap_member_user_object")
reboot_required |= _config_checkbox(to_save, "config_ldap_openldap")
reboot_required |= _config_int(to_save, "config_ldap_encryption")
reboot_required |= _config_string(to_save, "config_ldap_cacert_path")
reboot_required |= _config_string(to_save, "config_ldap_cert_path")
reboot_required |= _config_string(to_save, "config_ldap_key_path")
_config_string(to_save, "config_ldap_group_name")
if "config_ldap_serv_password" in to_save and to_save["config_ldap_serv_password"] != "":
reboot_required |= 1
config.set_from_dictionary(to_save, "config_ldap_serv_password", base64.b64encode, encode='UTF-8')
config.save()
if not config.config_ldap_provider_url \ if not config.config_ldap_provider_url \
or not config.config_ldap_port \ or not config.config_ldap_port \
or not config.config_ldap_dn \ or not config.config_ldap_dn \
or not config.config_ldap_user_object: or not config.config_ldap_user_object:
return reboot_required, _configuration_result(_('Please Enter a LDAP Provider, ' return reboot_required, _configuration_result(_('Please Enter a LDAP Provider, '
'Port, DN and User Object Identifier'), gdrive_error) 'Port, DN and User Object Identifier'), gdrive_error)
if config.config_ldap_authentication > constants.LDAP_AUTH_ANONYMOUS: if config.config_ldap_authentication > constants.LDAP_AUTH_ANONYMOUS:
if config.config_ldap_authentication > constants.LDAP_AUTH_UNAUTHENTICATE: if config.config_ldap_authentication > constants.LDAP_AUTH_UNAUTHENTICATE:
if not config.config_ldap_serv_username or not bool(config.config_ldap_serv_password): if not config.config_ldap_serv_username or not bool(config.config_ldap_serv_password):
@ -767,6 +746,14 @@ def _configuration_ldap_helper(to_save, gdrive_error):
if not config.config_ldap_serv_username: if not config.config_ldap_serv_username:
return reboot_required, _configuration_result('Please Enter a LDAP Service Account', gdrive_error) return reboot_required, _configuration_result('Please Enter a LDAP Service Account', gdrive_error)
if config.config_ldap_group_object_filter:
if config.config_ldap_group_object_filter.count("%s") != 1:
return reboot_required, \
_configuration_result(_('LDAP Group Object Filter Needs to Have One "%s" Format Identifier'),
gdrive_error)
if config.config_ldap_group_object_filter.count("(") != config.config_ldap_group_object_filter.count(")"):
return reboot_required, _configuration_result(_('LDAP Group Object Filter Has Unmatched Parenthesis'),
gdrive_error)
if config.config_ldap_group_object_filter: if config.config_ldap_group_object_filter:
if config.config_ldap_group_object_filter.count("%s") != 1: if config.config_ldap_group_object_filter.count("%s") != 1:
return reboot_required, \ return reboot_required, \
@ -806,6 +793,31 @@ def _configuration_ldap_helper(to_save, gdrive_error):
return reboot_required, None return reboot_required, None
def _configuration_ldap_helper(to_save, gdrive_error):
reboot_required = False
reboot_required |= _config_string(to_save, "config_ldap_provider_url")
reboot_required |= _config_int(to_save, "config_ldap_port")
reboot_required |= _config_int(to_save, "config_ldap_authentication")
reboot_required |= _config_string(to_save, "config_ldap_dn")
reboot_required |= _config_string(to_save, "config_ldap_serv_username")
reboot_required |= _config_string(to_save, "config_ldap_user_object")
reboot_required |= _config_string(to_save, "config_ldap_group_object_filter")
reboot_required |= _config_string(to_save, "config_ldap_group_members_field")
reboot_required |= _config_string(to_save, "config_ldap_member_user_object")
reboot_required |= _config_checkbox(to_save, "config_ldap_openldap")
reboot_required |= _config_int(to_save, "config_ldap_encryption")
reboot_required |= _config_string(to_save, "config_ldap_cacert_path")
reboot_required |= _config_string(to_save, "config_ldap_cert_path")
reboot_required |= _config_string(to_save, "config_ldap_key_path")
_config_string(to_save, "config_ldap_group_name")
if "config_ldap_serv_password" in to_save and to_save["config_ldap_serv_password"] != "":
reboot_required |= 1
config.set_from_dictionary(to_save, "config_ldap_serv_password", base64.b64encode, encode='UTF-8')
config.save()
return _configuration_ldap_check(reboot_required, to_save, gdrive_error)
def _configuration_update_helper(configured): def _configuration_update_helper(configured):
reboot_required = False reboot_required = False
db_change = False db_change = False
@ -1011,18 +1023,33 @@ def _handle_new_user(to_save, content, languages, translations, kobo_support):
ub.session.rollback() ub.session.rollback()
flash(_(u"Settings DB is not Writeable"), category="error") flash(_(u"Settings DB is not Writeable"), category="error")
def delete_user(content):
if ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
ub.User.id != content.id).count():
ub.session.query(ub.User).filter(ub.User.id == content.id).delete()
ub.session_commit()
flash(_(u"User '%(nick)s' deleted", nick=content.nickname), category="success")
return redirect(url_for('admin.admin'))
else:
flash(_(u"No admin user remaining, can't delete user", nick=content.nickname), category="error")
return redirect(url_for('admin.admin'))
def save_edited_user(content):
try:
ub.session_commit()
flash(_(u"User '%(nick)s' updated", nick=content.nickname), category="success")
except IntegrityError:
ub.session.rollback()
flash(_(u"An unknown error occured."), category="error")
except OperationalError:
ub.session.rollback()
flash(_(u"Settings DB is not Writeable"), category="error")
def _handle_edit_user(to_save, content, languages, translations, kobo_support): def _handle_edit_user(to_save, content, languages, translations, kobo_support):
if "delete" in to_save: if "delete" in to_save:
if ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN, return delete_user(content)
ub.User.id != content.id).count():
ub.session.query(ub.User).filter(ub.User.id == content.id).delete()
ub.session_commit()
flash(_(u"User '%(nick)s' deleted", nick=content.nickname), category="success")
return redirect(url_for('admin.admin'))
else:
flash(_(u"No admin user remaining, can't delete user", nick=content.nickname), category="error")
return redirect(url_for('admin.admin'))
else: else:
if not ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN, if not ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
ub.User.id != content.id).count() and 'admin_role' not in to_save: ub.User.id != content.id).count() and 'admin_role' not in to_save:
@ -1090,15 +1117,7 @@ def _handle_edit_user(to_save, content, languages, translations, kobo_support):
if "kindle_mail" in to_save and to_save["kindle_mail"] != content.kindle_mail: if "kindle_mail" in to_save and to_save["kindle_mail"] != content.kindle_mail:
content.kindle_mail = to_save["kindle_mail"] content.kindle_mail = to_save["kindle_mail"]
try: return save_edited_user(content)
ub.session_commit()
flash(_(u"User '%(nick)s' updated", nick=content.nickname), category="success")
except IntegrityError:
ub.session.rollback()
flash(_(u"An unknown error occured."), category="error")
except OperationalError:
ub.session.rollback()
flash(_(u"Settings DB is not Writeable"), category="error")
@admi.route("/admin/user/new", methods=["GET", "POST"]) @admi.route("/admin/user/new", methods=["GET", "POST"])
@ -1310,6 +1329,55 @@ def get_updater_status():
return '' return ''
def create_ldap_user(user, user_data, config):
imported = 0
showtext = None
user_login_field = extract_dynamic_field_from_filter(user, config.config_ldap_user_object)
username = user_data[user_login_field][0].decode('utf-8')
# check for duplicate username
if ub.session.query(ub.User).filter(func.lower(ub.User.nickname) == username.lower()).first():
# if ub.session.query(ub.User).filter(ub.User.nickname == username).first():
log.warning("LDAP User %s Already in Database", user_data)
return imported, showtext
kindlemail = ''
if 'mail' in user_data:
useremail = user_data['mail'][0].decode('utf-8')
if len(user_data['mail']) > 1:
kindlemail = user_data['mail'][1].decode('utf-8')
else:
log.debug('No Mail Field Found in LDAP Response')
useremail = username + '@email.com'
# check for duplicate email
if ub.session.query(ub.User).filter(func.lower(ub.User.email) == useremail.lower()).first():
log.warning("LDAP Email %s Already in Database", user_data)
return imported, showtext
content = ub.User()
content.nickname = username
content.password = '' # dummy password which will be replaced by ldap one
content.email = useremail
content.kindle_mail = kindlemail
content.role = config.config_default_role
content.sidebar_view = config.config_default_show
content.allowed_tags = config.config_allowed_tags
content.denied_tags = config.config_denied_tags
content.allowed_column_value = config.config_allowed_column_value
content.denied_column_value = config.config_denied_column_value
ub.session.add(content)
try:
ub.session.commit()
imported = 1
except Exception as e:
log.warning("Failed to create LDAP user: %s - %s", user, e)
ub.session.rollback()
showtext = _(u'Failed to Create at Least One LDAP User')
return imported, showtext
@admi.route('/import_ldap_users') @admi.route('/import_ldap_users')
@login_required @login_required
@admin_required @admin_required
@ -1349,47 +1417,11 @@ def import_ldap_users():
log.debug_or_exception(e) log.debug_or_exception(e)
continue continue
if user_data: if user_data:
user_login_field = extract_dynamic_field_from_filter(user, config.config_ldap_user_object) success, txt = create_ldap_user(user, user_data, config)
# In case of error store text for showing it
username = user_data[user_login_field][0].decode('utf-8') if txt:
# check for duplicate username showtext['text'] = txt
if ub.session.query(ub.User).filter(func.lower(ub.User.nickname) == username.lower()).first(): imported += success
# if ub.session.query(ub.User).filter(ub.User.nickname == username).first():
log.warning("LDAP User %s Already in Database", user_data)
continue
kindlemail = ''
if 'mail' in user_data:
useremail = user_data['mail'][0].decode('utf-8')
if len(user_data['mail']) > 1:
kindlemail = user_data['mail'][1].decode('utf-8')
else:
log.debug('No Mail Field Found in LDAP Response')
useremail = username + '@email.com'
# check for duplicate email
if ub.session.query(ub.User).filter(func.lower(ub.User.email) == useremail.lower()).first():
log.warning("LDAP Email %s Already in Database", user_data)
continue
content = ub.User()
content.nickname = username
content.password = '' # dummy password which will be replaced by ldap one
content.email = useremail
content.kindle_mail = kindlemail
content.role = config.config_default_role
content.sidebar_view = config.config_default_show
content.allowed_tags = config.config_allowed_tags
content.denied_tags = config.config_denied_tags
content.allowed_column_value = config.config_allowed_column_value
content.denied_column_value = config.config_denied_column_value
ub.session.add(content)
try:
ub.session.commit()
imported += 1
except Exception as e:
log.warning("Failed to create LDAP user: %s - %s", user, e)
ub.session.rollback()
showtext['text'] = _(u'Failed to Create at Least One LDAP User')
else: else:
log.warning("LDAP User: %s Not Found", user) log.warning("LDAP User: %s Not Found", user)
showtext['text'] = _(u'At Least One LDAP User Not Found in Database') showtext['text'] = _(u'At Least One LDAP User Not Found in Database')