whoogle-search/app/routes.py
Ben Busby 9f435bf8fe Major refactor of requests and session management
- Switches from pycurl to requests library
  - Allows for less janky decoding, especially with non-latin character
  sets
- Adds session level management of user configs
  - Allows for each session to set its own config (people are probably
  going to complain about this, though not sure if it'll be the same
  number of people who are upset that their friends/family have to share
  their config)
- Updates key gen/regen to more aggressively swap out keys after each
request
2020-05-28 18:14:10 -06:00

251 lines
8.4 KiB
Python

from app import app
from app.models.config import Config
from app.request import Request
from app.utils.misc import generate_user_keys, valid_user_session
from app.utils.routing_utils import *
import argparse
import base64
from bs4 import BeautifulSoup
from cryptography.fernet import Fernet
from flask import g, jsonify, make_response, request, redirect, render_template, send_file, session
from functools import wraps
import io
import json
import os
import urllib.parse as urlparse
from requests import exceptions
import uuid
import waitress
def auth_required(f):
    """Wrap a view so it demands HTTP Basic credentials when configured.

    The expected credentials are read from the ``WHOOGLE_USER`` and
    ``WHOOGLE_PASS`` environment variables on every call. If either one is
    unset or empty, authentication is skipped and the view runs directly.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view. It returns ``f``'s result when access is allowed,
        otherwise a 401 response carrying a ``WWW-Authenticate`` challenge.
    """
    # Function-scope import keeps the block self-contained.
    import hmac

    def _matches(expected, supplied):
        # Compare in constant time so response timing does not reveal how
        # much of a guessed credential was correct. Encoding to bytes lets
        # compare_digest accept non-ASCII credentials.
        if not isinstance(supplied, str):
            return False
        return hmac.compare_digest(expected.encode('utf-8'),
                                   supplied.encode('utf-8'))

    @wraps(f)
    def decorated(*args, **kwargs):
        whoogle_user = os.getenv('WHOOGLE_USER', '')
        whoogle_pass = os.getenv('WHOOGLE_PASS', '')

        # Skip if username/password not set
        if not whoogle_user or not whoogle_pass:
            return f(*args, **kwargs)

        auth = request.authorization
        if auth:
            # Evaluate both checks before combining them so a wrong
            # username does not short-circuit the password comparison.
            user_ok = _matches(whoogle_user, auth.username)
            pass_ok = _matches(whoogle_pass, auth.password)
            if user_ok and pass_ok:
                return f(*args, **kwargs)

        return make_response(
            'Not logged in',
            401,
            {'WWW-Authenticate': 'Basic realm="Login Required"'})

    return decorated
@app.before_request
def before_request_func():
    """Prepare session and per-request state before a view runs.

    Initializes the session's config, keys and uuid when
    ``valid_user_session`` rejects the current session, registers the
    session in ``app.user_elements``, optionally redirects plain-HTTP
    requests to HTTPS, and stores the user's config, outgoing request
    helper and app location on ``g``.

    Returns:
        A 308 redirect to the HTTPS form of the URL when ``HTTPS_ONLY`` is
        set and the request arrived over ``http://``; otherwise ``None``.
    """
    # Generate a fresh config, key set and id when the session is not valid
    if not valid_user_session(session):
        session['config'] = {'url': request.url_root}
        session['keys'] = generate_user_keys()
        session['uuid'] = str(uuid.uuid4())

    # Make sure this session has an element counter
    user_id = session['uuid']
    if user_id not in app.user_elements:
        app.user_elements[user_id] = 0

    # Always redirect to https if HTTPS_ONLY is set (otherwise default to False)
    force_https = os.getenv('HTTPS_ONLY', False)
    if force_https and request.url.startswith('http://'):
        secure_url = request.url.replace('http://', 'https://', 1)
        return redirect(secure_url, code=308)

    g.user_config = Config(**session['config'])

    # Fall back to the request's root URL when the config has none
    if not g.user_config.url:
        root_url = request.url_root
        if force_https:
            root_url = root_url.replace('http://', 'https://')
        g.user_config.url = root_url

    user_agent = request.headers.get('User-Agent')
    g.user_request = Request(user_agent, language=g.user_config.lang)
    g.app_location = g.user_config.url
@app.after_request
def after_request_func(response):
    """Rotate the session's element key once no elements remain to serve.

    Args:
        response: The outgoing response.

    Returns:
        The same response object, unmodified.
    """
    remaining = app.user_elements[session['uuid']]

    # Nothing to do while elements are outstanding or for non-element requests
    if remaining > 0 or '/element' not in request.url:
        return response

    # Regenerate element key since all elements have been served to user
    session['keys']['element_key'] = Fernet.generate_key()
    app.user_elements[session['uuid']] = 0
    return response
@app.errorhandler(404)
def unknown_page(e):
    """Handle a 404 by redirecting to the app location stored on ``g``.

    Args:
        e: The error passed in by the error handler machinery (unused).
    """
    home_url = g.app_location
    return redirect(home_url)
@app.route('/', methods=['GET'])
@auth_required
def index():
    """Render ``index.html`` with the user's config and selectable options."""
    template_args = {
        'ua': g.user_request.modified_user_agent,
        'languages': Config.LANGUAGES,
        'countries': Config.COUNTRIES,
        'config': g.user_config,
        'version_number': app.config['VERSION_NUMBER'],
    }
    return render_template('index.html', **template_args)
@app.route('/opensearch.xml', methods=['GET'])
@auth_required
def opensearch():
    """Serve the rendered ``opensearch.xml`` template as ``application/xml``."""
    base_url = g.app_location

    # Drop a single trailing slash from the URL handed to the template
    main_url = base_url[:-1] if base_url.endswith('/') else base_url

    # The template's request type follows the user's get_only setting
    if g.user_config.get_only:
        request_type = 'get'
    else:
        request_type = 'post'

    body = render_template('opensearch.xml',
                           main_url=main_url,
                           request_type=request_type)
    response = make_response(body)
    response.headers['Content-Type'] = 'application/xml'
    return response
@app.route('/autocomplete', methods=['GET', 'POST'])
def autocomplete():
    """Return autocomplete suggestions for a query as JSON.

    The query is taken from the ``q`` argument (query string on GET, form
    data otherwise). A non-empty raw request body takes precedence: it is
    decoded, stripped of ``q=`` and URL-unquoted.

    Returns:
        ``[query, suggestions]`` as JSON, or ``{'?': []}`` when there is
        neither a query nor a request body.
    """
    params = request.form if request.method != 'GET' else request.args
    q = params.get('q')
    raw_body = request.data

    if raw_body:
        # A raw body overrides whatever was found in the parsed parameters
        body_text = raw_body.decode('utf-8')
        q = urlparse.unquote_plus(body_text.replace('q=', ''))
    elif not q:
        return jsonify({'?': []})

    suggestions = g.user_request.autocomplete(q)
    return jsonify([q, suggestions])
@app.route('/search', methods=['GET', 'POST'])
@auth_required
def search():
    """Run a search and render the results page.

    Returns:
        A redirect to ``/`` when no query could be built, a 303 redirect to
        the generated response when ``feeling_lucky`` is set, and otherwise
        the rendered ``display.html`` page.
    """
    # Clear previous elements and generate a new key each time a new search
    # is performed
    app.user_elements[session['uuid']] = 0
    session['keys']['element_key'] = Fernet.generate_key()

    search_util = RoutingUtils(request, g.user_config, session)
    query = search_util.new_search_query()

    # Redirect to home if invalid/blank search
    if not query:
        return redirect('/')

    # Generate response and number of external elements from the page
    response, elements = search_util.generate_response()
    if search_util.feeling_lucky:
        return redirect(response, code=303)

    # Keep count of external elements to fetch before element key can be
    # regenerated
    app.user_elements[session['uuid']] = elements

    # The header is left empty for search types containing 'isch'
    if 'isch' in search_util.search_type:
        search_header = ''
    else:
        search_header = render_template(
            'header.html',
            dark_mode=g.user_config.dark,
            query=urlparse.unquote(query),
            search_type=search_util.search_type,
            mobile=g.user_request.mobile)

    page_args = {
        'query': urlparse.unquote(query),
        'search_type': search_util.search_type,
        'dark_mode': g.user_config.dark,
        'response': response,
        'search_header': search_header,
    }
    return render_template('display.html', **page_args)
@app.route('/config', methods=['GET', 'POST'])
@auth_required
def config():
    """Read or replace the session's user config.

    Returns:
        On GET, the current config's attributes as a JSON string. On POST,
        a redirect to the ``url`` of the newly stored config.
    """
    if request.method == 'GET':
        return json.dumps(g.user_config.__dict__)

    submitted = request.form.to_dict()

    # Keep the current URL when the form omits it or leaves it blank
    if not submitted.get('url'):
        submitted['url'] = g.user_config.url

    session['config'] = submitted
    return redirect(submitted['url'])
@app.route('/url', methods=['GET'])
@auth_required
def url():
    """Redirect to the address given in the ``url`` or ``q`` query argument.

    Returns:
        A redirect to ``url`` when that argument is present, else a redirect
        to ``q`` when it is non-empty and contains ``http``, else the
        rendered ``error.html`` page.
    """
    if 'url' in request.args:
        return redirect(request.args.get('url'))

    q = request.args.get('q')

    # `q` is None when the argument is absent; a truthiness check covers both
    # the missing and the empty case without calling len() on None.
    if q and 'http' in q:
        return redirect(q)

    return render_template('error.html', query=q)
@app.route('/imgres')
@auth_required
def imgres():
    """Redirect to the address given in the ``imgurl`` query argument."""
    image_url = request.args.get('imgurl')
    return redirect(image_url)
@app.route('/element')
@auth_required
def element():
    """Fetch an external resource on the user's behalf and serve it.

    The ``url`` query argument is decrypted with the session's element key
    and fetched through ``g.user_request``; the result is served with the
    mimetype given in the ``type`` argument.

    Returns:
        The fetched bytes as a file response, or an empty GIF when the
        fetch raises a ``RequestException``.
    """
    fernet = Fernet(session['keys']['element_key'])
    token = request.args.get('url').encode()
    src_url = fernet.decrypt(token).decode()
    src_type = request.args.get('type')

    try:
        file_data = g.user_request.send(base_url=src_url, return_bytes=True)
    except exceptions.RequestException:
        # Serve a placeholder image rather than failing the page
        empty_gif = base64.b64decode(
            'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==')
        return send_file(io.BytesIO(empty_gif), mimetype='image/gif')

    # One fewer element left to serve for this session
    app.user_elements[session['uuid']] -= 1

    return send_file(io.BytesIO(file_data), mimetype=src_type)
@app.route('/window')
@auth_required
def window():
    """Fetch the page named by ``location`` and render it without scripts.

    Root-relative ``src``/``href`` attributes in the fetched page are
    rewritten to absolute URLs so they resolve against the original site.

    Returns:
        The rendered ``display.html`` page wrapping the cleaned document,
        or a redirect to the app location when ``location`` is missing.
    """
    location = request.args.get('location')
    if not location:
        # Without a target there is nothing to fetch
        return redirect(g.app_location)

    get_body = g.user_request.send(base_url=location)
    results = BeautifulSoup(get_body, 'html.parser')

    # Resolve links on the parsed tree instead of by string replacement:
    # the previous replacement inserted a stray closing quote after the
    # location, which corrupted every rewritten attribute. urljoin also
    # handles protocol-relative ('//host/...') values correctly.
    for attr in ('src', 'href'):
        for tag in results.find_all(attrs={attr: True}):
            value = tag.get(attr)
            if isinstance(value, str) and value.startswith('/'):
                tag[attr] = urlparse.urljoin(location, value)

    # Script removal is best-effort; a failure here should not block the page
    try:
        for script in results('script'):
            script.decompose()
    except Exception:
        pass

    return render_template('display.html', response=results)
def run_app():
    """Parse command-line options and start the Whoogle server.

    Exports the basic-auth credentials (when given) and the HTTPS-only flag
    through the ``WHOOGLE_USER``, ``WHOOGLE_PASS`` and ``HTTPS_ONLY``
    environment variables, then starts the app with its own debug server
    when ``--debug`` is passed and with waitress otherwise.

    A ``--userpass`` value that is not of the form ``username:password``
    with both parts non-empty makes the parser exit with a usage error.
    """
    parser = argparse.ArgumentParser(
        description='Whoogle Search console runner')
    parser.add_argument(
        '--port', default=5000, metavar='<port number>',
        help='Specifies a port to run on (default 5000)')
    parser.add_argument(
        '--host', default='127.0.0.1', metavar='<ip address>',
        help='Specifies the host address to use (default 127.0.0.1)')
    parser.add_argument(
        '--debug', default=False, action='store_true',
        help='Activates debug mode for the server (default False)')
    parser.add_argument(
        '--https-only', default=False, action='store_true',
        help='Enforces HTTPS redirects for all requests')
    parser.add_argument(
        '--userpass', default='', metavar='<username:password>',
        help='Sets a username/password basic auth combo (default None)')
    args = parser.parse_args()

    if args.userpass:
        # Split on the first colon only, so the password may itself contain
        # ':' instead of being silently truncated.
        username, separator, password = args.userpass.partition(':')
        if not separator or not username or not password:
            # An empty half would leave authentication disabled without
            # any warning, so reject it up front.
            parser.error('--userpass must be given as <username:password>')
        os.environ['WHOOGLE_USER'] = username
        os.environ['WHOOGLE_PASS'] = password

    os.environ['HTTPS_ONLY'] = '1' if args.https_only else ''

    if args.debug:
        app.run(host=args.host, port=args.port, debug=args.debug)
    else:
        waitress.serve(app, listen="{}:{}".format(args.host, args.port))