Improved limiter

This commit is contained in:
Ozzie Isaacs 2023-02-12 13:10:00 +01:00
parent f4ecfe4aca
commit ce48e06c45
4 changed files with 60 additions and 47 deletions

2
cps.py
View File

@ -21,7 +21,7 @@ import os
import sys
# Add local path to sys.path so we can import cps
# Add local path to sys.path, so we can import cps
path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, path)

View File

@ -166,6 +166,6 @@ def requires_kobo_auth(f):
login_user(user)
[limiter.limiter.storage.clear(k.key) for k in limiter.current_limits]
return f(*args, **kwargs)
log.debug("Received Kobo request without a recognizable auth token.")
return abort(401)
log.debug("Received Kobo request without a recognizable auth token.")
return abort(401)
return inner

View File

@ -22,7 +22,6 @@ import errno
import signal
import socket
import subprocess # nosec
from .services.background_scheduler import BackgroundScheduler
try:
from gevent.pywsgi import WSGIServer
@ -268,6 +267,7 @@ class WebServer(object):
@staticmethod
def shutdown_scheduler():
from .services.background_scheduler import BackgroundScheduler
scheduler = BackgroundScheduler()
if scheduler:
scheduler.scheduler.shutdown()

View File

@ -28,10 +28,10 @@ from flask import Blueprint, jsonify
from flask import request, redirect, send_from_directory, make_response, flash, abort, url_for
from flask import session as flask_session
from flask_babel import gettext as _
from flask_babel import lazy_gettext as N_
from flask_babel import get_locale
from flask_login import login_user, logout_user, login_required, current_user
from flask_limiter import RateLimitExceeded
from flask_limiter.util import get_remote_address
from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError
from sqlalchemy.sql.expression import text, func, false, not_, and_, or_
from sqlalchemy.orm.attributes import flag_modified
@ -1223,8 +1223,62 @@ def send_to_ereader(book_id, book_format, convert):
# ################################### Login Logout ##################################################################
@web.route('/register', methods=['POST'])
@limiter.limit("40/day", key_func=get_remote_address)
@limiter.limit("3/minute", key_func=get_remote_address)
def register_post():
if not config.config_public_reg:
abort(404)
to_save = request.form.to_dict()
try:
limiter.check()
except RateLimitExceeded:
flash(_(u"Please wait one minute to register next user"), category="error")
return render_title_template('register.html', config=config, title=_("Register"), page="register")
if current_user is not None and current_user.is_authenticated:
return redirect(url_for('web.index'))
if not config.get_mail_server_configured():
flash(_("Oops! Email server is not configured, please contact your administrator."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
nickname = to_save.get("email", "").strip() if config.config_register_email else to_save.get('name')
if not nickname or not to_save.get("email"):
flash(_("Oops! Please complete all fields."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
try:
nickname = check_username(nickname)
email = check_email(to_save.get("email", ""))
except Exception as ex:
flash(str(ex), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
@web.route('/register', methods=['GET', 'POST'])
content = ub.User()
if check_valid_domain(email):
content.name = nickname
content.email = email
password = generate_random_password()
content.password = generate_password_hash(password)
content.role = config.config_default_role
content.locale = config.config_default_locale
content.sidebar_view = config.config_default_show
try:
ub.session.add(content)
ub.session.commit()
if feature_support['oauth']:
register_user_with_oauth(content)
send_registration_mail(to_save.get("email", "").strip(), nickname, password)
except Exception:
ub.session.rollback()
flash(_("Oops! An unknown error occurred. Please try again later."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
else:
flash(_("Oops! Your Email is not allowed."), category="error")
log.warning('Registering failed for user "{}" Email: {}'.format(nickname, to_save.get("email","")))
return render_title_template('register.html', title=_("Register"), page="register")
flash(_("Success! Confirmation Email has been sent."), category="success")
return redirect(url_for('web.login'))
@web.route('/register', methods=['GET'])
def register():
if not config.config_public_reg:
abort(404)
@ -1233,47 +1287,6 @@ def register():
if not config.get_mail_server_configured():
flash(_("Oops! Email server is not configured, please contact your administrator."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
if request.method == "POST":
to_save = request.form.to_dict()
nickname = to_save.get("email", "").strip() if config.config_register_email else to_save.get('name')
if not nickname or not to_save.get("email"):
flash(_("Oops! Please complete all fields."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
try:
nickname = check_username(nickname)
email = check_email(to_save.get("email", ""))
except Exception as ex:
flash(str(ex), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
content = ub.User()
if check_valid_domain(email):
content.name = nickname
content.email = email
password = generate_random_password()
content.password = generate_password_hash(password)
content.role = config.config_default_role
content.locale = config.config_default_locale
content.sidebar_view = config.config_default_show
try:
ub.session.add(content)
ub.session.commit()
if feature_support['oauth']:
register_user_with_oauth(content)
send_registration_mail(to_save.get("email", "").strip(), nickname, password)
except Exception:
ub.session.rollback()
flash(_("Oops! An unknown error occurred. Please try again later."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
else:
flash(_("Oops! Your Email is not allowed."), category="error")
log.warning('Registering failed for user "{}" Email: {}'.format(nickname,
to_save.get("email","")))
return render_title_template('register.html', title=_("Register"), page="register")
flash(_("Success! Confirmation Email has been sent."), category="success")
return redirect(url_for('web.login'))
if feature_support['oauth']:
register_user_with_oauth()
return render_title_template('register.html', config=config, title=_("Register"), page="register")