This introduces a new approach to handling user sessions, which should allow users to set more reliable config settings on public instances. Previously, when a user with cookies disabled updated their config, this would modify the app's default config file, which would in turn cause new users to inherit these settings when visiting the app for the first time, and cause existing users to inherit these settings when their current session cookie expired (which was after 30 days by default, I believe). There was also some half-baked logic for determining on the backend whether or not a user had cookies disabled, which led to some issues with out-of-control session file creation by Flask. Now, when a user visits the site, their initial request is forwarded to a session/<session id> endpoint, and during that subsequent request their current session id is matched against the one found in the URL. If the ids match, the user has cookies enabled. If not, their original request is modified with a 'cookies_disabled' query param that tells Flask not to bother trying to set up a new session for that user, and instead just use the app's fallback Fernet key for encryption and the default config. Since attempting to create a session for a user with cookies disabled creates a new session file, there is now also a clean-up routine included in the new session decorator, which will remove all sessions that don't include a valid key in the dict. NOTE!!! This means that current user sessions on public instances will be cleared once this update is merged in. In the long run that's a good thing though, since this will allow session management to be a lot more reliable overall for users regardless of their cookie preference. Individual user sessions still use a unique Fernet key for encrypting queries, but users with cookies disabled will use the default app key for encryption and decryption. Sessions are also now (semi)permanent and have a lifetime of 1 year.
173 lines
5.6 KiB
Python
173 lines
5.6 KiB
Python
import os
import re
from typing import Any, Optional
from urllib.parse import urlencode, urlsplit

from bs4 import BeautifulSoup as bsoup
from cryptography.fernet import Fernet, InvalidToken
from flask import g

from app.filter import Filter, get_first_link
from app.request import gen_query
|
|
|
|
# HTML banner that Search.generate_response inserts at the top of the results
# page when g.user_request.tor_valid is set.
TOR_BANNER = '<hr><h1 style="text-align: center">You are using Tor</h1><hr>'
|
|
# Marker substring that has_captcha looks for in the raw results html.
CAPTCHA = 'div class="g-recaptcha"'
|
|
|
|
|
|
def needs_https(url: str) -> bool:
    """Checks if the current instance needs to be upgraded to HTTPS

    Note that all Heroku instances are available by default over HTTPS, but
    do not automatically set up a redirect when visited over HTTP.

    The HTTPS_ONLY environment variable enables the upgrade for any host.
    An unset or empty value, as well as '0', 'false', 'no' and 'off'
    (case-insensitive), are treated as disabled; any other value enables it.

    Args:
        url: The instance url (may include a port, path and query string)

    Returns:
        bool: True/False representing the need to upgrade

    """
    # Only plain HTTP requests can ever need an upgrade
    if not url.startswith('http://'):
        return False

    # Environment variables are always strings, so bool('0') would be True.
    # Interpret the common "disabled" spellings explicitly instead.
    flag = os.getenv('HTTPS_ONLY', '').strip().lower()
    https_only = flag not in ('', '0', 'false', 'no', 'off')

    # Compare against the hostname only, so that a trailing slash, port,
    # path or query string neither hides nor fakes a Heroku host.
    try:
        hostname = urlsplit(url).hostname or ''
    except ValueError:
        # Malformed netloc (e.g. unbalanced IPv6 brackets): not Heroku
        hostname = ''
    is_heroku = hostname.endswith('.herokuapp.com')

    return is_heroku or https_only
|
|
|
|
|
|
def has_captcha(results: str) -> bool:
    """Reports whether the search results page is blocked by a captcha

    Args:
        results: The html of the search page, as a string

    Returns:
        bool: True if the captcha marker appears in the html, else False

    """
    captcha_found = CAPTCHA in results
    return captcha_found
|
|
|
|
|
|
class Search:
    """Search query preprocessor - used before submitting the query or
    redirecting to another site

    Instances also support dict-style access (``search['query']``), which
    maps directly onto attribute access.

    Attributes:
        request_params: the request's args (GET) or form data (otherwise)
        user_agent: the User-Agent header value, or None if it was not sent
        feeling_lucky: True if the last parsed query started with '! '
        config: the current user config settings
        session_key: the flask user fernet key
        query: the last parsed plaintext query ('' until one is parsed)
        cookies_disabled: flag passed in by the caller; stored as-is
        search_type: the 'tbm' request param, or '' if not present
    """

    def __init__(self, request, config, session_key, cookies_disabled=False):
        method = request.method
        self.request_params = request.args if method == 'GET' else request.form
        self.user_agent = request.headers.get('User-Agent')
        self.feeling_lucky = False
        self.config = config
        self.session_key = session_key
        self.query = ''
        self.cookies_disabled = cookies_disabled
        self.search_type = self.request_params.get('tbm', '')

    def __getitem__(self, name) -> Any:
        return getattr(self, name)

    def __setitem__(self, name, value) -> None:
        setattr(self, name, value)

    def __delitem__(self, name) -> None:
        delattr(self, name)

    def __contains__(self, name) -> bool:
        return hasattr(self, name)

    def _is_mobile(self) -> bool:
        """Checks the stored user agent for known mobile identifiers

        A missing User-Agent header (stored as None) counts as non-mobile.

        Returns:
            bool: True if the user agent mentions Android or iPhone

        """
        user_agent = self.user_agent or ''
        return 'Android' in user_agent or 'iPhone' in user_agent

    def new_search_query(self) -> str:
        """Parses a plaintext query into a valid string for submission

        Also decrypts the query string, if encrypted (in the case of
        paginated results). On a non-empty query this updates both
        self.query and self.feeling_lucky.

        Returns:
            str: A valid query string, or '' if no query was provided

        """
        q = self.request_params.get('q')
        if not q:
            return ''

        # Attempt to decrypt if this is an internal link. A plaintext query
        # is not a valid token, so a failure here is expected and ignored.
        try:
            q = Fernet(self.session_key).decrypt(q.encode()).decode()
        except InvalidToken:
            pass

        # Strip leading '! ' for "feeling lucky" queries
        self.feeling_lucky = q.startswith('! ')
        self.query = q[2:] if self.feeling_lucky else q
        return self.query

    def generate_response(self) -> str:
        """Generates a response for the user's query

        Returns:
            str: A string response to the search query, in the form of a URL
                or string representation of HTML content.

        """
        content_filter = Filter(self.session_key,
                                mobile=self._is_mobile(),
                                config=self.config)
        full_query = gen_query(self.query,
                               self.request_params,
                               self.config,
                               content_filter.near)

        # force mobile search when view image is true and
        # the request is not already made by a mobile
        view_image = ('tbm=isch' in full_query
                      and self.config.view_image
                      and not g.user_request.mobile)

        get_body = g.user_request.send(query=full_query,
                                       force_mobile=view_image)

        # Produce cleanable html soup from response
        html_soup = bsoup(content_filter.reskin(get_body.text), 'html.parser')

        # Replace current soup if view_image is active
        if view_image:
            html_soup = content_filter.view_image(html_soup)

        # Indicate whether or not a Tor connection is active
        if g.user_request.tor_valid:
            html_soup.insert(0, bsoup(TOR_BANNER, 'html.parser'))

        if self.feeling_lucky:
            return get_first_link(html_soup)

        formatted_results = content_filter.clean(html_soup)

        # Append user config to all search links, if available. The values
        # come straight from the request, so they are URL-encoded to keep
        # characters such as '&', '=' or spaces from corrupting the link.
        safe_params = [
            (key, value)
            for key, value in self.request_params.to_dict(flat=True).items()
            if self.config.is_safe_key(key)
        ]
        param_str = ('&' + urlencode(safe_params)) if safe_params else ''

        if param_str:
            for link in formatted_results.find_all('a', href=True):
                href = link['href']
                # Only touch links where 'search?' starts at index 0 or 1
                if 'search?' not in href or href.index('search?') > 1:
                    continue
                link['href'] += param_str

        return str(formatted_results)

    def check_kw_ip(self) -> Optional[re.Match]:
        """Checks for keywords related to 'my ip' in the query

        Returns:
            Optional[re.Match]: The match object if the lowercased query
                contains a 'my ip' style phrase, otherwise None. The result
                can be used directly in a boolean context.

        """
        return re.search("([^a-z0-9]|^)my *[^a-z0-9] *(ip|internet protocol)" +
                         "($|( *[^a-z0-9] *(((addres|address|adres|" +
                         "adress)|a)? *$)))", self.query.lower())
|