From 06d1274562e941b9a8a033578b02698f4397498f Mon Sep 17 00:00:00 2001 From: Ben Busby Date: Thu, 17 Dec 2020 14:41:43 -0500 Subject: [PATCH] PEP-8: Fix formatting for request class and routes --- app/request.py | 77 ++++++++++++++-------- app/routes.py | 165 ++++++++++++++++++++++++++++++++--------------- requirements.txt | 1 + 3 files changed, 164 insertions(+), 79 deletions(-) diff --git a/app/request.py b/app/request.py index 04ae3db..2513ed3 100644 --- a/app/request.py +++ b/app/request.py @@ -8,9 +8,9 @@ import os from stem import Signal, SocketError from stem.control import Controller -# Core Google search URLs SEARCH_URL = 'https://www.google.com/search?gbv=1&q=' -AUTOCOMPLETE_URL = 'https://suggestqueries.google.com/complete/search?client=toolbar&' +AUTOCOMPLETE_URL = ('https://suggestqueries.google.com/' + 'complete/search?client=toolbar&') MOBILE_UA = '{}/5.0 (Android 0; Mobile; rv:54.0) Gecko/54.0 {}/59.0' DESKTOP_UA = '{}/5.0 (X11; {} x86_64; rv:75.0) Gecko/20100101 {}/75.0' @@ -72,11 +72,16 @@ def gen_query(query, args, config, near_city=None) -> str: result_tbs = args.get('tbs') param_dict['tbs'] = '&tbs=' + result_tbs - # Occasionally the 'tbs' param provided by google also contains a field for 'lr', but formatted - # strangely. This is a (admittedly not very elegant) solution for this. - # Ex/ &tbs=qdr:h,lr:lang_1pl --> the lr param needs to be extracted and have the "1" digit removed in this case + # Occasionally the 'tbs' param provided by google also contains a + # field for 'lr', but formatted strangely. This is a rough solution + # for this. + # + # Example: + # &tbs=qdr:h,lr:lang_1pl + # -- the lr param needs to be extracted and remove the leading '1' sub_lang = [_ for _ in result_tbs.split(',') if 'lr:' in _] - sub_lang = sub_lang[0][sub_lang[0].find('lr:') + 3:len(sub_lang[0])] if len(sub_lang) > 0 else '' + sub_lang = sub_lang[0][sub_lang[0].find('lr:') + + 3:len(sub_lang[0])] if len(sub_lang) > 0 else '' # Ensure search query is parsable query = urlparse.quote(query) @@ -93,20 +98,26 @@ def gen_query(query, args, config, near_city=None) -> str: if near_city: param_dict['near'] = '&near=' + urlparse.quote(near_city) - # Set language for results (lr) if source isn't set, otherwise use the result - # language param provided by google (but with the strange digit(s) removed) + # Set language for results (lr) if source isn't set, otherwise use the + # result language param provided in the results if 'source' in args: param_dict['source'] = '&source=' + args.get('source') - param_dict['lr'] = ('&lr=' + ''.join([_ for _ in sub_lang if not _.isdigit()])) if sub_lang else '' + param_dict['lr'] = ('&lr=' + ''.join( + [_ for _ in sub_lang if not _.isdigit()] + )) if sub_lang else '' else: - param_dict['lr'] = ('&lr=' + config.lang_search) if config.lang_search else '' + param_dict['lr'] = ( + '&lr=' + config.lang_search + ) if config.lang_search else '' - # Set autocorrected search ignore + # 'nfpr' defines the exclusion of results from an auto-corrected query if 'nfpr' in args: param_dict['nfpr'] = '&nfpr=' + args.get('nfpr') param_dict['cr'] = ('&cr=' + config.ctry) if config.ctry else '' - param_dict['hl'] = ('&hl=' + config.lang_interface.replace('lang_', '')) if config.lang_interface else '' + param_dict['hl'] = ( + '&hl=' + config.lang_interface.replace('lang_', '') + ) if config.lang_interface else '' param_dict['safe'] = '&safe=' + ('active' if config.safe else 'off') for val in param_dict.values(): @@ -126,6 +137,7 @@ class Request: root_path -- the root path of the whoogle instance config -- the user's current whoogle configuration """ + def __init__(self, normal_ua, root_path, config: Config): # Send heartbeat to Tor, used in determining if the user can or cannot # enable Tor for future requests @@ -143,9 +155,10 @@ class Request: ':' + os.environ.get('WHOOGLE_PROXY_PASS') self.proxies = { 'http': os.environ.get('WHOOGLE_PROXY_TYPE') + '://' + - auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'), + auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'), } - self.proxies['https'] = self.proxies['http'].replace('http', 'https') + self.proxies['https'] = self.proxies['http'].replace('http', + 'https') else: self.proxies = { 'http': 'socks5://127.0.0.1:9050', @@ -169,7 +182,8 @@ class Request: """ ac_query = dict(hl=self.language, q=query) - response = self.send(base_url=AUTOCOMPLETE_URL, query=urlparse.urlencode(ac_query)).text + response = self.send(base_url=AUTOCOMPLETE_URL, + query=urlparse.urlencode(ac_query)).text if response: dom = etree.fromstring(response) @@ -178,14 +192,14 @@ class Request: return [] def send(self, base_url=SEARCH_URL, query='', attempt=0) -> Response: - """Sends an outbound request to a URL. Optionally sends the request using Tor, if - enabled by the user. + """Sends an outbound request to a URL. Optionally sends the request + using Tor, if enabled by the user. Args: base_url: The URL to use in the request query: The optional query string for the request - attempt: The number of attempts made for the request (used for cycling - through Tor identities, if enabled) + attempt: The number of attempts made for the request + (used for cycling through Tor identities, if enabled) Returns: Response: The Response object returned by the requests call @@ -195,21 +209,30 @@ class Request: 'User-Agent': self.modified_user_agent } - # Validate Tor connection and request new identity if the last one failed - if self.tor and not send_tor_signal(Signal.NEWNYM if attempt > 0 else Signal.HEARTBEAT): - raise TorError("Tor was previously enabled, but the connection has been dropped. Please check your " + - "Tor configuration and try again.", disable=True) + # Validate Tor conn and request new identity if the last one failed + if self.tor and not send_tor_signal( + Signal.NEWNYM if attempt > 0 else Signal.HEARTBEAT): + raise TorError( + "Tor was previously enabled, but the connection has been " + "dropped. Please check your Tor configuration and try again.", + disable=True) # Make sure that the tor connection is valid, if enabled if self.tor: - tor_check = requests.get('https://check.torproject.org/', proxies=self.proxies, headers=headers) + tor_check = requests.get('https://check.torproject.org/', + proxies=self.proxies, headers=headers) self.tor_valid = 'Congratulations' in tor_check.text if not self.tor_valid: - raise TorError("Tor connection succeeded, but the connection could not be validated by torproject.org", - disable=True) + raise TorError( + "Tor connection succeeded, but the connection could not be " + "validated by torproject.org", + disable=True) - response = requests.get(base_url + query, proxies=self.proxies, headers=headers) + response = requests.get( + base_url + query, + proxies=self.proxies, + headers=headers) # Retry query with new identity if using Tor (max 10 attempts) if 'form id="captcha-form"' in response.text and self.tor: diff --git a/app/routes.py b/app/routes.py index dba4b54..ecf4910 100644 --- a/app/routes.py +++ b/app/routes.py @@ -9,7 +9,8 @@ import uuid from functools import wraps import waitress -from flask import jsonify, make_response, request, redirect, render_template, send_file, session, url_for +from flask import jsonify, make_response, request, redirect, render_template, \ + send_file, session, url_for from requests import exceptions from app import app @@ -30,11 +31,15 @@ def auth_required(f): # Skip if username/password not set whoogle_user = os.getenv('WHOOGLE_USER', '') whoogle_pass = os.getenv('WHOOGLE_PASS', '') - if (not whoogle_user or not whoogle_pass) or \ - (auth and whoogle_user == auth.username and whoogle_pass == auth.password): + if (not whoogle_user or not whoogle_pass) or ( + auth + and whoogle_user == auth.username + and whoogle_pass == auth.password): return f(*args, **kwargs) else: - return make_response('Not logged in', 401, {'WWW-Authenticate': 'Basic realm="Login Required"'}) + return make_response('Not logged in', 401, { + 'WWW-Authenticate': 'Basic realm="Login Required"'}) + return decorated @@ -46,7 +51,8 @@ def before_request_func(): # Generate session values for user if unavailable if not valid_user_session(session): session['config'] = json.load(open(app.config['DEFAULT_CONFIG'])) \ - if os.path.exists(app.config['DEFAULT_CONFIG']) else {'url': request.url_root} + if os.path.exists(app.config['DEFAULT_CONFIG']) else { + 'url': request.url_root} session['uuid'] = str(uuid.uuid4()) session['fernet_keys'] = generate_user_keys(True) @@ -68,7 +74,9 @@ def before_request_func(): g.user_config = Config(**session['config']) if not g.user_config.url: - g.user_config.url = request.url_root.replace('http://', 'https://') if https_only else request.url_root + g.user_config.url = request.url_root.replace( + 'http://', + 'https://') if https_only else request.url_root g.user_request = Request( request.headers.get('User-Agent'), @@ -82,13 +90,17 @@ def before_request_func(): def after_request_func(response): if app.user_elements[session['uuid']] <= 0 and '/element' in request.url: # Regenerate element key if all elements have been served to user - session['fernet_keys']['element_key'] = '' if not g.cookies_disabled else app.default_key_set['element_key'] + session['fernet_keys'][ + 'element_key'] = '' if not g.cookies_disabled else \ + app.default_key_set['element_key'] app.user_elements[session['uuid']] = 0 - # Check if address consistently has cookies blocked, in which case start removing session - # files after creation. - # Note: This is primarily done to prevent overpopulation of session directories, since browsers that - # block cookies will still trigger Flask's session creation routine with every request. + # Check if address consistently has cookies blocked, + # in which case start removing session files after creation. + # + # Note: This is primarily done to prevent overpopulation of session + # directories, since browsers that block cookies will still trigger + # Flask's session creation routine with every request. if g.cookies_disabled and request.remote_addr not in app.no_cookie_ips: app.no_cookie_ips.append(request.remote_addr) elif g.cookies_disabled and request.remote_addr in app.no_cookie_ips: @@ -101,6 +113,7 @@ def after_request_func(response): @app.errorhandler(404) def unknown_page(e): + app.logger.warn(e) return redirect(g.app_location) @@ -109,7 +122,8 @@ def unknown_page(e): def index(): # Reset keys session['fernet_keys'] = generate_user_keys(g.cookies_disabled) - error_message = session['error_message'] if 'error_message' in session else '' + error_message = session[ + 'error_message'] if 'error_message' in session else '' session['error_message'] = '' return render_template('index.html', @@ -128,7 +142,8 @@ def opensearch(): if opensearch_url.endswith('/'): opensearch_url = opensearch_url[:-1] - get_only = g.user_config.get_only or 'Chrome' in request.headers.get('User-Agent') + get_only = g.user_config.get_only or 'Chrome' in request.headers.get( + 'User-Agent') return render_template( 'opensearch.xml', @@ -147,16 +162,23 @@ def autocomplete(): # Search bangs if the query begins with "!", but not "! " (feeling lucky) if q.startswith('!') and len(q) > 1 and not q.startswith('! '): - return jsonify([q, [bang_json[_]['suggestion'] for _ in bang_json if _.startswith(q)]]) + return jsonify([q, [bang_json[_]['suggestion'] for _ in bang_json if + _.startswith(q)]]) if not q and not request.data: return jsonify({'?': []}) elif request.data: - q = urlparse.unquote_plus(request.data.decode('utf-8').replace('q=', '')) + q = urlparse.unquote_plus( + request.data.decode('utf-8').replace('q=', '')) # Return a list of suggestions for the query - # Note: If Tor is enabled, this returns nothing, as the request is almost always rejected - return jsonify([q, g.user_request.autocomplete(q) if not g.user_config.tor else []]) + # + # Note: If Tor is enabled, this returns nothing, as the request is + # almost always rejected + return jsonify([ + q, + g.user_request.autocomplete(q) if not g.user_config.tor else [] + ]) @app.route('/search', methods=['GET', 'POST']) @@ -168,7 +190,8 @@ def search(): # Update user config if specified in search args g.user_config = g.user_config.from_params(g.request_params) - search_util = RoutingUtils(request, g.user_config, session, cookies_disabled=g.cookies_disabled) + search_util = RoutingUtils(request, g.user_config, session, + cookies_disabled=g.cookies_disabled) query = search_util.new_search_query() resolved_bangs = search_util.bang_operator(bang_json) @@ -183,14 +206,17 @@ def search(): try: response, elements = search_util.generate_response() except TorError as e: - session['error_message'] = e.message + ("\\n\\nTor config is now disabled!" if e.disable else "") - session['config']['tor'] = False if e.disable else session['config']['tor'] + session['error_message'] = e.message + ( + "\\n\\nTor config is now disabled!" if e.disable else "") + session['config']['tor'] = False if e.disable else session['config'][ + 'tor'] return redirect(url_for('.index')) if search_util.feeling_lucky or elements < 0: return redirect(response, code=303) - # Keep count of external elements to fetch before element key can be regenerated + # Keep count of external elements to fetch before + # the element key can be regenerated app.user_elements[session['uuid']] = elements return render_template( @@ -200,12 +226,13 @@ def search(): dark_mode=g.user_config.dark, response=response, version_number=app.config['VERSION_NUMBER'], - search_header=render_template( + search_header=(render_template( 'header.html', dark_mode=g.user_config.dark, query=urlparse.unquote(query), search_type=search_util.search_type, - mobile=g.user_request.mobile) if 'isch' not in search_util.search_type else '') + mobile=g.user_request.mobile) + if 'isch' not in search_util.search_type else '')) @app.route('/config', methods=['GET', 'POST', 'PUT']) @@ -215,8 +242,12 @@ def config(): return json.dumps(g.user_config.__dict__) elif request.method == 'PUT': if 'name' in request.args: - config_pkl = os.path.join(app.config['CONFIG_PATH'], request.args.get('name')) - session['config'] = pickle.load(open(config_pkl, 'rb')) if os.path.exists(config_pkl) else session['config'] + config_pkl = os.path.join( + app.config['CONFIG_PATH'], + request.args.get('name')) + session['config'] = (pickle.load(open(config_pkl, 'rb')) + if os.path.exists(config_pkl) + else session['config']) return json.dumps(session['config']) else: return json.dumps({}) @@ -227,11 +258,16 @@ def config(): # Save config by name to allow a user to easily load later if 'name' in request.args: - pickle.dump(config_data, open(os.path.join(app.config['CONFIG_PATH'], request.args.get('name')), 'wb')) + pickle.dump( + config_data, + open(os.path.join( + app.config['CONFIG_PATH'], + request.args.get('name')), 'wb')) # Overwrite default config if user has cookies disabled if g.cookies_disabled: - open(app.config['DEFAULT_CONFIG'], 'w').write(json.dumps(config_data, indent=4)) + open(app.config['DEFAULT_CONFIG'], 'w').write( + json.dumps(config_data, indent=4)) session['config'] = config_data return redirect(config_data['url']) @@ -274,7 +310,8 @@ def element(): except exceptions.RequestException: pass - empty_gif = base64.b64decode('R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==') + empty_gif = base64.b64decode( + 'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==') return send_file(io.BytesIO(empty_gif), mimetype='image/gif') @@ -282,38 +319,62 @@ def element(): @auth_required def window(): get_body = g.user_request.send(base_url=request.args.get('location')).text - get_body = get_body.replace('src="/', 'src="' + request.args.get('location') + '"') - get_body = get_body.replace('href="/', 'href="' + request.args.get('location') + '"') + get_body = get_body.replace('src="/', + 'src="' + request.args.get('location') + '"') + get_body = get_body.replace('href="/', + 'href="' + request.args.get('location') + '"') results = BeautifulSoup(get_body, 'html.parser') - try: - for script in results('script'): - script.decompose() - except Exception: - pass + for script in results('script'): + script.decompose() return render_template('display.html', response=results) def run_app(): - parser = argparse.ArgumentParser(description='Whoogle Search console runner') - parser.add_argument('--port', default=5000, metavar='', - help='Specifies a port to run on (default 5000)') - parser.add_argument('--host', default='127.0.0.1', metavar='', - help='Specifies the host address to use (default 127.0.0.1)') - parser.add_argument('--debug', default=False, action='store_true', - help='Activates debug mode for the server (default False)') - parser.add_argument('--https-only', default=False, action='store_true', - help='Enforces HTTPS redirects for all requests') - parser.add_argument('--userpass', default='', metavar='', - help='Sets a username/password basic auth combo (default None)') - parser.add_argument('--proxyauth', default='', metavar='', - help='Sets a username/password for a HTTP/SOCKS proxy (default None)') - parser.add_argument('--proxytype', default='', metavar='', - help='Sets a proxy type for all connections (default None)') - parser.add_argument('--proxyloc', default='', metavar='', - help='Sets a proxy location for all connections (default None)') + parser = argparse.ArgumentParser( + description='Whoogle Search console runner') + parser.add_argument( + '--port', + default=5000, + metavar='', + help='Specifies a port to run on (default 5000)') + parser.add_argument( + '--host', + default='127.0.0.1', + metavar='', + help='Specifies the host address to use (default 127.0.0.1)') + parser.add_argument( + '--debug', + default=False, + action='store_true', + help='Activates debug mode for the server (default False)') + parser.add_argument( + '--https-only', + default=False, + action='store_true', + help='Enforces HTTPS redirects for all requests') + parser.add_argument( + '--userpass', + default='', + metavar='', + help='Sets a username/password basic auth combo (default None)') + parser.add_argument( + '--proxyauth', + default='', + metavar='', + help='Sets a username/password for a HTTP/SOCKS proxy (default None)') + parser.add_argument( + '--proxytype', + default='', + metavar='', + help='Sets a proxy type for all connections (default None)') + parser.add_argument( + '--proxyloc', + default='', + metavar='', + help='Sets a proxy location for all connections (default None)') args = parser.parse_args() if args.userpass: diff --git a/requirements.txt b/requirements.txt index 508cbcf..7dfa434 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,7 @@ lxml==4.5.1 MarkupSafe==1.1.1 more-itertools==8.3.0 packaging==20.4 +pep8==1.7.1 pluggy==0.13.1 py==1.8.1 pycparser==2.19