From b8031cd53fe19ac37f1962f5010b2669e45875d2 Mon Sep 17 00:00:00 2001 From: Ozzie Isaacs Date: Sat, 13 Jan 2024 10:53:46 +0100 Subject: [PATCH] Add possibility to replace kepub metadata on download --- cps/epub.py | 25 ++--- cps/epub_helper.py | 162 +++++++++++++++++++++++++++++++++ cps/helper.py | 38 ++++++-- cps/tasks/metadata_backup.py | 22 +---- cps/templates/config_edit.html | 2 +- 5 files changed, 206 insertions(+), 43 deletions(-) create mode 100644 cps/epub_helper.py diff --git a/cps/epub.py b/cps/epub.py index b45f3e51..11992637 100644 --- a/cps/epub.py +++ b/cps/epub.py @@ -23,10 +23,12 @@ from lxml import etree from . import isoLanguages, cover from . import config, logger from .helper import split_authors +from .epub_helper import get_content_opf, default_ns from .constants import BookMeta log = logger.create() + def _extract_cover(zip_file, cover_file, cover_path, tmp_file_name): if cover_file is None: return None @@ -44,24 +46,14 @@ def _extract_cover(zip_file, cover_file, cover_path, tmp_file_name): return cover.cover_processing(tmp_file_name, cf, extension) def get_epub_layout(book, book_data): - ns = { - 'n': 'urn:oasis:names:tc:opendocument:xmlns:container', - 'pkg': 'http://www.idpf.org/2007/opf', - } file_path = os.path.normpath(os.path.join(config.get_book_path(), book.path, book_data.name + "." 
+ book_data.format.lower())) try: - epubZip = zipfile.ZipFile(file_path) - txt = epubZip.read('META-INF/container.xml') - tree = etree.fromstring(txt) - cfname = tree.xpath('n:rootfiles/n:rootfile/@full-path', namespaces=ns)[0] - cf = epubZip.read(cfname) + tree, __ = get_content_opf(file_path, default_ns) + p = tree.xpath('/pkg:package/pkg:metadata', namespaces=default_ns)[0] - tree = etree.fromstring(cf) - p = tree.xpath('/pkg:package/pkg:metadata', namespaces=ns)[0] - - layout = p.xpath('pkg:meta[@property="rendition:layout"]/text()', namespaces=ns) + layout = p.xpath('pkg:meta[@property="rendition:layout"]/text()', namespaces=default_ns) except (etree.XMLSyntaxError, KeyError, IndexError) as e: log.error("Could not parse epub metadata of book {} during kobo sync: {}".format(book.id, e)) layout = [] @@ -80,12 +72,7 @@ def get_epub_info(tmp_file_path, original_file_name, original_file_extension): } epub_zip = zipfile.ZipFile(tmp_file_path) - - txt = epub_zip.read('META-INF/container.xml') - tree = etree.fromstring(txt) - cf_name = tree.xpath('n:rootfiles/n:rootfile/@full-path', namespaces=ns)[0] - cf = epub_zip.read(cf_name) - tree = etree.fromstring(cf) + tree, cf_name = get_content_opf(tmp_file_path, ns) cover_path = os.path.dirname(cf_name) diff --git a/cps/epub_helper.py b/cps/epub_helper.py new file mode 100644 index 00000000..603ccc3d --- /dev/null +++ b/cps/epub_helper.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- + +# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web) +# Copyright (C) 2018 lemmsh, Kennyl, Kyosfonica, matthazinski +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import zipfile +from lxml import etree + +from . import isoLanguages + +default_ns = { + 'n': 'urn:oasis:names:tc:opendocument:xmlns:container', + 'pkg': 'http://www.idpf.org/2007/opf', +} + +OPF_NAMESPACE = "http://www.idpf.org/2007/opf" +PURL_NAMESPACE = "http://purl.org/dc/elements/1.1/" + +OPF = "{%s}" % OPF_NAMESPACE +PURL = "{%s}" % PURL_NAMESPACE + +etree.register_namespace("opf", OPF_NAMESPACE) +etree.register_namespace("dc", PURL_NAMESPACE) + +OPF_NS = {None: OPF_NAMESPACE} # the default namespace (no prefix) +NSMAP = {'dc': PURL_NAMESPACE, 'opf': OPF_NAMESPACE} + + +def updateEpub(src, dest, filename, data, ): + # create a temp copy of the archive without filename + with zipfile.ZipFile(src, 'r') as zin: + with zipfile.ZipFile(dest, 'w') as zout: + zout.comment = zin.comment # preserve the comment + for item in zin.infolist(): + if item.filename != filename: + zout.writestr(item, zin.read(item.filename)) + + # now add filename with its new data + with zipfile.ZipFile(dest, mode='a', compression=zipfile.ZIP_DEFLATED) as zf: + zf.writestr(filename, data) + + +def get_content_opf(file_path, ns=default_ns): + with zipfile.ZipFile(file_path) as epubZip: + txt = epubZip.read('META-INF/container.xml') + tree = etree.fromstring(txt) + cf_name = tree.xpath('n:rootfiles/n:rootfile/@full-path', namespaces=ns)[0] + cf = epubZip.read(cf_name) + + return etree.fromstring(cf), cf_name + + +def create_new_metadata_backup(book, custom_columns, export_language, translated_cover_name, lang_type=3): + # generate root package element + package = etree.Element(OPF + "package", nsmap=OPF_NS) + package.set("unique-identifier", 
"uuid_id") + package.set("version", "2.0") + + # generate metadata element and all sub elements of it + metadata = etree.SubElement(package, "metadata", nsmap=NSMAP) + identifier = etree.SubElement(metadata, PURL + "identifier", id="calibre_id", nsmap=NSMAP) + identifier.set(OPF + "scheme", "calibre") + identifier.text = str(book.id) + identifier2 = etree.SubElement(metadata, PURL + "identifier", id="uuid_id", nsmap=NSMAP) + identifier2.set(OPF + "scheme", "uuid") + identifier2.text = book.uuid + title = etree.SubElement(metadata, PURL + "title", nsmap=NSMAP) + title.text = book.title + for author in book.authors: + creator = etree.SubElement(metadata, PURL + "creator", nsmap=NSMAP) + creator.text = str(author.name) + creator.set(OPF + "file-as", book.author_sort) # ToDo Check + creator.set(OPF + "role", "aut") + contributor = etree.SubElement(metadata, PURL + "contributor", nsmap=NSMAP) + contributor.text = "calibre (5.7.2) [https://calibre-ebook.com]" + contributor.set(OPF + "file-as", "calibre") # ToDo Check + contributor.set(OPF + "role", "bkp") + + date = etree.SubElement(metadata, PURL + "date", nsmap=NSMAP) + date.text = '{d.year:04}-{d.month:02}-{d.day:02}T{d.hour:02}:{d.minute:02}:{d.second:02}'.format(d=book.pubdate) + if book.comments and book.comments[0].text: + for b in book.comments: + description = etree.SubElement(metadata, PURL + "description", nsmap=NSMAP) + description.text = b.text + for b in book.publishers: + publisher = etree.SubElement(metadata, PURL + "publisher", nsmap=NSMAP) + publisher.text = str(b.name) + if not book.languages: + language = etree.SubElement(metadata, PURL + "language", nsmap=NSMAP) + language.text = export_language + else: + for b in book.languages: + language = etree.SubElement(metadata, PURL + "language", nsmap=NSMAP) + language.text = str(b.lang_code) if lang_type == 3 else isoLanguages.get(part3=b.lang_code).part1 + for b in book.tags: + subject = etree.SubElement(metadata, PURL + "subject", nsmap=NSMAP) + 
subject.text = str(b.name) + etree.SubElement(metadata, "meta", name="calibre:author_link_map", + content="{" + ", ".join(['"' + str(a.name) + '": ""' for a in book.authors]) + "}", + nsmap=NSMAP) + for b in book.series: + etree.SubElement(metadata, "meta", name="calibre:series", + content=str(str(b.name)), + nsmap=NSMAP) + if book.series: + etree.SubElement(metadata, "meta", name="calibre:series_index", + content=str(book.series_index), + nsmap=NSMAP) + if len(book.ratings) and book.ratings[0].rating > 0: + etree.SubElement(metadata, "meta", name="calibre:rating", + content=str(book.ratings[0].rating), + nsmap=NSMAP) + etree.SubElement(metadata, "meta", name="calibre:timestamp", + content='{d.year:04}-{d.month:02}-{d.day:02}T{d.hour:02}:{d.minute:02}:{d.second:02}'.format( + d=book.timestamp), + nsmap=NSMAP) + etree.SubElement(metadata, "meta", name="calibre:title_sort", + content=book.sort, + nsmap=NSMAP) + sequence = 0 + for cc in custom_columns: + value = None + extra = None + cc_entry = getattr(book, "custom_column_" + str(cc.id)) + if cc_entry.__len__(): + value = [c.value for c in cc_entry] if cc.is_multiple else cc_entry[0].value + extra = cc_entry[0].extra if hasattr(cc_entry[0], "extra") else None + etree.SubElement(metadata, "meta", name="calibre:user_metadata:#{}".format(cc.label), + content=cc.to_json(value, extra, sequence), + nsmap=NSMAP) + sequence += 1 + + # generate guide element and all sub elements of it + # Title is translated from default export language + guide = etree.SubElement(package, "guide") + etree.SubElement(guide, "reference", type="cover", title=translated_cover_name, href="cover.jpg") + + return package + +def replace_metadata(tree, package): + rep_element = tree.xpath('/pkg:package/pkg:metadata', namespaces=default_ns)[0] + new_element = package.xpath('//metadata', namespaces=default_ns)[0] + tree.replace(rep_element, new_element) + return etree.tostring(tree, + xml_declaration=True, + encoding='utf-8', + 
pretty_print=True).decode('utf-8') + + diff --git a/cps/helper.py b/cps/helper.py index 42580be7..ad89e2bf 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -28,6 +28,7 @@ from datetime import datetime, timedelta import requests import unidecode from uuid import uuid4 +from lxml import etree from flask import send_from_directory, make_response, redirect, abort, url_for from flask_babel import gettext as _ @@ -61,6 +62,7 @@ from .tasks.mail import TaskEmail from .tasks.thumbnail import TaskClearCoverThumbnailCache, TaskGenerateCoverThumbnails from .tasks.metadata_backup import TaskBackupMetadata from .file_helper import get_temp_dir +from .epub_helper import get_content_opf, create_new_metadata_backup, updateEpub, replace_metadata log = logger.create() @@ -942,13 +944,18 @@ def do_download_file(book, book_format, client, data, headers): df = gd.getFileFromEbooksFolder(book.path, book_name + "." + book_format) # log.debug('%s', time.time() - startTime) if df: - if config.config_binariesdir and config.config_embed_metadata: + if config.config_embed_metadata and ( + (book_format == "kepub" and config.config_kepubifypath ) or + (book_format != "kepub" and config.config_binariesdir)): output_path = os.path.join(config.config_calibre_dir, book.path) if not os.path.exists(output_path): os.makedirs(output_path) output = os.path.join(config.config_calibre_dir, book.path, book_name + "." + book_format) gd.downloadFile(book.path, book_name + "." 
+ book_format, output) - filename, download_name = do_calibre_export(book.id, book_format) + if book_format == "kepub" and config.config_kepubifypath: + filename, download_name = do_kepubify_metadata_replace(book, output) + elif book_format != "kepub" and config.config_binariesdir: + filename, download_name = do_calibre_export(book.id, book_format) else: return gd.do_gdrive_download(df, headers) else: @@ -962,8 +969,11 @@ def do_download_file(book, book_format, client, data, headers): if client == "kobo" and book_format == "kepub": headers["Content-Disposition"] = headers["Content-Disposition"].replace(".kepub", ".kepub.epub") - if config.config_binariesdir and config.config_embed_metadata: - filename, download_name = do_calibre_export(book.id, book_format) + if book_format == "kepub" and config.config_kepubifypath and config.config_embed_metadata: + filename, download_name = do_kepubify_metadata_replace(book, os.path.join(filename, + book_name + "." + book_format)) + elif book_format != "kepub" and config.config_binariesdir and config.config_embed_metadata: + filename, download_name = do_calibre_export(book.id, book_format) else: download_name = book_name @@ -975,7 +985,23 @@ def do_download_file(book, book_format, client, data, headers): return response -def do_calibre_export(book_id, book_format): +def do_kepubify_metadata_replace(book, file_path): + custom_columns = (calibre_db.session.query(db.CustomColumns) + .filter(db.CustomColumns.mark_for_delete == 0) + .filter(db.CustomColumns.datatype.notin_(db.cc_exceptions)) + .order_by(db.CustomColumns.label).all()) + + tree, cf_name = get_content_opf(file_path) + package = create_new_metadata_backup(book, custom_columns, current_user.locale, _("Cover"), lang_type=2) + content = replace_metadata(tree, package) + tmp_dir = get_temp_dir() + temp_file_name = str(uuid4()) + # open zipfile and replace metadata block in content.opf + updateEpub(file_path, os.path.join(tmp_dir, temp_file_name + ".kepub"), cf_name, content) 
+ return tmp_dir, temp_file_name + + +def do_calibre_export(book_id, book_format, ): try: quotes = [3, 5, 7, 9] tmp_dir = get_temp_dir() @@ -1081,7 +1107,7 @@ def tags_filters(): # checks if domain is in database (including wildcards) -# example SELECT * FROM @TABLE WHERE 'abcdefg' LIKE Name; +# example SELECT * FROM @TABLE WHERE 'abcdefg' LIKE Name; # from https://code.luasoftware.com/tutorials/flask/execute-raw-sql-in-flask-sqlalchemy/ # in all calls the email address is checked for validity def check_valid_domain(domain_text): diff --git a/cps/tasks/metadata_backup.py b/cps/tasks/metadata_backup.py index 45015ccf..9ca6b830 100644 --- a/cps/tasks/metadata_backup.py +++ b/cps/tasks/metadata_backup.py @@ -17,26 +17,13 @@ # along with this program. If not, see . import os -from urllib.request import urlopen from lxml import etree - from cps import config, db, gdriveutils, logger from cps.services.worker import CalibreTask from flask_babel import lazy_gettext as N_ -OPF_NAMESPACE = "http://www.idpf.org/2007/opf" -PURL_NAMESPACE = "http://purl.org/dc/elements/1.1/" - -OPF = "{%s}" % OPF_NAMESPACE -PURL = "{%s}" % PURL_NAMESPACE - -etree.register_namespace("opf", OPF_NAMESPACE) -etree.register_namespace("dc", PURL_NAMESPACE) - -OPF_NS = {None: OPF_NAMESPACE} # the default namespace (no prefix) -NSMAP = {'dc': PURL_NAMESPACE, 'opf': OPF_NAMESPACE} - +from ..epub_helper import create_new_metadata_backup class TaskBackupMetadata(CalibreTask): @@ -101,7 +88,8 @@ class TaskBackupMetadata(CalibreTask): self.calibre_db.session.close() def open_metadata(self, book, custom_columns): - package = self.create_new_metadata_backup(book, custom_columns) + # build the OPF package via the shared epub_helper implementation; the cover title is a required argument + package = create_new_metadata_backup(book, custom_columns, self.export_language, self.translated_title) if config.config_use_google_drive: if not gdriveutils.is_gdrive_ready(): raise Exception('Google Drive is configured but not ready') @@ -123,7 +111,7 @@ class
TaskBackupMetadata(CalibreTask): except Exception as ex: raise Exception('Writing Metadata failed with error: {} '.format(ex)) - def create_new_metadata_backup(self, book, custom_columns): + '''def create_new_metadata_backup(self, book, custom_columns): # generate root package element package = etree.Element(OPF + "package", nsmap=OPF_NS) package.set("unique-identifier", "uuid_id") @@ -208,7 +196,7 @@ class TaskBackupMetadata(CalibreTask): guide = etree.SubElement(package, "guide") etree.SubElement(guide, "reference", type="cover", title=self.translated_title, href="cover.jpg") - return package + return package''' @property def name(self): diff --git a/cps/templates/config_edit.html b/cps/templates/config_edit.html index c74bf6c6..d83831db 100644 --- a/cps/templates/config_edit.html +++ b/cps/templates/config_edit.html @@ -105,7 +105,7 @@
- +