* Add a UserKeyToken to the User table for Kobo authorization.

* Add proper authorization checks on the new Kobo endpoints.

Important Note: As a side-effect, all CalibreWeb API calls can be
authorized using this token (i.e without a username&password).
This commit is contained in:
Michael Shavit 2019-12-07 19:21:08 -05:00
parent 55b54de6a0
commit 9ede01f130
6 changed files with 105 additions and 9 deletions

View File

@ -565,7 +565,6 @@ def edit_user(user_id):
else: else:
if "password" in to_save and to_save["password"]: if "password" in to_save and to_save["password"]:
content.password = generate_password_hash(to_save["password"]) content.password = generate_password_hash(to_save["password"])
anonymous = content.is_anonymous anonymous = content.is_anonymous
content.role = constants.selected_roles(to_save) content.role = constants.selected_roles(to_save)
if anonymous: if anonymous:
@ -593,6 +592,18 @@ def edit_user(user_id):
content.default_language = to_save["default_language"] content.default_language = to_save["default_language"]
if "locale" in to_save and to_save["locale"]: if "locale" in to_save and to_save["locale"]:
content.locale = to_save["locale"] content.locale = to_save["locale"]
if "kobo_user_key" in to_save and to_save["kobo_user_key"]:
kobo_user_key_hash = generate_password_hash(to_save["kobo_user_key"])
if kobo_user_key_hash != content.kobo_user_key_hash:
existing_kobo_user_key = ub.session.query(ub.User).filter(ub.User.kobo_user_key_hash == kobo_user_key_hash).first()
if not existing_kobo_user_key:
content.kobo_user_key_hash = kobo_user_key_hash
else:
flash(_(u"Found an existing account for this Kobo UserKey."), category="error")
return render_title_template("user_edit.html", translations=translations, languages=languages,
new_user=0, content=content, downloads=downloads, registered_oauth=oauth_check,
title=_(u"Edit User %(nick)s", nick=content.nickname), page="edituser")
if to_save["email"] and to_save["email"] != content.email: if to_save["email"] and to_save["email"] != content.email:
existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \ existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \
.first() .first()

View File

@ -2,9 +2,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from flask import Blueprint, request, flash, redirect, url_for from flask import Blueprint, request, flash, redirect, url_for
from . import logger, ub, searched_ids, db, helper from . import config, logger, kobo_auth, ub, db, helper
from . import config from .web import download_required
from flask import make_response from flask import make_response
from flask import jsonify from flask import jsonify
from flask import json from flask import json
@ -22,15 +21,17 @@ from .constants import CONFIG_DIR as _CONFIG_DIR
import copy import copy
import jsonschema import jsonschema
from sqlalchemy import func from sqlalchemy import func
from flask_login import login_required
B2_SECRETS = os.path.join(_CONFIG_DIR, "b2_secrets.json") B2_SECRETS = os.path.join(_CONFIG_DIR, "b2_secrets.json")
kobo = Blueprint("kobo", __name__) kobo = Blueprint("kobo", __name__)
kobo_auth.disable_failed_auth_redirect_for_blueprint(kobo)
log = logger.create() log = logger.create()
import base64 import base64
def b64encode(data): def b64encode(data):
return base64.b64encode(data) return base64.b64encode(data)
@ -143,6 +144,8 @@ class SyncToken:
@kobo.route("/v1/library/sync") @kobo.route("/v1/library/sync")
@login_required
@download_required
def HandleSyncRequest(): def HandleSyncRequest():
sync_token = SyncToken.from_headers(request.headers) sync_token = SyncToken.from_headers(request.headers)
log.info("Kobo library sync request received.") log.info("Kobo library sync request received.")
@ -190,6 +193,8 @@ def HandleSyncRequest():
@kobo.route("/v1/library/<book_uuid>/metadata") @kobo.route("/v1/library/<book_uuid>/metadata")
@login_required
@download_required
def get_metadata__v1(book_uuid): def get_metadata__v1(book_uuid):
log.info("Kobo library metadata request received for book %s" % book_uuid) log.info("Kobo library metadata request received for book %s" % book_uuid)
book = db.session.query(db.Books).filter(db.Books.uuid == book_uuid).first() book = db.session.query(db.Books).filter(db.Books.uuid == book_uuid).first()

67
cps/kobo_auth.py Normal file
View File

@ -0,0 +1,67 @@
"""This module is used to control authentication/authorization of Kobo sync requests.
This module also includes research notes into the auth protocol used by Kobo devices.
Log-in:
When first booting a Kobo device the user must sign into a Kobo (or affiliate) account.
Upon successful sign-in, the user is redirected to
https://auth.kobobooks.com/CrossDomainSignIn?id=<some id>
which serves the following response:
<script type='text/javascript'>location.href='kobo://UserAuthenticated?userId=<redacted>&userKey<redacted>&email=<redacted>&returnUrl=https%3a%2f%2fwww.kobo.com';</script>.
And triggers the insertion of a userKey into the device's User table.
IMPORTANT SECURITY CAUTION:
Together, the device's DeviceId and UserKey act as an *irrevocable* authentication
token to most (if not all) Kobo APIs. In fact, in most cases only the UserKey is
required to authorize the API call.
Changing Kobo password *does not* invalidate user keys! This is apparently a known
issue for a few years now https://www.mobileread.com/forums/showpost.php?p=3476851&postcount=13
(although this poster hypothesised that Kobo could blacklist a DeviceId, many endpoints
will still grant access given the userkey.)
Api authorization:
* For most of the endpoints we care about (sync, metadata, tags, etc), the userKey is
passed in the x-kobo-userkey header, and is sufficient to authorize the API call.
* Some endpoints (e.g: AnnotationService) instead make use of Bearer tokens. To get a
BearerToken, the device makes a POST request to the v1/auth/device endpoint with the
secret UserKey and the device's DeviceId.
Our implementation:
For now, we rely on the official Kobo store's UserKey for authentication. Because of the
irrevocable power granted by the key, we only ever store and compare a hash of the key.
To obtain their UserKey, a user can query the user table from the
.kobo/KoboReader.sqlite database found on their device.
This isn't exactly user friendly however.
Some possible alternatives that require more research:
* Instead of having users query the device database to find out their UserKey, we could
provide a list of recent Kobo sync attempts in the calibre-web UI for users to
authenticate sync attempts (e.g: 'this was me' button).
* We may be able to craft a sign-in flow with a redirect back to the CalibreWeb
server containing the KoboStore's UserKey.
* Can we create our own UserKey instead of relying on the real store's userkey?
(Maybe using something like location.href=kobo://UserAuthenticated?userId=...?)
"""
from functools import wraps
from flask import request, make_response
from werkzeug.security import check_password_hash
from . import logger, ub, lm
USER_KEY_HEADER = "x-kobo-userkey"
log = logger.create()
def disable_failed_auth_redirect_for_blueprint(bp):
lm.blueprint_login_views[bp.name] = None
@lm.request_loader
def load_user_from_kobo_request(request):
user_key = request.headers.get(USER_KEY_HEADER)
if user_key:
for user in (
ub.session.query(ub.User).filter(ub.User.kobo_user_key_hash != "").all()
):
if check_password_hash(str(user.kobo_user_key_hash), user_key):
return user
log.info("Received Kobo request without a recognizable UserKey.")
return None

View File

@ -28,6 +28,10 @@
<input type="email" class="form-control" name="kindle_mail" id="kindle_mail" value="{{ content.kindle_mail if content.kindle_mail != None }}"> <input type="email" class="form-control" name="kindle_mail" id="kindle_mail" value="{{ content.kindle_mail if content.kindle_mail != None }}">
</div> </div>
<div class="form-group"> <div class="form-group">
<div class="form-group">
<label for="kobo_user_key">{{_('KoboStore UserKey')}}</label>
<input type="password" class="form-control" name="kobo_user_key" id="kobo_user_key" value="" autocomplete="off">
</div>
<label for="locale">{{_('Language')}}</label> <label for="locale">{{_('Language')}}</label>
<select name="locale" id="locale" class="form-control"> <select name="locale" id="locale" class="form-control">
{% for translation in translations %} {% for translation in translations %}

View File

@ -173,6 +173,7 @@ class User(UserBase, Base):
role = Column(SmallInteger, default=constants.ROLE_USER) role = Column(SmallInteger, default=constants.ROLE_USER)
password = Column(String) password = Column(String)
kindle_mail = Column(String(120), default="") kindle_mail = Column(String(120), default="")
kobo_user_key_hash = Column(String, unique=True, default="")
shelf = relationship('Shelf', backref='user', lazy='dynamic', order_by='Shelf.name') shelf = relationship('Shelf', backref='user', lazy='dynamic', order_by='Shelf.name')
downloads = relationship('Downloads', backref='user', lazy='dynamic') downloads = relationship('Downloads', backref='user', lazy='dynamic')
locale = Column(String(2), default="en") locale = Column(String(2), default="en")
@ -375,7 +376,12 @@ def migrate_Database(session):
except exc.OperationalError: except exc.OperationalError:
conn = engine.connect() conn = engine.connect()
conn.execute("ALTER TABLE user ADD column `mature_content` INTEGER DEFAULT 1") conn.execute("ALTER TABLE user ADD column `mature_content` INTEGER DEFAULT 1")
try:
session.query(exists().where(User.kobo_user_key_hash)).scalar()
except exc.OperationalError: # Database is not compatible, some rows are missing
conn = engine.connect()
conn.execute("ALTER TABLE user ADD column `kobo_user_key_hash` VARCHAR")
session.commit()
if session.query(User).filter(User.role.op('&')(constants.ROLE_ANONYMOUS) == constants.ROLE_ANONYMOUS).first() is None: if session.query(User).filter(User.role.op('&')(constants.ROLE_ANONYMOUS) == constants.ROLE_ANONYMOUS).first() is None:
create_anonymous_user(session) create_anonymous_user(session)
try: try:
@ -391,6 +397,7 @@ def migrate_Database(session):
"role SMALLINT," "role SMALLINT,"
"password VARCHAR," "password VARCHAR,"
"kindle_mail VARCHAR(120)," "kindle_mail VARCHAR(120),"
"kobo_user_key_hash VARCHAR,"
"locale VARCHAR(2)," "locale VARCHAR(2),"
"sidebar_view INTEGER," "sidebar_view INTEGER,"
"default_language VARCHAR(3)," "default_language VARCHAR(3),"
@ -398,9 +405,9 @@ def migrate_Database(session):
"UNIQUE (nickname)," "UNIQUE (nickname),"
"UNIQUE (email)," "UNIQUE (email),"
"CHECK (mature_content IN (0, 1)))") "CHECK (mature_content IN (0, 1)))")
conn.execute("INSERT INTO user_id(id, nickname, email, role, password, kindle_mail,locale," conn.execute("INSERT INTO user_id(id, nickname, email, role, password, kindle_mail,kobo_user_key_hash, locale,"
"sidebar_view, default_language, mature_content) " "sidebar_view, default_language, mature_content) "
"SELECT id, nickname, email, role, password, kindle_mail, locale," "SELECT id, nickname, email, role, password, kindle_mail, kobo_user_key_hash, locale,"
"sidebar_view, default_language, mature_content FROM user") "sidebar_view, default_language, mature_content FROM user")
# delete old user table and rename new user_id table to user: # delete old user table and rename new user_id table to user:
conn.execute("DROP TABLE user") conn.execute("DROP TABLE user")

View File

@ -1246,6 +1246,8 @@ def profile():
current_user.password = generate_password_hash(to_save["password"]) current_user.password = generate_password_hash(to_save["password"])
if "kindle_mail" in to_save and to_save["kindle_mail"] != current_user.kindle_mail: if "kindle_mail" in to_save and to_save["kindle_mail"] != current_user.kindle_mail:
current_user.kindle_mail = to_save["kindle_mail"] current_user.kindle_mail = to_save["kindle_mail"]
if "kobo_user_key" in to_save and to_save["kobo_user_key"]:
current_user.kobo_user_key_hash = generate_password_hash(to_save["kobo_user_key"])
if to_save["email"] and to_save["email"] != current_user.email: if to_save["email"] and to_save["email"] != current_user.email:
if config.config_public_reg and not check_valid_domain(to_save["email"]): if config.config_public_reg and not check_valid_domain(to_save["email"]):
flash(_(u"E-mail is not from valid domain"), category="error") flash(_(u"E-mail is not from valid domain"), category="error")
@ -1274,7 +1276,7 @@ def profile():
ub.session.commit() ub.session.commit()
except IntegrityError: except IntegrityError:
ub.session.rollback() ub.session.rollback()
flash(_(u"Found an existing account for this e-mail address."), category="error") flash(_(u"Found an existing account for this e-mail address or Kobo UserKey."), category="error")
return render_title_template("user_edit.html", content=current_user, downloads=downloads, return render_title_template("user_edit.html", content=current_user, downloads=downloads,
translations=translations, translations=translations,
title=_(u"%(name)s's profile", name=current_user.nickname), page="me", title=_(u"%(name)s's profile", name=current_user.nickname), page="me",