PEP-8: Fix formatting for request class and routes

This commit is contained in:
Ben Busby 2020-12-17 14:41:43 -05:00
parent b695179c79
commit 06d1274562
No known key found for this signature in database
GPG Key ID: 3B08611DF6E62ED2
3 changed files with 164 additions and 79 deletions

View File

@ -8,9 +8,9 @@ import os
from stem import Signal, SocketError
from stem.control import Controller
# Core Google search URLs
SEARCH_URL = 'https://www.google.com/search?gbv=1&q='
AUTOCOMPLETE_URL = 'https://suggestqueries.google.com/complete/search?client=toolbar&'
AUTOCOMPLETE_URL = ('https://suggestqueries.google.com/'
'complete/search?client=toolbar&')
MOBILE_UA = '{}/5.0 (Android 0; Mobile; rv:54.0) Gecko/54.0 {}/59.0'
DESKTOP_UA = '{}/5.0 (X11; {} x86_64; rv:75.0) Gecko/20100101 {}/75.0'
@ -72,11 +72,16 @@ def gen_query(query, args, config, near_city=None) -> str:
result_tbs = args.get('tbs')
param_dict['tbs'] = '&tbs=' + result_tbs
# Occasionally the 'tbs' param provided by google also contains a field for 'lr', but formatted
# strangely. This is a (admittedly not very elegant) solution for this.
# Ex/ &tbs=qdr:h,lr:lang_1pl --> the lr param needs to be extracted and have the "1" digit removed in this case
# Occasionally the 'tbs' param provided by google also contains a
# field for 'lr', but formatted strangely. This is a rough solution
# for this.
#
# Example:
# &tbs=qdr:h,lr:lang_1pl
# -- the lr param needs to be extracted and remove the leading '1'
sub_lang = [_ for _ in result_tbs.split(',') if 'lr:' in _]
sub_lang = sub_lang[0][sub_lang[0].find('lr:') + 3:len(sub_lang[0])] if len(sub_lang) > 0 else ''
sub_lang = sub_lang[0][sub_lang[0].find('lr:') +
3:len(sub_lang[0])] if len(sub_lang) > 0 else ''
# Ensure search query is parsable
query = urlparse.quote(query)
@ -93,20 +98,26 @@ def gen_query(query, args, config, near_city=None) -> str:
if near_city:
param_dict['near'] = '&near=' + urlparse.quote(near_city)
# Set language for results (lr) if source isn't set, otherwise use the result
# language param provided by google (but with the strange digit(s) removed)
# Set language for results (lr) if source isn't set, otherwise use the
# result language param provided in the results
if 'source' in args:
param_dict['source'] = '&source=' + args.get('source')
param_dict['lr'] = ('&lr=' + ''.join([_ for _ in sub_lang if not _.isdigit()])) if sub_lang else ''
param_dict['lr'] = ('&lr=' + ''.join(
[_ for _ in sub_lang if not _.isdigit()]
)) if sub_lang else ''
else:
param_dict['lr'] = ('&lr=' + config.lang_search) if config.lang_search else ''
param_dict['lr'] = (
'&lr=' + config.lang_search
) if config.lang_search else ''
# Set autocorrected search ignore
# 'nfpr' defines the exclusion of results from an auto-corrected query
if 'nfpr' in args:
param_dict['nfpr'] = '&nfpr=' + args.get('nfpr')
param_dict['cr'] = ('&cr=' + config.ctry) if config.ctry else ''
param_dict['hl'] = ('&hl=' + config.lang_interface.replace('lang_', '')) if config.lang_interface else ''
param_dict['hl'] = (
'&hl=' + config.lang_interface.replace('lang_', '')
) if config.lang_interface else ''
param_dict['safe'] = '&safe=' + ('active' if config.safe else 'off')
for val in param_dict.values():
@ -126,6 +137,7 @@ class Request:
root_path -- the root path of the whoogle instance
config -- the user's current whoogle configuration
"""
def __init__(self, normal_ua, root_path, config: Config):
# Send heartbeat to Tor, used in determining if the user can or cannot
# enable Tor for future requests
@ -143,9 +155,10 @@ class Request:
':' + os.environ.get('WHOOGLE_PROXY_PASS')
self.proxies = {
'http': os.environ.get('WHOOGLE_PROXY_TYPE') + '://' +
auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'),
auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'),
}
self.proxies['https'] = self.proxies['http'].replace('http', 'https')
self.proxies['https'] = self.proxies['http'].replace('http',
'https')
else:
self.proxies = {
'http': 'socks5://127.0.0.1:9050',
@ -169,7 +182,8 @@ class Request:
"""
ac_query = dict(hl=self.language, q=query)
response = self.send(base_url=AUTOCOMPLETE_URL, query=urlparse.urlencode(ac_query)).text
response = self.send(base_url=AUTOCOMPLETE_URL,
query=urlparse.urlencode(ac_query)).text
if response:
dom = etree.fromstring(response)
@ -178,14 +192,14 @@ class Request:
return []
def send(self, base_url=SEARCH_URL, query='', attempt=0) -> Response:
"""Sends an outbound request to a URL. Optionally sends the request using Tor, if
enabled by the user.
"""Sends an outbound request to a URL. Optionally sends the request
using Tor, if enabled by the user.
Args:
base_url: The URL to use in the request
query: The optional query string for the request
attempt: The number of attempts made for the request (used for cycling
through Tor identities, if enabled)
attempt: The number of attempts made for the request
(used for cycling through Tor identities, if enabled)
Returns:
Response: The Response object returned by the requests call
@ -195,21 +209,30 @@ class Request:
'User-Agent': self.modified_user_agent
}
# Validate Tor connection and request new identity if the last one failed
if self.tor and not send_tor_signal(Signal.NEWNYM if attempt > 0 else Signal.HEARTBEAT):
raise TorError("Tor was previously enabled, but the connection has been dropped. Please check your " +
"Tor configuration and try again.", disable=True)
# Validate Tor conn and request new identity if the last one failed
if self.tor and not send_tor_signal(
Signal.NEWNYM if attempt > 0 else Signal.HEARTBEAT):
raise TorError(
"Tor was previously enabled, but the connection has been "
"dropped. Please check your Tor configuration and try again.",
disable=True)
# Make sure that the tor connection is valid, if enabled
if self.tor:
tor_check = requests.get('https://check.torproject.org/', proxies=self.proxies, headers=headers)
tor_check = requests.get('https://check.torproject.org/',
proxies=self.proxies, headers=headers)
self.tor_valid = 'Congratulations' in tor_check.text
if not self.tor_valid:
raise TorError("Tor connection succeeded, but the connection could not be validated by torproject.org",
disable=True)
raise TorError(
"Tor connection succeeded, but the connection could not be "
"validated by torproject.org",
disable=True)
response = requests.get(base_url + query, proxies=self.proxies, headers=headers)
response = requests.get(
base_url + query,
proxies=self.proxies,
headers=headers)
# Retry query with new identity if using Tor (max 10 attempts)
if 'form id="captcha-form"' in response.text and self.tor:

View File

@ -9,7 +9,8 @@ import uuid
from functools import wraps
import waitress
from flask import jsonify, make_response, request, redirect, render_template, send_file, session, url_for
from flask import jsonify, make_response, request, redirect, render_template, \
send_file, session, url_for
from requests import exceptions
from app import app
@ -30,11 +31,15 @@ def auth_required(f):
# Skip if username/password not set
whoogle_user = os.getenv('WHOOGLE_USER', '')
whoogle_pass = os.getenv('WHOOGLE_PASS', '')
if (not whoogle_user or not whoogle_pass) or \
(auth and whoogle_user == auth.username and whoogle_pass == auth.password):
if (not whoogle_user or not whoogle_pass) or (
auth
and whoogle_user == auth.username
and whoogle_pass == auth.password):
return f(*args, **kwargs)
else:
return make_response('Not logged in', 401, {'WWW-Authenticate': 'Basic realm="Login Required"'})
return make_response('Not logged in', 401, {
'WWW-Authenticate': 'Basic realm="Login Required"'})
return decorated
@ -46,7 +51,8 @@ def before_request_func():
# Generate session values for user if unavailable
if not valid_user_session(session):
session['config'] = json.load(open(app.config['DEFAULT_CONFIG'])) \
if os.path.exists(app.config['DEFAULT_CONFIG']) else {'url': request.url_root}
if os.path.exists(app.config['DEFAULT_CONFIG']) else {
'url': request.url_root}
session['uuid'] = str(uuid.uuid4())
session['fernet_keys'] = generate_user_keys(True)
@ -68,7 +74,9 @@ def before_request_func():
g.user_config = Config(**session['config'])
if not g.user_config.url:
g.user_config.url = request.url_root.replace('http://', 'https://') if https_only else request.url_root
g.user_config.url = request.url_root.replace(
'http://',
'https://') if https_only else request.url_root
g.user_request = Request(
request.headers.get('User-Agent'),
@ -82,13 +90,17 @@ def before_request_func():
def after_request_func(response):
if app.user_elements[session['uuid']] <= 0 and '/element' in request.url:
# Regenerate element key if all elements have been served to user
session['fernet_keys']['element_key'] = '' if not g.cookies_disabled else app.default_key_set['element_key']
session['fernet_keys'][
'element_key'] = '' if not g.cookies_disabled else \
app.default_key_set['element_key']
app.user_elements[session['uuid']] = 0
# Check if address consistently has cookies blocked, in which case start removing session
# files after creation.
# Note: This is primarily done to prevent overpopulation of session directories, since browsers that
# block cookies will still trigger Flask's session creation routine with every request.
# Check if address consistently has cookies blocked,
# in which case start removing session files after creation.
#
# Note: This is primarily done to prevent overpopulation of session
# directories, since browsers that block cookies will still trigger
# Flask's session creation routine with every request.
if g.cookies_disabled and request.remote_addr not in app.no_cookie_ips:
app.no_cookie_ips.append(request.remote_addr)
elif g.cookies_disabled and request.remote_addr in app.no_cookie_ips:
@ -101,6 +113,7 @@ def after_request_func(response):
@app.errorhandler(404)
def unknown_page(e):
app.logger.warn(e)
return redirect(g.app_location)
@ -109,7 +122,8 @@ def unknown_page(e):
def index():
# Reset keys
session['fernet_keys'] = generate_user_keys(g.cookies_disabled)
error_message = session['error_message'] if 'error_message' in session else ''
error_message = session[
'error_message'] if 'error_message' in session else ''
session['error_message'] = ''
return render_template('index.html',
@ -128,7 +142,8 @@ def opensearch():
if opensearch_url.endswith('/'):
opensearch_url = opensearch_url[:-1]
get_only = g.user_config.get_only or 'Chrome' in request.headers.get('User-Agent')
get_only = g.user_config.get_only or 'Chrome' in request.headers.get(
'User-Agent')
return render_template(
'opensearch.xml',
@ -147,16 +162,23 @@ def autocomplete():
# Search bangs if the query begins with "!", but not "! " (feeling lucky)
if q.startswith('!') and len(q) > 1 and not q.startswith('! '):
return jsonify([q, [bang_json[_]['suggestion'] for _ in bang_json if _.startswith(q)]])
return jsonify([q, [bang_json[_]['suggestion'] for _ in bang_json if
_.startswith(q)]])
if not q and not request.data:
return jsonify({'?': []})
elif request.data:
q = urlparse.unquote_plus(request.data.decode('utf-8').replace('q=', ''))
q = urlparse.unquote_plus(
request.data.decode('utf-8').replace('q=', ''))
# Return a list of suggestions for the query
# Note: If Tor is enabled, this returns nothing, as the request is almost always rejected
return jsonify([q, g.user_request.autocomplete(q) if not g.user_config.tor else []])
#
# Note: If Tor is enabled, this returns nothing, as the request is
# almost always rejected
return jsonify([
q,
g.user_request.autocomplete(q) if not g.user_config.tor else []
])
@app.route('/search', methods=['GET', 'POST'])
@ -168,7 +190,8 @@ def search():
# Update user config if specified in search args
g.user_config = g.user_config.from_params(g.request_params)
search_util = RoutingUtils(request, g.user_config, session, cookies_disabled=g.cookies_disabled)
search_util = RoutingUtils(request, g.user_config, session,
cookies_disabled=g.cookies_disabled)
query = search_util.new_search_query()
resolved_bangs = search_util.bang_operator(bang_json)
@ -183,14 +206,17 @@ def search():
try:
response, elements = search_util.generate_response()
except TorError as e:
session['error_message'] = e.message + ("\\n\\nTor config is now disabled!" if e.disable else "")
session['config']['tor'] = False if e.disable else session['config']['tor']
session['error_message'] = e.message + (
"\\n\\nTor config is now disabled!" if e.disable else "")
session['config']['tor'] = False if e.disable else session['config'][
'tor']
return redirect(url_for('.index'))
if search_util.feeling_lucky or elements < 0:
return redirect(response, code=303)
# Keep count of external elements to fetch before element key can be regenerated
# Keep count of external elements to fetch before
# the element key can be regenerated
app.user_elements[session['uuid']] = elements
return render_template(
@ -200,12 +226,13 @@ def search():
dark_mode=g.user_config.dark,
response=response,
version_number=app.config['VERSION_NUMBER'],
search_header=render_template(
search_header=(render_template(
'header.html',
dark_mode=g.user_config.dark,
query=urlparse.unquote(query),
search_type=search_util.search_type,
mobile=g.user_request.mobile) if 'isch' not in search_util.search_type else '')
mobile=g.user_request.mobile)
if 'isch' not in search_util.search_type else ''))
@app.route('/config', methods=['GET', 'POST', 'PUT'])
@ -215,8 +242,12 @@ def config():
return json.dumps(g.user_config.__dict__)
elif request.method == 'PUT':
if 'name' in request.args:
config_pkl = os.path.join(app.config['CONFIG_PATH'], request.args.get('name'))
session['config'] = pickle.load(open(config_pkl, 'rb')) if os.path.exists(config_pkl) else session['config']
config_pkl = os.path.join(
app.config['CONFIG_PATH'],
request.args.get('name'))
session['config'] = (pickle.load(open(config_pkl, 'rb'))
if os.path.exists(config_pkl)
else session['config'])
return json.dumps(session['config'])
else:
return json.dumps({})
@ -227,11 +258,16 @@ def config():
# Save config by name to allow a user to easily load later
if 'name' in request.args:
pickle.dump(config_data, open(os.path.join(app.config['CONFIG_PATH'], request.args.get('name')), 'wb'))
pickle.dump(
config_data,
open(os.path.join(
app.config['CONFIG_PATH'],
request.args.get('name')), 'wb'))
# Overwrite default config if user has cookies disabled
if g.cookies_disabled:
open(app.config['DEFAULT_CONFIG'], 'w').write(json.dumps(config_data, indent=4))
open(app.config['DEFAULT_CONFIG'], 'w').write(
json.dumps(config_data, indent=4))
session['config'] = config_data
return redirect(config_data['url'])
@ -274,7 +310,8 @@ def element():
except exceptions.RequestException:
pass
empty_gif = base64.b64decode('R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==')
empty_gif = base64.b64decode(
'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==')
return send_file(io.BytesIO(empty_gif), mimetype='image/gif')
@ -282,38 +319,62 @@ def element():
@auth_required
def window():
get_body = g.user_request.send(base_url=request.args.get('location')).text
get_body = get_body.replace('src="/', 'src="' + request.args.get('location') + '"')
get_body = get_body.replace('href="/', 'href="' + request.args.get('location') + '"')
get_body = get_body.replace('src="/',
'src="' + request.args.get('location') + '"')
get_body = get_body.replace('href="/',
'href="' + request.args.get('location') + '"')
results = BeautifulSoup(get_body, 'html.parser')
try:
for script in results('script'):
script.decompose()
except Exception:
pass
for script in results('script'):
script.decompose()
return render_template('display.html', response=results)
def run_app():
parser = argparse.ArgumentParser(description='Whoogle Search console runner')
parser.add_argument('--port', default=5000, metavar='<port number>',
help='Specifies a port to run on (default 5000)')
parser.add_argument('--host', default='127.0.0.1', metavar='<ip address>',
help='Specifies the host address to use (default 127.0.0.1)')
parser.add_argument('--debug', default=False, action='store_true',
help='Activates debug mode for the server (default False)')
parser.add_argument('--https-only', default=False, action='store_true',
help='Enforces HTTPS redirects for all requests')
parser.add_argument('--userpass', default='', metavar='<username:password>',
help='Sets a username/password basic auth combo (default None)')
parser.add_argument('--proxyauth', default='', metavar='<username:password>',
help='Sets a username/password for a HTTP/SOCKS proxy (default None)')
parser.add_argument('--proxytype', default='', metavar='<socks4|socks5|http>',
help='Sets a proxy type for all connections (default None)')
parser.add_argument('--proxyloc', default='', metavar='<location:port>',
help='Sets a proxy location for all connections (default None)')
parser = argparse.ArgumentParser(
description='Whoogle Search console runner')
parser.add_argument(
'--port',
default=5000,
metavar='<port number>',
help='Specifies a port to run on (default 5000)')
parser.add_argument(
'--host',
default='127.0.0.1',
metavar='<ip address>',
help='Specifies the host address to use (default 127.0.0.1)')
parser.add_argument(
'--debug',
default=False,
action='store_true',
help='Activates debug mode for the server (default False)')
parser.add_argument(
'--https-only',
default=False,
action='store_true',
help='Enforces HTTPS redirects for all requests')
parser.add_argument(
'--userpass',
default='',
metavar='<username:password>',
help='Sets a username/password basic auth combo (default None)')
parser.add_argument(
'--proxyauth',
default='',
metavar='<username:password>',
help='Sets a username/password for a HTTP/SOCKS proxy (default None)')
parser.add_argument(
'--proxytype',
default='',
metavar='<socks4|socks5|http>',
help='Sets a proxy type for all connections (default None)')
parser.add_argument(
'--proxyloc',
default='',
metavar='<location:port>',
help='Sets a proxy location for all connections (default None)')
args = parser.parse_args()
if args.userpass:

View File

@ -16,6 +16,7 @@ lxml==4.5.1
MarkupSafe==1.1.1
more-itertools==8.3.0
packaging==20.4
pep8==1.7.1
pluggy==0.13.1
py==1.8.1
pycparser==2.19