From 41223c1237675df5da39634be7335704955ea2c0 Mon Sep 17 00:00:00 2001 From: nathom Date: Mon, 22 Mar 2021 13:03:08 -0700 Subject: [PATCH] Cleanup --- {qobuz_dl_rewrite => music_dl}/__init__.py | 0 {qobuz_dl_rewrite => music_dl}/cli.py | 0 {qobuz_dl_rewrite => music_dl}/clients.py | 0 {qobuz_dl_rewrite => music_dl}/config.py | 0 {qobuz_dl_rewrite => music_dl}/constants.py | 0 {qobuz_dl_rewrite => music_dl}/converter.py | 0 {qobuz_dl_rewrite => music_dl}/core.py | 0 {qobuz_dl_rewrite => music_dl}/db.py | 0 {qobuz_dl_rewrite => music_dl}/downloader.py | 0 {qobuz_dl_rewrite => music_dl}/exceptions.py | 0 {qobuz_dl_rewrite => music_dl}/metadata.py | 0 {qobuz_dl_rewrite => music_dl}/spoofbuz.py | 0 {qobuz_dl_rewrite => music_dl}/utils.py | 0 qobuz_dl/__init__.py | 2 - qobuz_dl/cli.py | 187 ------ qobuz_dl/color.py | 14 - qobuz_dl/commands.py | 167 ------ qobuz_dl/core.py | 575 ------------------- qobuz_dl/db.py | 39 -- qobuz_dl/downloader.py | 400 ------------- qobuz_dl/exceptions.py | 22 - qobuz_dl/metadata.py | 224 -------- qobuz_dl/qopy.py | 204 ------- qobuz_dl/spoofbuz.py | 51 -- setup.py | 25 +- 25 files changed, 15 insertions(+), 1895 deletions(-) rename {qobuz_dl_rewrite => music_dl}/__init__.py (100%) rename {qobuz_dl_rewrite => music_dl}/cli.py (100%) rename {qobuz_dl_rewrite => music_dl}/clients.py (100%) rename {qobuz_dl_rewrite => music_dl}/config.py (100%) rename {qobuz_dl_rewrite => music_dl}/constants.py (100%) rename {qobuz_dl_rewrite => music_dl}/converter.py (100%) rename {qobuz_dl_rewrite => music_dl}/core.py (100%) rename {qobuz_dl_rewrite => music_dl}/db.py (100%) rename {qobuz_dl_rewrite => music_dl}/downloader.py (100%) rename {qobuz_dl_rewrite => music_dl}/exceptions.py (100%) rename {qobuz_dl_rewrite => music_dl}/metadata.py (100%) rename {qobuz_dl_rewrite => music_dl}/spoofbuz.py (100%) rename {qobuz_dl_rewrite => music_dl}/utils.py (100%) delete mode 100644 qobuz_dl/__init__.py delete mode 100644 qobuz_dl/cli.py delete mode 100644 qobuz_dl/color.py delete mode 100644 qobuz_dl/commands.py delete mode 100644 qobuz_dl/core.py delete mode 100644 qobuz_dl/db.py delete mode 100644 qobuz_dl/downloader.py delete mode 100644 qobuz_dl/exceptions.py delete mode 100644 qobuz_dl/metadata.py delete mode 100644 qobuz_dl/qopy.py delete mode 100644 qobuz_dl/spoofbuz.py diff --git a/qobuz_dl_rewrite/__init__.py b/music_dl/__init__.py similarity index 100% rename from qobuz_dl_rewrite/__init__.py rename to music_dl/__init__.py diff --git a/qobuz_dl_rewrite/cli.py b/music_dl/cli.py similarity index 100% rename from qobuz_dl_rewrite/cli.py rename to music_dl/cli.py diff --git a/qobuz_dl_rewrite/clients.py b/music_dl/clients.py similarity index 100% rename from qobuz_dl_rewrite/clients.py rename to music_dl/clients.py diff --git a/qobuz_dl_rewrite/config.py b/music_dl/config.py similarity index 100% rename from qobuz_dl_rewrite/config.py rename to music_dl/config.py diff --git a/qobuz_dl_rewrite/constants.py b/music_dl/constants.py similarity index 100% rename from qobuz_dl_rewrite/constants.py rename to music_dl/constants.py diff --git a/qobuz_dl_rewrite/converter.py b/music_dl/converter.py similarity index 100% rename from qobuz_dl_rewrite/converter.py rename to music_dl/converter.py diff --git a/qobuz_dl_rewrite/core.py b/music_dl/core.py similarity index 100% rename from qobuz_dl_rewrite/core.py rename to music_dl/core.py diff --git a/qobuz_dl_rewrite/db.py b/music_dl/db.py similarity index 100% rename from qobuz_dl_rewrite/db.py rename to music_dl/db.py diff --git a/qobuz_dl_rewrite/downloader.py b/music_dl/downloader.py similarity index 100% rename from qobuz_dl_rewrite/downloader.py rename to music_dl/downloader.py diff --git a/qobuz_dl_rewrite/exceptions.py b/music_dl/exceptions.py similarity index 100% rename from qobuz_dl_rewrite/exceptions.py rename to music_dl/exceptions.py diff --git a/qobuz_dl_rewrite/metadata.py b/music_dl/metadata.py similarity index 100% rename from qobuz_dl_rewrite/metadata.py rename to music_dl/metadata.py diff --git a/qobuz_dl_rewrite/spoofbuz.py b/music_dl/spoofbuz.py similarity index 100% rename from qobuz_dl_rewrite/spoofbuz.py rename to music_dl/spoofbuz.py diff --git a/qobuz_dl_rewrite/utils.py b/music_dl/utils.py similarity index 100% rename from qobuz_dl_rewrite/utils.py rename to music_dl/utils.py diff --git a/qobuz_dl/__init__.py b/qobuz_dl/__init__.py deleted file mode 100644 index c88afc3..0000000 --- a/qobuz_dl/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .cli import main -from .qopy import Client diff --git a/qobuz_dl/cli.py b/qobuz_dl/cli.py deleted file mode 100644 index fdc3592..0000000 --- a/qobuz_dl/cli.py +++ /dev/null @@ -1,187 +0,0 @@ -import configparser -import glob -import hashlib -import logging -import os -import sys - -import qobuz_dl.spoofbuz as spoofbuz -from qobuz_dl.color import GREEN, RED, YELLOW -from qobuz_dl.commands import qobuz_dl_args -from qobuz_dl.core import QobuzDL - -logging.basicConfig( - level=logging.INFO, - format="%(message)s", -) - -if os.name == "nt": - OS_CONFIG = os.environ.get("APPDATA") -else: - OS_CONFIG = os.path.join(os.environ["HOME"], ".config") - -CONFIG_PATH = os.path.join(OS_CONFIG, "qobuz-dl") -CONFIG_FILE = os.path.join(CONFIG_PATH, "config.ini") -QOBUZ_DB = os.path.join(CONFIG_PATH, "qobuz_dl.db") - - -def reset_config(config_file): - logging.info(f"{YELLOW}Creating config file: {config_file}") - config = configparser.ConfigParser() - config["DEFAULT"]["email"] = input("Enter your email:\n- ") - password = input("Enter your password\n- ") - config["DEFAULT"]["password"] = hashlib.md5(password.encode("utf-8")).hexdigest() - config["DEFAULT"]["default_folder"] = ( - input("Folder for downloads (leave empty for default 'Qobuz Downloads')\n- ") - or "Qobuz Downloads" - ) - config["DEFAULT"]["default_quality"] = ( - input( - "Download quality (5, 6, 7, 27) " - "[320, LOSSLESS, 24B <96KHZ, 24B >96KHZ]" - "\n(leave empty for default '6')\n- " - ) - or "6" - ) - config["DEFAULT"]["default_limit"] = "20" - config["DEFAULT"]["no_m3u"] = "false" - config["DEFAULT"]["albums_only"] = "false" - config["DEFAULT"]["no_fallback"] = "false" - config["DEFAULT"]["og_cover"] = "false" - config["DEFAULT"]["embed_art"] = "false" - config["DEFAULT"]["no_cover"] = "false" - config["DEFAULT"]["no_database"] = "false" - logging.info(f"{YELLOW}Getting tokens. Please wait...") - spoofer = spoofbuz.Spoofer() - config["DEFAULT"]["app_id"] = str(spoofer.getAppId()) - config["DEFAULT"]["secrets"] = ",".join(spoofer.getSecrets().values()) - config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " - "[{bit_depth}B-{sampling_rate}kHz]" - config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" - config["DEFAULT"]["smart_discography"] = "false" - with open(config_file, "w") as configfile: - config.write(configfile) - logging.info( - f"{GREEN}Config file updated. Edit more options in {config_file}" - "\nso you don't have to call custom flags every time you run " - "a qobuz-dl command." - ) - - -def remove_leftovers(directory): - directory = os.path.join(directory, "**", ".*.tmp") - for i in glob.glob(directory, recursive=True): - try: - os.remove(i) - except: # noqa - pass - - -def main(): - if not os.path.isdir(CONFIG_PATH) or not os.path.isfile(CONFIG_FILE): - os.makedirs(CONFIG_PATH, exist_ok=True) - reset_config(CONFIG_FILE) - - if len(sys.argv) < 2: - sys.exit(qobuz_dl_args().print_help()) - - config = configparser.ConfigParser() - config.read(CONFIG_FILE) - - try: - email = config["DEFAULT"]["email"] - password = config["DEFAULT"]["password"] - default_folder = config["DEFAULT"]["default_folder"] - default_limit = config["DEFAULT"]["default_limit"] - default_quality = config["DEFAULT"]["default_quality"] - no_m3u = config.getboolean("DEFAULT", "no_m3u") - albums_only = config.getboolean("DEFAULT", "albums_only") - no_fallback = config.getboolean("DEFAULT", "no_fallback") - og_cover = config.getboolean("DEFAULT", "og_cover") - embed_art = config.getboolean("DEFAULT", "embed_art") - no_cover = config.getboolean("DEFAULT", "no_cover") - no_database = config.getboolean("DEFAULT", "no_database") - app_id = config["DEFAULT"]["app_id"] - - if ( - "folder_format" not in config["DEFAULT"] - or "track_format" not in config["DEFAULT"] - or "smart_discography" not in config["DEFAULT"] - ): - logging.info( - f"{YELLOW}Config file does not include some settings, updating..." - ) - config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " - "[{bit_depth}B-{sampling_rate}kHz]" - config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" - config["DEFAULT"]["smart_discography"] = "false" - with open(CONFIG_FILE, "w") as cf: - config.write(cf) - - smart_discography = config.getboolean("DEFAULT", "smart_discography") - folder_format = config["DEFAULT"]["folder_format"] - track_format = config["DEFAULT"]["track_format"] - - secrets = [ - secret for secret in config["DEFAULT"]["secrets"].split(",") if secret - ] - arguments = qobuz_dl_args( - default_quality, default_limit, default_folder - ).parse_args() - except (KeyError, UnicodeDecodeError, configparser.Error): - arguments = qobuz_dl_args().parse_args() - if not arguments.reset: - sys.exit( - f"{RED}Your config file is corrupted! Run 'qobuz-dl -r' to fix this." - ) - - if arguments.reset: - sys.exit(reset_config(CONFIG_FILE)) - - if arguments.purge: - try: - os.remove(QOBUZ_DB) - except FileNotFoundError: - pass - sys.exit(f"{GREEN}The database was deleted.") - - qobuz = QobuzDL( - arguments.directory, - arguments.quality, - arguments.embed_art or embed_art, - ignore_singles_eps=arguments.albums_only or albums_only, - no_m3u_for_playlists=arguments.no_m3u or no_m3u, - quality_fallback=not arguments.no_fallback or not no_fallback, - cover_og_quality=arguments.og_cover or og_cover, - no_cover=arguments.no_cover or no_cover, - downloads_db=None if no_database or arguments.no_db else QOBUZ_DB, - folder_format=arguments.folder_format or folder_format, - track_format=arguments.track_format or track_format, - smart_discography=arguments.smart_discography or smart_discography, - ) - qobuz.initialize_client(email, password, app_id, secrets) - - try: - if arguments.command == "dl": - qobuz.download_list_of_urls(arguments.SOURCE) - elif arguments.command == "lucky": - query = " ".join(arguments.QUERY) - qobuz.lucky_type = arguments.type - qobuz.lucky_limit = arguments.number - qobuz.lucky_mode(query) - else: - qobuz.interactive_limit = arguments.limit - qobuz.interactive() - - except KeyboardInterrupt: - logging.info( - f"{RED}Interrupted by user\n{YELLOW}Already downloaded items will " - "be skipped if you try to download the same releases again." - ) - - finally: - remove_leftovers(qobuz.directory) - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/qobuz_dl/color.py b/qobuz_dl/color.py deleted file mode 100644 index e170c2d..0000000 --- a/qobuz_dl/color.py +++ /dev/null @@ -1,14 +0,0 @@ -from colorama import Fore, Style, init - -init(autoreset=True) - -DF = Style.NORMAL -BG = Style.BRIGHT -RESET = Style.RESET_ALL -OFF = Style.DIM -RED = Fore.RED -BLUE = Fore.BLUE -GREEN = Fore.GREEN -YELLOW = Fore.YELLOW -CYAN = Fore.CYAN -MAGENTA = Fore.MAGENTA diff --git a/qobuz_dl/commands.py b/qobuz_dl/commands.py deleted file mode 100644 index 08b02dd..0000000 --- a/qobuz_dl/commands.py +++ /dev/null @@ -1,167 +0,0 @@ -import argparse - - -def fun_args(subparsers, default_limit): - interactive = subparsers.add_parser( - "fun", - description="Interactively search for tracks and albums.", - help="interactive mode", - ) - interactive.add_argument( - "-l", - "--limit", - metavar="int", - default=default_limit, - help="limit of search results (default: 20)", - ) - return interactive - - -def lucky_args(subparsers): - lucky = subparsers.add_parser( - "lucky", - description="Download the first albums returned from a Qobuz search.", - help="lucky mode", - ) - lucky.add_argument( - "-t", - "--type", - default="album", - help="type of items to search (artist, album, track, playlist) (default: album)", - ) - lucky.add_argument( - "-n", - "--number", - metavar="int", - default=1, - help="number of results to download (default: 1)", - ) - lucky.add_argument("QUERY", nargs="+", help="search query") - return lucky - - -def dl_args(subparsers): - download = subparsers.add_parser( - "dl", - description="Download by album/track/artist/label/playlist/last.fm-playlist URL.", - help="input mode", - ) - download.add_argument( - "SOURCE", - metavar="SOURCE", - nargs="+", - help=("one or more URLs (space separated) or a text file"), - ) - return download - - -def add_common_arg(custom_parser, default_folder, default_quality): - custom_parser.add_argument( - "-d", - "--directory", - metavar="PATH", - default=default_folder, - help=f'directory for downloads (default: "{default_folder}")', - ) - custom_parser.add_argument( - "-q", - "--quality", - metavar="int", - default=default_quality, - help=( - 'audio "quality" (5, 6, 7, 27)\n' - f"[320, LOSSLESS, 24B<=96KHZ, 24B>96KHZ] (default: {default_quality})" - ), - ) - custom_parser.add_argument( - "--albums-only", - action="store_true", - help=("don't download singles, EPs and VA releases"), - ) - custom_parser.add_argument( - "--no-m3u", - action="store_true", - help="don't create .m3u files when downloading playlists", - ) - custom_parser.add_argument( - "--no-fallback", - action="store_true", - help="disable quality fallback (skip releases not available in set quality)", - ) - custom_parser.add_argument( - "-e", "--embed-art", action="store_true", help="embed cover art into files" - ) - custom_parser.add_argument( - "--og-cover", - action="store_true", - help="download cover art in its original quality (bigger file)", - ) - custom_parser.add_argument( - "--no-cover", action="store_true", help="don't download cover art" - ) - custom_parser.add_argument( - "--no-db", action="store_true", help="don't call the database" - ) - custom_parser.add_argument( - "-ff", - "--folder-format", - metavar="PATTERN", - help="""pattern for formatting folder names, e.g - "{artist} - {album} ({year})". available keys: artist, - albumartist, album, year, sampling_rate, bit_rate, tracktitle, version. - cannot contain characters used by the system, which includes /:<>""", - ) - custom_parser.add_argument( - "-tf", - "--track-format", - metavar="PATTERN", - help="pattern for formatting track names. see `folder-format`.", - ) - # TODO: add customization options - custom_parser.add_argument( - "-s", - "--smart-discography", - action="store_true", - help="""Try to filter out spam-like albums when requesting an artist's - discography, and other optimizations. Filters albums not made by requested - artist, and deluxe/live/collection albums. Gives preference to remastered - albums, high bit depth/dynamic range, and low sampling rates (to save space).""", - ) - - -def qobuz_dl_args( - default_quality=6, default_limit=20, default_folder="Qobuz Downloads" -): - parser = argparse.ArgumentParser( - prog="qobuz-dl", - description=( - "The ultimate Qobuz music downloader.\nSee usage" - " examples on https://github.com/vitiko98/qobuz-dl" - ), - formatter_class=argparse.RawTextHelpFormatter, - ) - parser.add_argument( - "-r", "--reset", action="store_true", help="create/reset config file" - ) - parser.add_argument( - "-p", - "--purge", - action="store_true", - help="purge/delete downloaded-IDs database", - ) - - subparsers = parser.add_subparsers( - title="commands", - description="run qobuz-dl --help for more info\n(e.g. qobuz-dl fun --help)", - dest="command", - ) - - interactive = fun_args(subparsers, default_limit) - download = dl_args(subparsers) - lucky = lucky_args(subparsers) - [ - add_common_arg(i, default_folder, default_quality) - for i in (interactive, download, lucky) - ] - - return parser diff --git a/qobuz_dl/core.py b/qobuz_dl/core.py deleted file mode 100644 index 95b634f..0000000 --- a/qobuz_dl/core.py +++ /dev/null @@ -1,575 +0,0 @@ -import logging -import os -import re -import string -import sys -import time -from typing import Tuple - -import requests -from bs4 import BeautifulSoup as bso -from mutagen.flac import FLAC -from mutagen.mp3 import EasyMP3 -from pathvalidate import sanitize_filename - -import qobuz_dl.spoofbuz as spoofbuz -from qobuz_dl import downloader, qopy -from qobuz_dl.color import CYAN, DF, OFF, RED, RESET, YELLOW -from qobuz_dl.db import create_db, handle_download_id -from qobuz_dl.exceptions import NonStreamable - -WEB_URL = "https://play.qobuz.com/" -ARTISTS_SELECTOR = "td.chartlist-artist > a" -TITLE_SELECTOR = "td.chartlist-name > a" -EXTENSIONS = (".mp3", ".flac") -QUALITIES = { - 5: "5 - MP3", - 6: "6 - 16 bit, 44.1kHz", - 7: "7 - 24 bit, <96kHz", - 27: "27 - 24 bit, >96kHz", -} - -logger = logging.getLogger(__name__) - - -class PartialFormatter(string.Formatter): - def __init__(self, missing="n/a", bad_fmt="n/a"): - self.missing, self.bad_fmt = missing, bad_fmt - - def get_field(self, field_name, args, kwargs): - try: - val = super(PartialFormatter, self).get_field(field_name, args, kwargs) - except (KeyError, AttributeError): - val = None, field_name - return val - - def format_field(self, value, spec): - if not value: - return self.missing - try: - return super(PartialFormatter, self).format_field(value, spec) - except ValueError: - if self.bad_fmt: - return self.bad_fmt - raise - - -class QobuzDL: - def __init__( - self, - directory="Qobuz Downloads", - quality=6, - embed_art=False, - lucky_limit=1, - lucky_type="album", - interactive_limit=20, - ignore_singles_eps=False, - no_m3u_for_playlists=False, - quality_fallback=True, - cover_og_quality=False, - no_cover=False, - downloads_db=None, - folder_format="{artist} - {album} ({year}) [{bit_depth}B-" - "{sampling_rate}kHz]", - track_format="{tracknumber}. {tracktitle}", - smart_discography=False, - ): - self.directory = self.create_dir(directory) - self.quality = quality - self.embed_art = embed_art - self.lucky_limit = lucky_limit - self.lucky_type = lucky_type - self.interactive_limit = interactive_limit - self.ignore_singles_eps = ignore_singles_eps - self.no_m3u_for_playlists = no_m3u_for_playlists - self.quality_fallback = quality_fallback - self.cover_og_quality = cover_og_quality - self.no_cover = no_cover - self.downloads_db = create_db(downloads_db) if downloads_db else None - self.folder_format = folder_format - self.track_format = track_format - self.smart_discography = smart_discography - - def initialize_client(self, email, pwd, app_id, secrets): - self.client = qopy.Client(email, pwd, app_id, secrets) - logger.info(f"{YELLOW}Set max quality: {QUALITIES[int(self.quality)]}\n") - - def get_tokens(self): - spoofer = spoofbuz.Spoofer() - self.app_id = spoofer.getAppId() - self.secrets = [ - secret for secret in spoofer.getSecrets().values() if secret - ] # avoid empty fields - - def create_dir(self, directory=None): - fix = os.path.normpath(directory) - os.makedirs(fix, exist_ok=True) - return fix - - def get_url_info(self, url: str) -> Tuple[str, str]: - """Returns the type of the url and the id. - - Compatible with urls of the form: - https://www.qobuz.com/us-en/{type}/{name}/{id} - https://open.qobuz.com/{type}/{id} - https://play.qobuz.com/{type}/{id} - /us-en/{type}/-/{id} - """ - - r = re.search( - r"(?:https:\/\/(?:w{3}|open|play)\.qobuz\.com)?(?:\/[a-z]{2}-[a-z]{2})" - r"?\/(album|artist|track|playlist|label)(?:\/[-\w\d]+)?\/([\w\d]+)", - url, - ) - return r.groups() - - def download_from_id(self, item_id, album=True, alt_path=None): - if handle_download_id(self.downloads_db, item_id, add_id=False): - logger.info( - f"{OFF}This release ID ({item_id}) was already downloaded " - "according to the local database.\nUse the '--no-db' flag " - "to bypass this." - ) - return - try: - downloader.download_id_by_type( - self.client, - item_id, - alt_path or self.directory, - str(self.quality), - album, - self.embed_art, - self.ignore_singles_eps, - self.quality_fallback, - self.cover_og_quality, - self.no_cover, - folder_format=self.folder_format, - track_format=self.track_format, - ) - handle_download_id(self.downloads_db, item_id, add_id=True) - except (requests.exceptions.RequestException, NonStreamable) as e: - logger.error(f"{RED}Error getting release: {e}. Skipping...") - - def handle_url(self, url): - possibles = { - "playlist": { - "func": self.client.get_plist_meta, - "iterable_key": "tracks", - }, - "artist": { - "func": self.client.get_artist_meta, - "iterable_key": "albums", - }, - "label": { - "func": self.client.get_label_meta, - "iterable_key": "albums", - }, - "album": {"album": True, "func": None, "iterable_key": None}, - "track": {"album": False, "func": None, "iterable_key": None}, - } - try: - url_type, item_id = self.get_url_info(url) - type_dict = possibles[url_type] - except (KeyError, IndexError): - logger.info( - f'{RED}Invalid url: "{url}". Use urls from ' "https://play.qobuz.com!" - ) - return - if type_dict["func"]: - content = [item for item in type_dict["func"](item_id)] - content_name = content[0]["name"] - logger.info( - f"{YELLOW}Downloading all the music from {content_name} " - f"({url_type})!" - ) - new_path = self.create_dir( - os.path.join(self.directory, sanitize_filename(content_name)) - ) - - if self.smart_discography and url_type == "artist": - # change `save_space` and `skip_extras` for customization - items = self._smart_discography_filter( - content, - save_space=True, - skip_extras=True, - ) - else: - items = [item[type_dict["iterable_key"]]["items"] for item in content][ - 0 - ] - - logger.info(f"{YELLOW}{len(items)} downloads in queue") - for item in items: - self.download_from_id( - item["id"], - True if type_dict["iterable_key"] == "albums" else False, - new_path, - ) - if url_type == "playlist": - self.make_m3u(new_path) - else: - self.download_from_id(item_id, type_dict["album"]) - - def download_list_of_urls(self, urls): - if not urls or not isinstance(urls, list): - logger.info(f"{OFF}Nothing to download") - return - for url in urls: - if "last.fm" in url: - self.download_lastfm_pl(url) - elif os.path.isfile(url): - self.download_from_txt_file(url) - else: - self.handle_url(url) - - def download_from_txt_file(self, txt_file): - with open(txt_file, "r") as txt: - try: - urls = [ - line.replace("\n", "") - for line in txt.readlines() - if not line.strip().startswith("#") - ] - except Exception as e: - logger.error(f"{RED}Invalid text file: {e}") - return - logger.info( - f"{YELLOW}qobuz-dl will download {len(urls)}" - f" urls from file: {txt_file}" - ) - self.download_list_of_urls(urls) - - def lucky_mode(self, query, download=True): - if len(query) < 3: - logger.info(f"{RED}Your search query is too short or invalid") - return - - logger.info( - f'{YELLOW}Searching {self.lucky_type}s for "{query}".\n' - f"{YELLOW}qobuz-dl will attempt to download the first " - f"{self.lucky_limit} results." - ) - results = self.search_by_type(query, self.lucky_type, self.lucky_limit, True) - - if download: - self.download_list_of_urls(results) - - return results - - def format_duration(self, duration): - return time.strftime("%H:%M:%S", time.gmtime(duration)) - - def search_by_type(self, query, item_type, limit=10, lucky=False): - if len(query) < 3: - logger.info("{RED}Your search query is too short or invalid") - return - - possibles = { - "album": { - "func": self.client.search_albums, - "album": True, - "key": "albums", - "format": "{artist[name]} - {title}", - "requires_extra": True, - }, - "artist": { - "func": self.client.search_artists, - "album": True, - "key": "artists", - "format": "{name} - ({albums_count} releases)", - "requires_extra": False, - }, - "track": { - "func": self.client.search_tracks, - "album": False, - "key": "tracks", - "format": "{performer[name]} - {title}", - "requires_extra": True, - }, - "playlist": { - "func": self.client.search_playlists, - "album": False, - "key": "playlists", - "format": "{name} - ({tracks_count} releases)", - "requires_extra": False, - }, - } - - try: - mode_dict = possibles[item_type] - results = mode_dict["func"](query, limit) - iterable = results[mode_dict["key"]]["items"] - item_list = [] - for i in iterable: - fmt = PartialFormatter() - text = fmt.format(mode_dict["format"], **i) - if mode_dict["requires_extra"]: - - text = "{} - {} [{}]".format( - text, - self.format_duration(i["duration"]), - "HI-RES" if i["hires_streamable"] else "LOSSLESS", - ) - - url = "{}{}/{}".format(WEB_URL, item_type, i.get("id", "")) - item_list.append({"text": text, "url": url} if not lucky else url) - return item_list - except (KeyError, IndexError): - logger.info(f"{RED}Invalid type: {item_type}") - return - - def interactive(self, download=True): - try: - from pick import pick - except (ImportError, ModuleNotFoundError): - if os.name == "nt": - sys.exit( - "Please install curses with " - '"pip3 install windows-curses" to continue' - ) - raise - - qualities = [ - {"q_string": "320", "q": 5}, - {"q_string": "Lossless", "q": 6}, - {"q_string": "Hi-res =< 96kHz", "q": 7}, - {"q_string": "Hi-Res > 96 kHz", "q": 27}, - ] - - def get_title_text(option): - return option.get("text") - - def get_quality_text(option): - return option.get("q_string") - - try: - item_types = ["Albums", "Tracks", "Artists", "Playlists"] - selected_type = pick(item_types, "I'll search for:\n[press Intro]")[0][ - :-1 - ].lower() - logger.info(f"{YELLOW}Ok, we'll search for " f"{selected_type}s{RESET}") - final_url_list = [] - while True: - query = input( - f"{CYAN}Enter your search: [Ctrl + c to quit]\n" f"-{DF} " - ) - logger.info(f"{YELLOW}Searching...{RESET}") - options = self.search_by_type( - query, selected_type, self.interactive_limit - ) - if not options: - logger.info(f"{OFF}Nothing found{RESET}") - continue - title = ( - f'*** RESULTS FOR "{query.title()}" ***\n\n' - "Select [space] the item(s) you want to download " - "(one or more)\nPress Ctrl + c to quit\n" - "Don't select anything to try another search" - ) - selected_items = pick( - options, - title, - multiselect=True, - min_selection_count=0, - options_map_func=get_title_text, - ) - if len(selected_items) > 0: - [final_url_list.append(i[0]["url"]) for i in selected_items] - y_n = pick( - ["Yes", "No"], - "Items were added to queue to be downloaded. " - "Keep searching?", - ) - if y_n[0][0] == "N": - break - else: - logger.info(f"{YELLOW}Ok, try again...{RESET}") - continue - if final_url_list: - desc = ( - "Select [intro] the quality (the quality will " - "be automatically\ndowngraded if the selected " - "is not found)" - ) - self.quality = pick( - qualities, - desc, - default_index=1, - options_map_func=get_quality_text, - )[0]["q"] - - if download: - self.download_list_of_urls(final_url_list) - - return final_url_list - except KeyboardInterrupt: - logger.info(f"{YELLOW}Bye") - return - - def download_lastfm_pl(self, playlist_url): - # Apparently, last fm API doesn't have a playlist endpoint. If you - # find out that it has, please fix this! - try: - r = requests.get(playlist_url, timeout=10) - except requests.exceptions.RequestException as e: - logger.error(f"{RED}Playlist download failed: {e}") - return - soup = bso(r.content, "html.parser") - artists = [artist.text for artist in soup.select(ARTISTS_SELECTOR)] - titles = [title.text for title in soup.select(TITLE_SELECTOR)] - - track_list = [] - if len(artists) == len(titles) and artists: - track_list = [ - artist + " " + title for artist, title in zip(artists, titles) - ] - - if not track_list: - logger.info(f"{OFF}Nothing found") - return - - pl_title = sanitize_filename(soup.select_one("h1").text) - pl_directory = os.path.join(self.directory, pl_title) - logger.info( - f"{YELLOW}Downloading playlist: {pl_title} " f"({len(track_list)} tracks)" - ) - - for i in track_list: - track_id = self.get_url_info( - self.search_by_type(i, "track", 1, lucky=True)[0] - )[1] - if track_id: - self.download_from_id(track_id, False, pl_directory) - - self.make_m3u(pl_directory) - - def make_m3u(self, pl_directory): - if self.no_m3u_for_playlists: - return - - track_list = ["#EXTM3U"] - rel_folder = os.path.basename(os.path.normpath(pl_directory)) - pl_name = rel_folder + ".m3u" - for local, dirs, files in os.walk(pl_directory): - dirs.sort() - audio_rel_files = [ - # os.path.abspath(os.path.join(local, file_)) - # os.path.join(rel_folder, - # os.path.basename(os.path.normpath(local)), - # file_) - os.path.join(os.path.basename(os.path.normpath(local)), file_) - for file_ in files - if os.path.splitext(file_)[-1] in EXTENSIONS - ] - audio_files = [ - os.path.abspath(os.path.join(local, file_)) - for file_ in files - if os.path.splitext(file_)[-1] in EXTENSIONS - ] - if not audio_files or len(audio_files) != len(audio_rel_files): - continue - - for audio_rel_file, audio_file in zip(audio_rel_files, audio_files): - try: - pl_item = ( - EasyMP3(audio_file) - if ".mp3" in audio_file - else FLAC(audio_file) - ) - title = pl_item["TITLE"][0] - artist = pl_item["ARTIST"][0] - length = int(pl_item.info.length) - index = "#EXTINF:{}, {} - {}\n{}".format( - length, artist, title, audio_rel_file - ) - except: # noqa - continue - track_list.append(index) - - if len(track_list) > 1: - with open(os.path.join(pl_directory, pl_name), "w") as pl: - pl.write("\n\n".join(track_list)) - - def _smart_discography_filter( - self, contents: list, save_space: bool = False, skip_extras: bool = False - ) -> list: - """When downloading some artists' discography, many random and spam-like - albums can get downloaded. This helps filter those out to just get the good stuff. - - This function removes: - * albums by other artists, which may contain a feature from the requested artist - * duplicate albums in different qualities - * (optionally) removes collector's, deluxe, live albums - - :param list contents: contents returned by qobuz API - :param bool save_space: choose highest bit depth, lowest sampling rate - :param bool remove_extras: remove albums with extra material (i.e. live, deluxe,...) - :returns: filtered items list - """ - - # for debugging - def print_album(album: dict) -> None: - logger.debug( - f"{album['title']} - {album.get('version', '~~')} ({album['maximum_bit_depth']}/{album['maximum_sampling_rate']} by {album['artist']['name']}) {album['id']}" - ) - - TYPE_REGEXES = { - "remaster": r"(?i)(re)?master(ed)?", - "extra": r"(?i)(anniversary|deluxe|live|collector|demo|expanded)", - } - - def is_type(album_t: str, album: dict) -> bool: - """Check if album is of type `album_t`""" - version = album.get("version", "") - title = album.get("title", "") - regex = TYPE_REGEXES[album_t] - return re.search(regex, f"{title} {version}") is not None - - def essence(album: dict) -> str: - """Ignore text in parens/brackets, return all lowercase. - Used to group two albums that may be named similarly, but not exactly - the same. - """ - r = re.match(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*", album) - return r.group(1).strip().lower() - - requested_artist = contents[0]["name"] - items = [item["albums"]["items"] for item in contents][0] - - # use dicts to group duplicate albums together by title - title_grouped = dict() - for item in items: - if (t := essence(item["title"])) not in title_grouped: - title_grouped[t] = [] - title_grouped[t].append(item) - - items = [] - for albums in title_grouped.values(): - best_bit_depth = max(a["maximum_bit_depth"] for a in albums) - get_best = min if save_space else max - best_sampling_rate = get_best( - a["maximum_sampling_rate"] - for a in albums - if a["maximum_bit_depth"] == best_bit_depth - ) - remaster_exists = any(is_type("remaster", a) for a in albums) - - def is_valid(album: dict) -> bool: - return ( - album["maximum_bit_depth"] == best_bit_depth - and album["maximum_sampling_rate"] == best_sampling_rate - and album["artist"]["name"] == requested_artist - and not ( # states that are not allowed - (remaster_exists and not is_type("remaster", album)) - or (skip_extras and is_type("extra", album)) - ) - ) - - filtered = tuple(filter(is_valid, albums)) - # most of the time, len is 0 or 1. - # if greater, it is a complete duplicate, - # so it doesn't matter which is chosen - if len(filtered) >= 1: - items.append(filtered[0]) - - return items diff --git a/qobuz_dl/db.py b/qobuz_dl/db.py deleted file mode 100644 index ccc117b..0000000 --- a/qobuz_dl/db.py +++ /dev/null @@ -1,39 +0,0 @@ -import logging -import sqlite3 - -from qobuz_dl.color import RED, YELLOW - -logger = logging.getLogger(__name__) - - -def create_db(db_path): - with sqlite3.connect(db_path) as conn: - try: - conn.execute("CREATE TABLE downloads (id TEXT UNIQUE NOT NULL);") - logger.info(f"{YELLOW}Download-IDs database created") - except sqlite3.OperationalError: - pass - return db_path - - -def handle_download_id(db_path, item_id, add_id=False): - if not db_path: - return - - with sqlite3.connect(db_path) as conn: - # If add_if is False return a string to know if the ID is in the DB - # Otherwise just add the ID to the DB - if add_id: - try: - conn.execute( - "INSERT INTO downloads (id) VALUES (?)", - (item_id,), - ) - conn.commit() - except sqlite3.Error as e: - logger.error(f"{RED}Unexpected DB error: {e}") - else: - return conn.execute( - "SELECT id FROM downloads where id=?", - (item_id,), - ).fetchone() diff --git a/qobuz_dl/downloader.py b/qobuz_dl/downloader.py deleted file mode 100644 index 7859def..0000000 --- a/qobuz_dl/downloader.py +++ /dev/null @@ -1,400 +0,0 @@ -import logging -import os -from typing import Tuple - -import requests -from pathvalidate import sanitize_filename -from tqdm import tqdm - -import qobuz_dl.metadata as metadata -from qobuz_dl.color import CYAN, GREEN, OFF, RED, YELLOW -from qobuz_dl.exceptions import NonStreamable - -QL_DOWNGRADE = "FormatRestrictedByFormatAvailability" -# used in case of error -DEFAULT_FORMATS = { - "MP3": [ - "{artist} - {album} ({year}) [MP3]", - "{tracknumber}. {tracktitle}", - ], - "Unknown": [ - "{artist} - {album}", - "{tracknumber}. {tracktitle}", - ], -} - -logger = logging.getLogger(__name__) - - -def tqdm_download(url, fname, track_name): - r = requests.get(url, allow_redirects=True, stream=True) - total = int(r.headers.get("content-length", 0)) - with open(fname, "wb") as file, tqdm( - total=total, - unit="iB", - unit_scale=True, - unit_divisor=1024, - desc=track_name, - bar_format=CYAN + "{n_fmt}/{total_fmt} /// {desc}", - ) as bar: - for data in r.iter_content(chunk_size=1024): - size = file.write(data) - bar.update(size) - - -def get_description(u: dict, track_title, multiple=None): - downloading_title = f"{track_title} " - f'[{u["bit_depth"]}/{u["sampling_rate"]}]' - if multiple: - downloading_title = f"[Disc {multiple}] {downloading_title}" - return downloading_title - - -def get_format( - client, item_dict, quality, is_track_id=False, track_url_dict=None -) -> Tuple[str, bool, int, int]: - quality_met = True - if int(quality) == 5: - return ("MP3", quality_met, None, None) - track_dict = item_dict - if not is_track_id: - track_dict = item_dict["tracks"]["items"][0] - - try: - new_track_dict = ( - client.get_track_url(track_dict["id"], quality) - if not track_url_dict - else track_url_dict - ) - restrictions = new_track_dict.get("restrictions") - if isinstance(restrictions, list): - if any( - restriction.get("code") == QL_DOWNGRADE for restriction in restrictions - ): - quality_met = False - - return ( - "FLAC", - quality_met, - new_track_dict["bit_depth"], - new_track_dict["sampling_rate"], - ) - except (KeyError, requests.exceptions.HTTPError): - return ("Unknown", quality_met, None, None) - - -def get_title(item_dict): - album_title = item_dict["title"] - version = item_dict.get("version") - if version: - album_title = ( - f"{album_title} ({version})" - if version.lower() not in album_title.lower() - else album_title - ) - return album_title - - -def get_extra(i, dirn, extra="cover.jpg", og_quality=False): - extra_file = os.path.join(dirn, extra) - if os.path.isfile(extra_file): - logger.info(f"{OFF}{extra} was already downloaded") - return - tqdm_download( - i.replace("_600.", "_org.") if og_quality else i, - extra_file, - extra, - ) - - -# Download and tag a file -def download_and_tag( - root_dir, - tmp_count, - track_url_dict, - track_metadata, - album_or_track_metadata, - is_track, - is_mp3, - embed_art=False, - multiple=None, - track_format="{tracknumber}. {tracktitle}", -): - """ - Download and tag a file - - :param str root_dir: Root directory where the track will be stored - :param int tmp_count: Temporal download file number - :param dict track_url_dict: get_track_url dictionary from Qobuz client - :param dict track_metadata: Track item dictionary from Qobuz client - :param dict album_or_track_metadata: Album/track dict from Qobuz client - :param bool is_track - :param bool is_mp3 - :param bool embed_art: Embed cover art into file (FLAC-only) - :param str track_format format-string that determines file naming - :param multiple: Multiple disc integer - :type multiple: integer or None - """ - - extension = ".mp3" if is_mp3 else ".flac" - - try: - url = track_url_dict["url"] - except KeyError: - logger.info(f"{OFF}Track not available for download") - return - - if multiple: - root_dir = os.path.join(root_dir, f"Disc {multiple}") - os.makedirs(root_dir, exist_ok=True) - - filename = os.path.join(root_dir, f".{tmp_count:02}.tmp") - - # Determine the filename - track_title = track_metadata.get("title") - artist = _safe_get(track_metadata, "performer", "name") - filename_attr = { - "artist": artist, - "albumartist": _safe_get( - track_metadata, "album", "artist", "name", default=artist - ), - "bit_depth": track_metadata["maximum_bit_depth"], - "sampling_rate": track_metadata["maximum_sampling_rate"], - "tracktitle": track_title, - "version": track_metadata.get("version"), - "tracknumber": f"{track_metadata['track_number']:02}", - } - # track_format is a format string - # e.g. '{tracknumber}. {artist} - {tracktitle}' - formatted_path = sanitize_filename(track_format.format(**filename_attr)) - final_file = os.path.join(root_dir, formatted_path)[:250] + extension - - if os.path.isfile(final_file): - logger.info(f"{OFF}{track_title} was already downloaded") - return - - desc = get_description(track_url_dict, track_title, multiple) - tqdm_download(url, filename, desc) - tag_function = metadata.tag_mp3 if is_mp3 else metadata.tag_flac - try: - tag_function( - filename, - root_dir, - final_file, - track_metadata, - album_or_track_metadata, - is_track, - embed_art, - ) - except Exception as e: - logger.error(f"{RED}Error tagging the file: {e}", exc_info=True) - - -def download_id_by_type( - client, - item_id, - path, - quality, - album=False, - embed_art=False, - albums_only=False, - downgrade_quality=True, - cover_og_quality=False, - no_cover=False, - folder_format="{artist} - {album} ({year}) " "[{bit_depth}B-{sampling_rate}kHz]", - track_format="{tracknumber}. {tracktitle}", -): - """ - Download and get metadata by ID and type (album or track) - - :param Qopy client: qopy Client - :param int item_id: Qobuz item id - :param str path: The root directory where the item will be downloaded - :param int quality: Audio quality (5, 6, 7, 27) - :param bool album: album type or not - :param embed_art album: Embed cover art into files - :param bool albums_only: Ignore Singles, EPs and VA releases - :param bool downgrade: Skip releases not available in set quality - :param bool cover_og_quality: Download cover in its original quality - :param bool no_cover: Don't download cover art - :param str folder_format: format string that determines folder naming - :param str track_format: format string that determines track naming - """ - count = 0 - - if album: - meta = client.get_album_meta(item_id) - - if not meta.get("streamable"): - raise NonStreamable("This release is not streamable") - - if albums_only and ( - meta.get("release_type") != "album" - or meta.get("artist").get("name") == "Various Artists" - ): - logger.info(f'{OFF}Ignoring Single/EP/VA: {meta.get("title", "")}') - return - - album_title = get_title(meta) - - format_info = get_format(client, meta, quality) - file_format, quality_met, bit_depth, sampling_rate = format_info - - if not downgrade_quality and not quality_met: - logger.info( - f"{OFF}Skipping {album_title} as it doesn't " "meet quality requirement" - ) - return - - logger.info( - f"\n{YELLOW}Downloading: {album_title}\nQuality: {file_format} ({bit_depth}/{sampling_rate})\n" - ) - album_attr = { - "artist": meta["artist"]["name"], - "album": album_title, - "year": meta["release_date_original"].split("-")[0], - "format": file_format, - "bit_depth": bit_depth, - "sampling_rate": sampling_rate, - } - folder_format, track_format = _clean_format_str( - folder_format, track_format, file_format - ) - sanitized_title = sanitize_filename(folder_format.format(**album_attr)) - dirn = os.path.join(path, sanitized_title) - os.makedirs(dirn, exist_ok=True) - - if no_cover: - logger.info(f"{OFF}Skipping cover") - else: - get_extra(meta["image"]["large"], dirn, og_quality=cover_og_quality) - - if "goodies" in meta: - try: - get_extra(meta["goodies"][0]["url"], dirn, "booklet.pdf") - except: # noqa - pass - media_numbers = [track["media_number"] for track in meta["tracks"]["items"]] - is_multiple = True if len([*{*media_numbers}]) > 1 else False - for i in meta["tracks"]["items"]: - parse = client.get_track_url(i["id"], quality) - if "sample" not in parse and parse["sampling_rate"]: - is_mp3 = True if int(quality) == 5 else False - download_and_tag( - dirn, - count, - parse, - i, - meta, - False, - is_mp3, - embed_art, - i["media_number"] if is_multiple else None, - track_format=track_format, - ) - else: - logger.info(f"{OFF}Demo. Skipping") - count = count + 1 - else: - parse = client.get_track_url(item_id, quality) - - if "sample" not in parse and parse["sampling_rate"]: - meta = client.get_track_meta(item_id) - track_title = get_title(meta) - logger.info(f"\n{YELLOW}Downloading: {track_title}") - format_info = get_format( - client, meta, quality, is_track_id=True, track_url_dict=parse - ) - file_format, quality_met, bit_depth, sampling_rate = format_info - - folder_format, track_format = _clean_format_str( - folder_format, track_format, bit_depth - ) - - if not downgrade_quality and not quality_met: - logger.info( - f"{OFF}Skipping {track_title} as it doesn't " - "meet quality requirement" - ) - return - track_attr = { - "artist": meta["album"]["artist"]["name"], - "tracktitle": track_title, - "year": meta["album"]["release_date_original"].split("-")[0], - "bit_depth": bit_depth, - "sampling_rate": sampling_rate, - } - sanitized_title = sanitize_filename(folder_format.format(**track_attr)) - - dirn = os.path.join(path, sanitized_title) - os.makedirs(dirn, exist_ok=True) - if no_cover: - logger.info(f"{OFF}Skipping cover") - else: - get_extra( - meta["album"]["image"]["large"], dirn, og_quality=cover_og_quality - ) - is_mp3 = True if int(quality) == 5 else False - download_and_tag( - dirn, - count, - parse, - meta, - meta, - True, - is_mp3, - embed_art, - track_format=track_format, - ) - else: - logger.info(f"{OFF}Demo. Skipping") - logger.info(f"{GREEN}Completed") - - -# ----------- Utilities ----------- - - -def _clean_format_str(folder: str, track: str, file_format: str) -> Tuple[str, str]: - """Cleans up the format strings, avoids errors - with MP3 files. - """ - final = [] - for i, fs in enumerate((folder, track)): - if fs.endswith(".mp3"): - fs = fs[:-4] - elif fs.endswith(".flac"): - fs = fs[:-5] - fs = fs.strip() - - # default to pre-chosen string if format is invalid - if file_format in ("MP3", "Unknown") and ( - "bit_depth" in fs or "sampling_rate" in fs - ): - default = DEFAULT_FORMATS[file_format][i] - logger.error( - f"{RED}invalid format string for format {file_format}" - f". defaulting to {default}" - ) - fs = default - final.append(fs) - - return tuple(final) - - -def _safe_get(d: dict, *keys, default=None): - """A replacement for chained `get()` statements on dicts: - >>> d = {'foo': {'bar': 'baz'}} - >>> _safe_get(d, 'baz') - None - >>> _safe_get(d, 'foo', 'bar') - 'baz' - """ - curr = d - res = default - for key in keys: - res = curr.get(key, default) - if res == default or not hasattr(res, "__getitem__"): - return res - else: - curr = res - return res diff --git a/qobuz_dl/exceptions.py b/qobuz_dl/exceptions.py deleted file mode 100644 index 9461cda..0000000 --- a/qobuz_dl/exceptions.py +++ /dev/null @@ -1,22 +0,0 @@ -class AuthenticationError(Exception): - pass - - -class IneligibleError(Exception): - pass - - -class InvalidAppIdError(Exception): - pass - - -class InvalidAppSecretError(Exception): - pass - - -class InvalidQuality(Exception): - pass - - -class NonStreamable(Exception): - pass diff --git a/qobuz_dl/metadata.py b/qobuz_dl/metadata.py deleted file mode 100644 index c1b87ef..0000000 --- a/qobuz_dl/metadata.py +++ /dev/null @@ -1,224 +0,0 @@ -import logging -import os -import re - -import mutagen.id3 as id3 -from mutagen.flac import FLAC, Picture -from mutagen.id3 import ID3NoHeaderError - -logger = logging.getLogger(__name__) - - -# unicode symbols -COPYRIGHT, PHON_COPYRIGHT = "\u2117", "\u00a9" -# if a metadata block exceeds this, mutagen will raise error -# and the file won't be tagged -FLAC_MAX_BLOCKSIZE = 16777215 - - -def get_title(track_dict): - title = track_dict["title"] - version = track_dict.get("version") - if version: - title = f"{title} ({version})" - # for classical works - if track_dict.get("work"): - title = "{}: {}".format(track_dict["work"], title) - - return title - - -def _format_copyright(s: str) -> str: - s = s.replace("(P)", PHON_COPYRIGHT) - s = s.replace("(C)", COPYRIGHT) - return s - - -def _format_genres(genres: list) -> str: - """Fixes the weirdly formatted genre lists returned by the API. - >>> g = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé'] - >>> _format_genres(g) - 'Pop, Rock, Alternatif et Indé' - """ - genres = re.findall(r"([^\u2192\/]+)", "/".join(genres)) - no_repeats = [] - [no_repeats.append(g) for g in genres if g not in no_repeats] - return ", ".join(no_repeats) - - -# Use KeyError catching instead of dict.get to avoid empty tags -def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=False): - """ - Tag a FLAC file - - :param str filename: FLAC file path - :param str root_dir: Root dir used to get the cover art - :param str final_name: Final name of the FLAC file (complete path) - :param dict d: Track dictionary from Qobuz_client - :param dict album: Album dictionary from Qobuz_client - :param bool istrack - :param bool em_image: Embed cover art into file - """ - audio = FLAC(filename) - - audio["TITLE"] = get_title(d) - - audio["TRACKNUMBER"] = str(d["track_number"]) # TRACK NUMBER - - if "Disc " in final_name: - audio["DISCNUMBER"] = str(d["media_number"]) - - try: - audio["COMPOSER"] = d["composer"]["name"] # COMPOSER - except KeyError: - pass - - try: - audio["ARTIST"] = d["performer"]["name"] # TRACK ARTIST - except KeyError: - if istrack: - audio["ARTIST"] = d["album"]["artist"]["name"] # TRACK ARTIST - else: - audio["ARTIST"] = album["artist"]["name"] - - try: - audio["LABEL"] = album["label"]["name"] - except KeyError: - pass - - if istrack: - audio["GENRE"] = _format_genres(d["album"]["genres_list"]) - audio["ALBUMARTIST"] = d["album"]["artist"]["name"] - audio["TRACKTOTAL"] = str(d["album"]["tracks_count"]) - audio["ALBUM"] = d["album"]["title"] - audio["DATE"] = d["album"]["release_date_original"] - audio["COPYRIGHT"] = _format_copyright(d["copyright"]) - else: - audio["GENRE"] = _format_genres(album["genres_list"]) - audio["ALBUMARTIST"] = album["artist"]["name"] - audio["TRACKTOTAL"] = str(album["tracks_count"]) - audio["ALBUM"] = album["title"] - audio["DATE"] = album["release_date_original"] - audio["COPYRIGHT"] = _format_copyright(album["copyright"]) - - if em_image: - emb_image = os.path.join(root_dir, "cover.jpg") - multi_emb_image = os.path.join( - os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg" - ) - if os.path.isfile(emb_image): - cover_image = emb_image - else: - cover_image = multi_emb_image - - try: - # rest of the metadata still gets embedded - # when the image size is too big - if os.path.getsize(cover_image) > FLAC_MAX_BLOCKSIZE: - raise Exception( - "downloaded cover size too large to embed. " - "turn off `og_cover` to avoid error" - ) - - image = Picture() - image.type = 3 - image.mime = "image/jpeg" - image.desc = "cover" - with open(cover_image, "rb") as img: - image.data = img.read() - audio.add_picture(image) - except Exception as e: - logger.error(f"Error embedding image: {e}", exc_info=True) - - audio.save() - os.rename(filename, final_name) - - -def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=False): - """ - Tag an mp3 file - - :param str filename: mp3 temporary file path - :param str root_dir: Root dir used to get the cover art - :param str final_name: Final name of the mp3 file (complete path) - :param dict d: Track dictionary from Qobuz_client - :param bool istrack - :param bool em_image: Embed cover art into file - """ - - id3_legend = { - "album": id3.TALB, - "albumartist": id3.TPE2, - "artist": id3.TPE1, - "comment": id3.COMM, - "composer": id3.TCOM, - "copyright": id3.TCOP, - "date": id3.TDAT, - "genre": id3.TCON, - "isrc": id3.TSRC, - "label": id3.TPUB, - "performer": id3.TOPE, - "title": id3.TIT2, - "year": id3.TYER, - } - try: - audio = id3.ID3(filename) - except ID3NoHeaderError: - audio = id3.ID3() - - # temporarily holds metadata - tags = dict() - tags["title"] = get_title(d) - try: - tags["label"] = album["label"]["name"] - except KeyError: - pass - - try: - tags["artist"] = d["performer"]["name"] - except KeyError: - if istrack: - tags["artist"] = d["album"]["artist"]["name"] - else: - tags["artist"] = album["artist"]["name"] - - if istrack: - tags["genre"] = _format_genres(d["album"]["genres_list"]) - tags["albumartist"] = d["album"]["artist"]["name"] - tags["album"] = d["album"]["title"] - tags["date"] = d["album"]["release_date_original"] - tags["copyright"] = _format_copyright(d["copyright"]) - tracktotal = str(d["album"]["tracks_count"]) - else: - tags["genre"] = _format_genres(album["genres_list"]) - tags["albumartist"] = album["artist"]["name"] - tags["album"] = album["title"] - tags["date"] = album["release_date_original"] - tags["copyright"] = _format_copyright(album["copyright"]) - tracktotal = str(album["tracks_count"]) - - tags["year"] = tags["date"][:4] - - audio["TRCK"] = id3.TRCK(encoding=3, text=f'{d["track_number"]}/{tracktotal}') - audio["TPOS"] = id3.TPOS(encoding=3, text=str(d["media_number"])) - - # write metadata in `tags` to file - for k, v in tags.items(): - id3tag = id3_legend[k] - audio[id3tag.__name__] = id3tag(encoding=3, text=v) - - if em_image: - emb_image = os.path.join(root_dir, "cover.jpg") - multi_emb_image = os.path.join( - os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg" - ) - if os.path.isfile(emb_image): - cover_image = emb_image - else: - cover_image = multi_emb_image - - with open(cover_image, "rb") as cover: - audio.add(id3.APIC(3, "image/jpeg", 3, "", cover.read())) - - audio.save(filename, "v2_version=3") - os.rename(filename, final_name) diff --git a/qobuz_dl/qopy.py b/qobuz_dl/qopy.py deleted file mode 100644 index ff2cb87..0000000 --- a/qobuz_dl/qopy.py +++ /dev/null @@ -1,204 +0,0 @@ -# Wrapper for Qo-DL Reborn. This is a sligthly modified version -# of qopy, originally written by Sorrow446. All credits to the -# original author. - -import hashlib -import logging -import time - -import requests - -from qobuz_dl.color import GREEN, YELLOW -from qobuz_dl.exceptions import ( - AuthenticationError, - IneligibleError, - InvalidAppIdError, - InvalidAppSecretError, - InvalidQuality, -) - -RESET = "Reset your credentials with 'qobuz-dl -r'" - -logger = logging.getLogger(__name__) - - -class Client: - def __init__(self, email, pwd, app_id, secrets): - logger.info(f"{YELLOW}Logging...") - self.secrets = secrets - self.id = app_id - self.session = requests.Session() - self.session.headers.update( - { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0", - "X-App-Id": self.id, - } - ) - self.base = "https://www.qobuz.com/api.json/0.2/" - self.auth(email, pwd) - self.cfg_setup() - - def api_call(self, epoint, **kwargs): - if epoint == "user/login": - params = { - "email": kwargs["email"], - "password": kwargs["pwd"], - "app_id": self.id, - } - elif epoint == "track/get": - params = {"track_id": kwargs["id"]} - elif epoint == "album/get": - params = {"album_id": kwargs["id"]} - elif epoint == "playlist/get": - params = { - "extra": "tracks", - "playlist_id": kwargs["id"], - "limit": 500, - "offset": kwargs["offset"], - } - elif epoint == "artist/get": - params = { - "app_id": self.id, - "artist_id": kwargs["id"], - "limit": 500, - "offset": kwargs["offset"], - "extra": "albums", - } - elif epoint == "label/get": - params = { - "label_id": kwargs["id"], - "limit": 500, - "offset": kwargs["offset"], - "extra": "albums", - } - elif epoint == "userLibrary/getAlbumsList": - unix = time.time() - r_sig = "userLibrarygetAlbumsList" + str(unix) + kwargs["sec"] - r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest() - params = { - "app_id": self.id, - "user_auth_token": self.uat, - "request_ts": unix, - "request_sig": r_sig_hashed, - } - elif epoint == "track/getFileUrl": - unix = time.time() - track_id = kwargs["id"] - fmt_id = kwargs["fmt_id"] - if int(fmt_id) not in (5, 6, 7, 27): - raise InvalidQuality("Invalid quality id: choose between 5, 6, 7 or 27") - r_sig = "trackgetFileUrlformat_id{}intentstreamtrack_id{}{}{}".format( - fmt_id, track_id, unix, self.sec - ) - r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest() - params = { - "request_ts": unix, - "request_sig": r_sig_hashed, - "track_id": track_id, - "format_id": fmt_id, - "intent": "stream", - } - else: - params = kwargs - r = self.session.get(self.base + epoint, params=params) - if epoint == "user/login": - if r.status_code == 401: - raise AuthenticationError("Invalid credentials.\n" + RESET) - elif r.status_code == 400: - raise InvalidAppIdError("Invalid app id.\n" + RESET) - else: - logger.info(f"{GREEN}Logged: OK") - elif epoint in ["track/getFileUrl", "userLibrary/getAlbumsList"]: - if r.status_code == 400: - raise InvalidAppSecretError("Invalid app secret.\n" + RESET) - r.raise_for_status() - return r.json() - - def auth(self, email, pwd): - usr_info = self.api_call("user/login", email=email, pwd=pwd) - if not usr_info["user"]["credential"]["parameters"]: - raise IneligibleError("Free accounts are not eligible to download tracks.") - self.uat = usr_info["user_auth_token"] - self.session.headers.update({"X-User-Auth-Token": self.uat}) - self.label = usr_info["user"]["credential"]["parameters"]["short_label"] - logger.info(f"{GREEN}Membership: {self.label}") - - def multi_meta(self, epoint, key, id, type): - total = 1 - offset = 0 - while total > 0: - if type in ["tracks", "albums"]: - j = self.api_call(epoint, id=id, offset=offset, type=type)[type] - else: - j = self.api_call(epoint, id=id, offset=offset, type=type) - if offset == 0: - yield j - total = j[key] - 500 - else: - yield j - total -= 500 - offset += 500 - - def get_album_meta(self, id): - return self.api_call("album/get", id=id) - - def get_track_meta(self, id): - return self.api_call("track/get", id=id) - - def get_track_url(self, id, fmt_id): - return self.api_call("track/getFileUrl", id=id, fmt_id=fmt_id) - - def get_artist_meta(self, id): - return self.multi_meta("artist/get", "albums_count", id, None) - - def get_plist_meta(self, id): - return self.multi_meta("playlist/get", "tracks_count", id, None) - - def get_label_meta(self, id): - return self.multi_meta("label/get", "albums_count", id, None) - - def search_albums(self, query, limit): - return self.api_call("album/search", query=query, limit=limit) - - def search_artists(self, query, limit): - return self.api_call("artist/search", query=query, limit=limit) - - def search_playlists(self, query, limit): - return self.api_call("playlist/search", query=query, limit=limit) - - def search_tracks(self, query, limit): - return self.api_call("track/search", query=query, limit=limit) - - def get_favorite_albums(self, offset, limit): - return self.api_call( - "favorite/getUserFavorites", type="albums", offset=offset, limit=limit - ) - - def get_favorite_tracks(self, offset, limit): - return self.api_call( - "favorite/getUserFavorites", type="tracks", offset=offset, limit=limit - ) - - def get_favorite_artists(self, offset, limit): - return self.api_call( - "favorite/getUserFavorites", type="artists", offset=offset, limit=limit - ) - - def get_user_playlists(self, limit): - return self.api_call("playlist/getUserPlaylists", limit=limit) - - def test_secret(self, sec): - try: - r = self.api_call("userLibrary/getAlbumsList", sec=sec) - return True - except InvalidAppSecretError: - return False - - def cfg_setup(self): - logging.debug(self.secrets) - for secret in self.secrets: - if self.test_secret(secret): - self.sec = secret - break - if not hasattr(self, "sec"): - raise InvalidAppSecretError("Invalid app secret.\n" + RESET) diff --git a/qobuz_dl/spoofbuz.py b/qobuz_dl/spoofbuz.py deleted file mode 100644 index 624d9cf..0000000 --- a/qobuz_dl/spoofbuz.py +++ /dev/null @@ -1,51 +0,0 @@ -import base64 -import re -from collections import OrderedDict - -import requests - - -class Spoofer: - def __init__(self): - self.seed_timezone_regex = r'[a-z]\.initialSeed\("(?P[\w=]+)",window\.utimezone\.(?P[a-z]+)\)' - # note: {timezones} should be replaced with every capitalized timezone joined by a | - self.info_extras_regex = r'name:"\w+/(?P{timezones})",info:"(?P[\w=]+)",extras:"(?P[\w=]+)"' - self.appId_regex = r'{app_id:"(?P\d{9})",app_secret:"\w{32}",base_port:"80",base_url:"https://www\.qobuz\.com",base_method:"/api\.json/0\.2/"},n\.base_url="https://play\.qobuz\.com"' - login_page_request = requests.get("https://play.qobuz.com/login") - login_page = login_page_request.text - bundle_url_match = re.search( - r'', - login_page, - ) - bundle_url = bundle_url_match.group(1) - bundle_req = requests.get("https://play.qobuz.com" + bundle_url) - self.bundle = bundle_req.text - - def getAppId(self): - return re.search(self.appId_regex, self.bundle).group("app_id") - - def getSecrets(self): - seed_matches = re.finditer(self.seed_timezone_regex, self.bundle) - secrets = OrderedDict() - for match in seed_matches: - seed, timezone = match.group("seed", "timezone") - secrets[timezone] = [seed] - """The code that follows switches around the first and second timezone. Why? Read on: - Qobuz uses two ternary (a shortened if statement) conditions that should always return false. - The way Javascript's ternary syntax works, the second option listed is what runs if the condition returns false. - Because of this, we must prioritize the *second* seed/timezone pair captured, not the first. - """ - keypairs = list(secrets.items()) - secrets.move_to_end(keypairs[1][0], last=False) - info_extras_regex = self.info_extras_regex.format( - timezones="|".join([timezone.capitalize() for timezone in secrets]) - ) - info_extras_matches = re.finditer(info_extras_regex, self.bundle) - for match in info_extras_matches: - timezone, info, extras = match.group("timezone", "info", "extras") - secrets[timezone.lower()] += [info, extras] - for secret_pair in secrets: - secrets[secret_pair] = base64.standard_b64decode( - "".join(secrets[secret_pair])[:-44] - ).decode("utf-8") - return secrets diff --git a/setup.py b/setup.py index d7fccf4..5ebc9f7 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import find_packages, setup -pkg_name = "qobuz-dl" +pkg_name = "music-dl" def read_file(fname): @@ -11,29 +11,34 @@ def read_file(fname): requirements = read_file("requirements.txt").strip().split() +# https://github.com/pypa/sampleproject/blob/main/setup.py setup( name=pkg_name, version="0.9.7", - author="Vitiko", - author_email="vhnz98@gmail.com", - description="The complete Lossless and Hi-Res music downloader for Qobuz", - long_description=read_file("README.md"), - long_description_content_type="text/markdown", url="https://github.com/vitiko98/Qobuz-DL", install_requires=requirements, entry_points={ "console_scripts": [ - "qobuz-dl = qobuz_dl:main", - "qdl = qobuz_dl:main", + "music-dl = music_dl:main", + "rip = music_dl:main", ], }, packages=find_packages(), classifiers=[ - "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", "License :: OSI Approved :: GNU General Public License (GPL)", "Operating System :: OS Independent", + "Development Status :: 3 - Alpha", ], - python_requires=">=3.6", + package_dir={'', 'music-dl'}, + packages=find_packages(where='music-dl'), + python_requires=">=3.9", + project_urls={ + 'Bug Reports': 'https://github.com/nathom/music-dl/issues', + 'Source': 'https://github.com/nathom/music-dl', + } ) # rm -f dist/*