diff --git a/.mypy.ini b/.mypy.ini index 195a7cc..9f87719 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -21,3 +21,6 @@ ignore_missing_imports = True [mypy-tomlkit.*] ignore_missing_imports = True + +[mypy-PIL.*] +ignore_missing_imports = True diff --git a/README.md b/README.md index 91927bd..01b968a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # streamrip [![Downloads](https://static.pepy.tech/personalized-badge/streamrip?period=total&units=international_system&left_color=black&right_color=green&left_text=Downloads)](https://pepy.tech/project/streamrip) +[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/python/black) A scriptable stream downloader for Qobuz, Tidal, Deezer and SoundCloud. diff --git a/demo/deezer_downloader_tutorial.png b/demo/deezer_downloader_tutorial.png new file mode 100644 index 0000000..0cc88ed Binary files /dev/null and b/demo/deezer_downloader_tutorial.png differ diff --git a/poetry.lock b/poetry.lock index 025f09c..9d0d23b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -71,6 +71,14 @@ python-versions = ">=3.5" [package.dependencies] windows-curses = {version = "*", markers = "platform_system == \"Windows\""} +[[package]] +name = "pillow" +version = "8.3.0" +description = "Python Imaging Library (Fork)" +category = "main" +optional = false +python-versions = ">=3.6" + [[package]] name = "requests" version = "2.25.1" @@ -120,7 +128,7 @@ telegram = ["requests"] [[package]] name = "urllib3" -version = "1.26.5" +version = "1.26.6" description = "HTTP library with thread-safe connection pooling, file post, and more." category = "main" optional = false @@ -142,7 +150,7 @@ python-versions = "*" [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "a32e56def426809c20924e370d316663d2369853defcd590ee0c78aaef237ad3" +content-hash = "fe85433e74f85333932d145aa003b13164c786000da6083cb761f0823ebe2729" [metadata.files] certifi = [ @@ -177,6 +185,42 @@ pick = [ {file = "pick-1.0.0-py2.py3-none-any.whl", hash = "sha256:f32c8bd0fd943490c29e461a8168f4ac267247aaa6a7fc9dd327f97832842b5f"}, {file = "pick-1.0.0.tar.gz", hash = "sha256:03f13d4f5bfe74db4b969fb74c0ef110ec443978419d6c0f1f375a0d49539034"}, ] +pillow = [ + {file = "Pillow-8.3.0-cp36-cp36m-macosx_10_10_x86_64.whl", hash = "sha256:333313bcc53a8a7359e98d5458dfe37bfa301da2fd0e0dc41f585ae0cede9181"}, + {file = "Pillow-8.3.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bccd0d604d814e9494f3bf3f077a23835580ed1743c5175581882e7dd1f178c3"}, + {file = "Pillow-8.3.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:a7beda44f177ee602aa27e0a297da1657d9572679522c8fb8b336b734653516e"}, + {file = "Pillow-8.3.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:94db5ea640330de0945b41dc77fb4847b4ab6e87149126c71b36b112e8400898"}, + {file = "Pillow-8.3.0-cp36-cp36m-win32.whl", hash = "sha256:856fcbc3201a6cabf0478daa0c0a1a8a175af7e5173e2084ddb91cc707a09dd1"}, + {file = "Pillow-8.3.0-cp36-cp36m-win_amd64.whl", hash = "sha256:34ce3d993cb4ca840b1e31165b38cb19c64f64f822a8bc5565bde084baff3bdb"}, + {file = "Pillow-8.3.0-cp37-cp37m-macosx_10_10_x86_64.whl", hash = "sha256:778a819c2d194e08d39d67ddb15ef0d32eba17bf7d0c2773e97bd221b2613a3e"}, + {file = "Pillow-8.3.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b42ea77f4e7374a67e1f27aaa9c62627dff681f67890e5b8f0c1e21b1500d9d2"}, + {file = "Pillow-8.3.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:53f6e4b73b3899015ac4aa95d99da0f48ea18a6d7c8db672e8bead3fb9570ef5"}, + {file = "Pillow-8.3.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:fb91deb5121b6dde88599bcb3db3fdad9cf33ff3d4ccc5329ee1fe9655a2f7ff"}, + {file = "Pillow-8.3.0-cp37-cp37m-win32.whl", hash = "sha256:8f65d2a98f198e904dbe89ecb10862d5f0511367d823689039e17c4d011de11e"}, + {file = "Pillow-8.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:25f6564df21d15bcac142b4ed92b6c02e53557539f535f31c1f3bcc985484753"}, + {file = "Pillow-8.3.0-cp38-cp38-macosx_10_10_x86_64.whl", hash = "sha256:c2d78c8230bda5fc9f6b1d457c7f8f3432f4fe85bed86f80ba3ed73d59775a88"}, + {file = "Pillow-8.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:950e873ceefbd283cbe7bc5b648b832d1dcf89eeded6726ebec42bc7d67966c0"}, + {file = "Pillow-8.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1037288a22cc8ec9d2918a24ded733a1cc4342fd7f21d15d37e6bbe5fb4a7306"}, + {file = "Pillow-8.3.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:063d17a02a0170c2f880fbd373b2738b089c6adcbd1f7418667bc9e97524c11b"}, + {file = "Pillow-8.3.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:561339ed7c324bbcb29b5e4f4705c97df950785394b3ac181f5bf6a08088a672"}, + {file = "Pillow-8.3.0-cp38-cp38-win32.whl", hash = "sha256:331f8321418682386e4f0d0e6369f732053f95abddd2af4e1b1ef74a9537ef37"}, + {file = "Pillow-8.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:eccaefbd646022b5313ca4b0c5f1ae6e0d3a52ef66de64970ecf3f9b2a1be751"}, + {file = "Pillow-8.3.0-cp39-cp39-macosx_10_10_x86_64.whl", hash = "sha256:6f7517a220aca8b822e25b08b0df9546701a606a328da5bc057e5f32a3f9b07c"}, + {file = "Pillow-8.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:cc8e926d6ffa65d0dddb871b7afe117f17bc045951e66afde60eb0eba923db9e"}, + {file = "Pillow-8.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:519b3b24dedc81876d893475bade1b92c4ce7c24b9b82224f0bd8daae682e039"}, + {file = "Pillow-8.3.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:72858a27dd7bd1c40f91c4f85db3b9f121c8412fd66573121febb00d074d0530"}, + {file = "Pillow-8.3.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:3251557c53c1ed0c345559afc02d2b0a0aa5788042e161366ed90b27bc322d3d"}, + {file = "Pillow-8.3.0-cp39-cp39-win32.whl", hash = "sha256:ce90aad0a3dc0f13a9ff0ab1f43bcbea436089b83c3fadbe37c6f1733b938bf1"}, + {file = "Pillow-8.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:490c9236ef4762733b6c2e1f1fcb37793cb9c57d860aa84d6994c990461882e5"}, + {file = "Pillow-8.3.0-pp36-pypy36_pp73-macosx_10_10_x86_64.whl", hash = "sha256:aef0838f28328523e9e5f2c1852dd96fb85768deb0eb8f908c54dad0f44d2f6f"}, + {file = "Pillow-8.3.0-pp36-pypy36_pp73-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:713b762892efa8cd5d8dac24d16ac2d2dbf981963ed1b3297e79755f03f8cbb8"}, + {file = "Pillow-8.3.0-pp36-pypy36_pp73-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:cec702974f162026bf8de47f6f4b7ce9584a63c50002b38f195ee797165fea77"}, + {file = "Pillow-8.3.0-pp37-pypy37_pp73-macosx_10_10_x86_64.whl", hash = "sha256:d9ef8119ce44f90d2f8ac7c58f7da480ada5151f217dc8da03681b73fc91dec3"}, + {file = "Pillow-8.3.0-pp37-pypy37_pp73-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:fc25d59ecf23ea19571065306806a29c43c67f830f0e8a121303916ba257f484"}, + {file = "Pillow-8.3.0-pp37-pypy37_pp73-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:28f184c0a65be098af412f78b0b6f3bbafd1614e1dc896e810d8357342a794b7"}, + {file = "Pillow-8.3.0-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:c3529fb98a40f89269175442c5ff4ef81d22e91b2bdcbd33833a350709b5130c"}, + {file = "Pillow-8.3.0.tar.gz", hash = "sha256:803606e206f3e366eea46b1e7ab4dac74cfac770d04de9c35319814e11e47c46"}, +] requests = [ {file = "requests-2.25.1-py2.py3-none-any.whl", hash = "sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e"}, {file = "requests-2.25.1.tar.gz", hash = "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804"}, @@ -194,8 +238,8 @@ tqdm = [ {file = "tqdm-4.61.1.tar.gz", hash = "sha256:24be966933e942be5f074c29755a95b315c69a91f839a29139bf26ffffe2d3fd"}, ] urllib3 = [ - {file = "urllib3-1.26.5-py2.py3-none-any.whl", hash = "sha256:753a0374df26658f99d826cfe40394a686d05985786d946fbe4165b5148f5a7c"}, - {file = "urllib3-1.26.5.tar.gz", hash = "sha256:a7acd0977125325f516bda9735fa7142b909a8d01e8b2e4c8108d0984e6e0098"}, + {file = "urllib3-1.26.6-py2.py3-none-any.whl", hash = "sha256:39fb8672126159acb139a7718dd10806104dec1e2f0f6c88aab05d17df10c8d4"}, + {file = "urllib3-1.26.6.tar.gz", hash = "sha256:f57b4c16c62fa2760b7e3d97c35b255512fb6b59a259730f36ba32ce9f8e342f"}, ] windows-curses = [ {file = "windows_curses-2.2.0-cp36-cp36m-win32.whl", hash = "sha256:1452d771ec6f9b3fef037da2b169196a9a12be4e86a6c27dd579adac70c42028"}, diff --git a/pyproject.toml b/pyproject.toml index 22991e9..c301279 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "streamrip" -version = "0.6.7" +version = "0.7" description = "A fast, all-in-one music ripper for Qobuz, Deezer, Tidal, and SoundCloud" authors = ["nathom "] license = "GPL-3.0-only" @@ -8,6 +8,10 @@ readme = "README.md" homepage = "https://github.com/nathom/streamrip" repository = "https://github.com/nathom/streamrip" include = ["streamrip/config.toml"] +packages = [ + { include = "streamrip" }, + { include = "rip" }, +] keywords = ["hi-res", "free", "music", "download", "tqdm"] classifiers = [ "License :: OSI Approved :: GNU General Public License (GPL)", @@ -15,7 +19,7 @@ classifiers = [ ] [tool.poetry.scripts] -rip = "streamrip.cli:main" +rip = "rip.cli:main" [tool.poetry.dependencies] python = "^3.8" @@ -28,6 +32,7 @@ pathvalidate = "^2.4.1" simple-term-menu = {version = "^1.2.1", platform = 'linux or darwin'} pick = {version = "^1.0.0", platform = 'win32 or cygwin'} windows-curses = {version = "^2.2.0", platform = 'win32 or cygwin'} +Pillow = "^8.3.0" [tool.poetry.urls] "Bug Reports" = "https://github.com/nathom/streamrip/issues" diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index ef523e9..0000000 --- a/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -click -pathvalidate -requests -mutagen>=1.45.1 -tqdm -tomlkit diff --git a/rip/__init__.py b/rip/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rip/__main__.py b/rip/__main__.py new file mode 100644 index 0000000..4e28416 --- /dev/null +++ b/rip/__main__.py @@ -0,0 +1,3 @@ +from .cli import main + +main() diff --git a/streamrip/cli.py b/rip/cli.py similarity index 84% rename from streamrip/cli.py rename to rip/cli.py index 6e1f355..65c57f4 100644 --- a/streamrip/cli.py +++ b/rip/cli.py @@ -1,28 +1,11 @@ """The streamrip command line interface.""" - -import logging -import os -import shutil -from getpass import getpass -from hashlib import md5 - import click -import requests - -from . import __version__ -from .clients import TidalClient -from .config import Config -from .constants import CACHE_DIR, CONFIG_DIR, CONFIG_PATH, QOBUZ_FEATURED_KEYS -from .core import MusicDL +import logging +from streamrip import __version__ logging.basicConfig(level="WARNING") logger = logging.getLogger("streamrip") -if not os.path.isdir(CONFIG_DIR): - os.makedirs(CONFIG_DIR, exist_ok=True) -if not os.path.isdir(CACHE_DIR): - os.makedirs(CONFIG_DIR, exist_ok=True) - @click.group(invoke_without_command=True) @click.option("-c", "--convert", metavar="CODEC", help="alac, mp3, flac, or ogg") @@ -39,10 +22,10 @@ if not os.path.isdir(CACHE_DIR): metavar="INT", help="0: < 320kbps, 1: 320 kbps, 2: 16 bit/44.1 kHz, 3: 24 bit/<=96 kHz, 4: 24 bit/<=192 kHz", ) -@click.option("-t", "--text", metavar="PATH") -@click.option("-nd", "--no-db", is_flag=True) -@click.option("--debug", is_flag=True) -@click.version_option(prog_name="streamrip") +@click.option("-t", "--text", metavar="PATH", help="Download urls from a text file.") +@click.option("-nd", "--no-db", is_flag=True, help="Ignore the database.") +@click.option("--debug", is_flag=True, help="Show debugging logs.") +@click.version_option(prog_name="rip", version=__version__) @click.pass_context def cli(ctx, **kwargs): """Streamrip: The all-in-one Qobuz, Tidal, SoundCloud, and Deezer music downloader. @@ -56,6 +39,20 @@ def cli(ctx, **kwargs): $ rip config --open """ + import os + + import requests + + from .config import Config + from .constants import CONFIG_DIR + from .core import MusicDL + + logging.basicConfig(level="WARNING") + logger = logging.getLogger("streamrip") + + if not os.path.isdir(CONFIG_DIR): + os.makedirs(CONFIG_DIR, exist_ok=True) + global config global core @@ -63,7 +60,14 @@ def cli(ctx, **kwargs): logger.setLevel("DEBUG") logger.debug("Starting debug log") - if ctx.invoked_subcommand not in {None, "lastfm", "search", "disover", "config"}: + if ctx.invoked_subcommand not in { + None, + "lastfm", + "search", + "discover", + "config", + "repair", + }: return config = Config() @@ -225,6 +229,8 @@ def discover(ctx, **kwargs): * universal-chanson """ + from streamrip.constants import QOBUZ_FEATURED_KEYS + assert ( kwargs["list"] in QOBUZ_FEATURED_KEYS ), f"Invalid featured key {kwargs['list']}" @@ -284,6 +290,13 @@ def lastfm(ctx, source, url): @click.pass_context def config(ctx, **kwargs): """Manage the streamrip configuration file.""" + from streamrip.clients import TidalClient + from .constants import CONFIG_PATH + from hashlib import md5 + from getpass import getpass + import shutil + import os + global config if kwargs["reset"]: config.reset() @@ -343,9 +356,23 @@ def config(ctx, **kwargs): @click.argument("PATH") @click.pass_context def convert(ctx, **kwargs): - from . import converter + """Batch convert audio files. + + This is a tool that is included with the `rip` program that assists with + converting audio files. This is essentially a wrapper over ffmpeg + that is designed to be easy to use with sensible default options. + + Examples (assuming /my/music is filled with FLAC files): + + $ rip convert MP3 /my/music + + $ rip convert ALAC --sampling-rate 48000 /my/music + + """ + from streamrip import converter import concurrent.futures from tqdm import tqdm + import os codec_map = { "FLAC": converter.FLAC, @@ -405,6 +432,23 @@ def convert(ctx, **kwargs): click.secho(f"File {kwargs['path']} does not exist.", fg="red") +@cli.command() +@click.option( + "-n", "--num-items", help="The number of items to atttempt downloads for." +) +@click.pass_context +def repair(ctx, **kwargs): + """Retry failed downloads. + + If the failed downloads database is enabled in the config file (it is by default), + when an item is not available for download, it is logged in the database. + + When this command is called, it tries to download those items again. This is useful + for times when a temporary server error may miss a few tracks in an album. + """ + core.repair(max_items=kwargs.get("num_items")) + + def none_chosen(): """Print message if nothing was chosen.""" click.secho("No items chosen, exiting.", fg="bright_red") diff --git a/streamrip/config.py b/rip/config.py similarity index 96% rename from streamrip/config.py rename to rip/config.py index 56226e6..0964e5a 100644 --- a/streamrip/config.py +++ b/rip/config.py @@ -11,7 +11,7 @@ import click import tomlkit from .constants import CONFIG_DIR, CONFIG_PATH, DOWNLOADS_DIR -from .exceptions import InvalidSourceError +from streamrip.exceptions import InvalidSourceError logger = logging.getLogger("streamrip") @@ -86,6 +86,9 @@ class Config: os.makedirs(CONFIG_DIR, exist_ok=True) shutil.copy(self.default_config_path, self._path) + self.load() + self.file["downloads"]["folder"] = DOWNLOADS_DIR + self.save() def load(self): """Load infomation from the config files, making a deepcopy.""" diff --git a/streamrip/config.toml b/rip/config.toml similarity index 91% rename from streamrip/config.toml rename to rip/config.toml index 9da2d06..89b36ae 100644 --- a/streamrip/config.toml +++ b/rip/config.toml @@ -56,7 +56,13 @@ download_videos = false video_downloads_folder = "" # This stores a list of item IDs so that repeats are not downloaded. -[database] +[database.downloads] +enabled = true +path = "" + +# If a download fails, the item ID is stored here. Then, `rip repair` can be +# called to retry the downloads +[database.failed_downloads] enabled = true path = "" @@ -94,6 +100,10 @@ embed = true # "original" images can be up to 30MB, and may fail embedding. # Using "large" is recommended. size = "large" +# Both of these options limit the size of the embedded artwork. If their values +# are larger than the actual dimensions of the image, they will be ignored. +max_width = 999999 +max_height = 999999 # Save the cover image at the highest quality as a seperate jpg file keep_hires_cover = true @@ -128,4 +138,4 @@ fallback_source = "deezer" check_for_updates = true # Metadata to identify this config file. Do not change. -version = "0.6.1" +version = "0.7" diff --git a/rip/constants.py b/rip/constants.py new file mode 100644 index 0000000..14cc6ae --- /dev/null +++ b/rip/constants.py @@ -0,0 +1,28 @@ +import click +import re +import os +from pathlib import Path + +APPNAME = "streamrip" +APP_DIR = click.get_app_dir(APPNAME) +HOME = Path.home() + +LOG_DIR = CACHE_DIR = CONFIG_DIR = APP_DIR + +CONFIG_PATH = os.path.join(CONFIG_DIR, "config.toml") +DB_PATH = os.path.join(LOG_DIR, "downloads.db") +FAILED_DB_PATH = os.path.join(LOG_DIR, "failed_downloads.db") + +DOWNLOADS_DIR = os.path.join(HOME, "StreamripDownloads") + +URL_REGEX = re.compile( + r"https?://(?:www|open|play|listen)?\.?(qobuz|tidal|deezer)\.com(?:(?:/" + r"(album|artist|track|playlist|video|label))|(?:\/[-\w]+?))+\/([-\w]+)" +) +SOUNDCLOUD_URL_REGEX = re.compile(r"https://soundcloud.com/[-\w:/]+") +LASTFM_URL_REGEX = re.compile(r"https://www.last.fm/user/\w+/playlists/\w+") +QOBUZ_INTERPRETER_URL_REGEX = re.compile( + r"https?://www\.qobuz\.com/\w\w-\w\w/interpreter/[-\w]+/[-\w]+" +) +DEEZER_DYNAMIC_LINK_REGEX = re.compile(r"https://deezer\.page\.link/\w+") +YOUTUBE_URL_REGEX = re.compile(r"https://www\.youtube\.com/watch\?v=[-\w]+") diff --git a/streamrip/core.py b/rip/core.py similarity index 88% rename from streamrip/core.py rename to rip/core.py index e13a69a..f6b4480 100644 --- a/streamrip/core.py +++ b/rip/core.py @@ -5,6 +5,7 @@ import html import logging import os import re + from getpass import getpass from hashlib import md5 from string import Formatter @@ -14,8 +15,17 @@ import click import requests from tqdm import tqdm -from .bases import Track, Video, YoutubeVideo -from .clients import ( +from streamrip.media import ( + Track, + Video, + YoutubeVideo, + Album, + Artist, + Label, + Playlist, + Tracklist, +) +from streamrip.clients import ( Client, DeezerClient, QobuzClient, @@ -23,31 +33,33 @@ from .clients import ( TidalClient, ) from .config import Config +from streamrip.constants import MEDIA_TYPES from .constants import ( + URL_REGEX, + SOUNDCLOUD_URL_REGEX, + LASTFM_URL_REGEX, + QOBUZ_INTERPRETER_URL_REGEX, + YOUTUBE_URL_REGEX, + DEEZER_DYNAMIC_LINK_REGEX, CONFIG_PATH, DB_PATH, - DEEZER_DYNAMIC_LINK_REGEX, - LASTFM_URL_REGEX, - MEDIA_TYPES, - QOBUZ_INTERPRETER_URL_REGEX, - SOUNDCLOUD_URL_REGEX, - URL_REGEX, - YOUTUBE_URL_REGEX, + FAILED_DB_PATH, ) -from .db import MusicDB -from .exceptions import ( +from . import db +from streamrip.exceptions import ( AuthenticationError, + PartialFailure, + ItemExists, MissingCredentials, NonStreamable, NoResultsFound, ParsingError, ) -from .tracklists import Album, Artist, Label, Playlist, Tracklist from .utils import extract_deezer_dynamic_link, extract_interpreter_url logger = logging.getLogger("streamrip") - +# ---------------- Constants ------------------ # Media = Union[ Type[Album], Type[Playlist], @@ -65,6 +77,9 @@ MEDIA_CLASS: Dict[str, Media] = { "video": Video, } +DB_PATH_MAP = {"downloads": DB_PATH, "failed_downloads": FAILED_DB_PATH} +# ---------------------------------------------- # + class MusicDL(list): """MusicDL.""" @@ -78,13 +93,6 @@ class MusicDL(list): :param config: :type config: Optional[Config] """ - self.url_parse = re.compile(URL_REGEX) - self.soundcloud_url_parse = re.compile(SOUNDCLOUD_URL_REGEX) - self.lastfm_url_parse = re.compile(LASTFM_URL_REGEX) - self.interpreter_url_parse = re.compile(QOBUZ_INTERPRETER_URL_REGEX) - self.youtube_url_parse = re.compile(YOUTUBE_URL_REGEX) - self.deezer_dynamic_url_parse = re.compile(DEEZER_DYNAMIC_LINK_REGEX) - self.config: Config if config is None: self.config = Config(CONFIG_PATH) @@ -98,18 +106,28 @@ class MusicDL(list): "soundcloud": SoundCloudClient(), } - self.db: Union[MusicDB, list] - db_settings = self.config.session["database"] - if db_settings["enabled"]: - path = db_settings["path"] - if path: - self.db = MusicDB(path) - else: - self.db = MusicDB(DB_PATH) - self.config.file["database"]["path"] = DB_PATH - self.config.save() - else: - self.db = [] + def get_db(db_type: str) -> db.Database: + db_settings = self.config.session["database"] + db_class = db.CLASS_MAP[db_type] + database = db_class(None, dummy=True) + + default_db_path = DB_PATH_MAP[db_type] + if db_settings[db_type]["enabled"]: + path = db_settings[db_type]["path"] + + if path: + database = db_class(path) + else: + database = db_class(default_db_path) + + assert config is not None + config.file["database"][db_type]["path"] = default_db_path + config.save() + + return database + + self.db = get_db("downloads") + self.failed_db = get_db("failed_downloads") def handle_urls(self, urls): """Download a url. @@ -128,7 +146,7 @@ class MusicDL(list): # youtube is handled by youtube-dl, so much of the # processing is not necessary - youtube_urls = self.youtube_url_parse.findall(url) + youtube_urls = YOUTUBE_URL_REGEX.findall(url) if youtube_urls != []: self.extend(YoutubeVideo(u) for u in youtube_urls) @@ -145,7 +163,7 @@ class MusicDL(list): raise ParsingError(message) for source, url_type, item_id in parsed: - if item_id in self.db: + if {"id": item_id} in self.db: logger.info( f"ID {item_id} already downloaded, use --no-db to override." ) @@ -191,7 +209,6 @@ class MusicDL(list): session[key] for key in ("artwork", "conversion", "filepaths") ) return { - "database": self.db, "parent_folder": session["downloads"]["folder"], "folder_format": filepaths["folder_format"], "track_format": filepaths["track_format"], @@ -210,8 +227,30 @@ class MusicDL(list): "video_downloads_folder" ], "add_singles_to_folder": filepaths["add_singles_to_folder"], + "max_artwork_width": int(artwork["max_width"]), + "max_artwork_height": int(artwork["max_height"]), } + def repair(self, max_items=None): + if max_items is None: + max_items = float("inf") + + if self.failed_db.is_dummy: + click.secho( + "Failed downloads database must be enabled in the config file " + "to repair!", + fg="red", + ) + raise click.Abort + + for counter, (source, media_type, item_id) in enumerate(self.failed_db): + if counter >= max_items: + break + + self.handle_item(source, media_type, item_id) + + self.download() + def download(self): """Download all the items in self.""" try: @@ -251,18 +290,32 @@ class MusicDL(list): try: item.load_meta(**arguments) except NonStreamable: + self.failed_db.add((item.client.source, item.type, item.id)) click.secho(f"{item!s} is not available, skipping.", fg="red") continue - item.download(**arguments) + try: + item.download(**arguments) + except NonStreamable as e: + e.print(item) + self.failed_db.add((item.client.source, item.type, item.id)) + continue + except PartialFailure as e: + for failed_item in e.failed_items: + self.failed_db.add(failed_item) + continue + except ItemExists as e: + click.secho(f'"{e!s}" already exists. Skipping.', fg="yellow") + continue + + if hasattr(item, "id"): + self.db.add([item.id]) + if isinstance(item, Track): item.tag() if arguments["conversion"]["enabled"]: item.convert(**arguments["conversion"]) - if self.db != [] and hasattr(item, "id"): - self.db.add(item.id) - def get_client(self, source: str) -> Client: """Get a client given the source and log in. @@ -325,7 +378,7 @@ class MusicDL(list): """ parsed: List[Tuple[str, str, str]] = [] - interpreter_urls = self.interpreter_url_parse.findall(url) + interpreter_urls = QOBUZ_INTERPRETER_URL_REGEX.findall(url) if interpreter_urls: click.secho( "Extracting IDs from Qobuz interpreter urls. Use urls " @@ -336,9 +389,9 @@ class MusicDL(list): ("qobuz", "artist", extract_interpreter_url(u)) for u in interpreter_urls ) - url = self.interpreter_url_parse.sub("", url) + url = QOBUZ_INTERPRETER_URL_REGEX.sub("", url) - dynamic_urls = self.deezer_dynamic_url_parse.findall(url) + dynamic_urls = DEEZER_DYNAMIC_LINK_REGEX.findall(url) if dynamic_urls: click.secho( "Extracting IDs from Deezer dynamic link. Use urls " @@ -350,8 +403,8 @@ class MusicDL(list): ("deezer", *extract_deezer_dynamic_link(url)) for url in dynamic_urls ) - parsed.extend(self.url_parse.findall(url)) # Qobuz, Tidal, Dezer - soundcloud_urls = self.soundcloud_url_parse.findall(url) + parsed.extend(URL_REGEX.findall(url)) # Qobuz, Tidal, Dezer + soundcloud_urls = SOUNDCLOUD_URL_REGEX.findall(url) soundcloud_items = [self.clients["soundcloud"].get(u) for u in soundcloud_urls] parsed.extend( @@ -384,7 +437,7 @@ class MusicDL(list): # For testing: # https://www.last.fm/user/nathan3895/playlists/12058911 user_regex = re.compile(r"https://www\.last\.fm/user/([^/]+)/playlists/\d+") - lastfm_urls = self.lastfm_url_parse.findall(urls) + lastfm_urls = LASTFM_URL_REGEX.findall(urls) try: lastfm_source = self.config.session["lastfm"]["source"] lastfm_fallback_source = self.config.session["lastfm"]["fallback_source"] @@ -554,7 +607,7 @@ class MusicDL(list): ret = fmt.format(**{k: media.get(k, default="Unknown") for k in fields}) return ret - def interactive_search( # noqa + def interactive_search( self, query: str, source: str = "qobuz", media_type: str = "album" ): """Show an interactive menu that contains search results. diff --git a/rip/db.py b/rip/db.py new file mode 100644 index 0000000..4e9ce15 --- /dev/null +++ b/rip/db.py @@ -0,0 +1,145 @@ +"""Wrapper over a database that stores item IDs.""" + +import logging +import os +import sqlite3 +from typing import List + +logger = logging.getLogger("streamrip") + + +class Database: + """A wrapper for an sqlite database.""" + + structure: dict + name: str + + def __init__(self, path, dummy=False): + assert self.structure != [] + assert self.name + + if dummy or path is None: + self.path = None + self.is_dummy = True + return + self.is_dummy = False + + self.path = path + if not os.path.exists(self.path): + self.create() + + def create(self): + """Create a database.""" + if self.is_dummy: + return + + with sqlite3.connect(self.path) as conn: + params = ", ".join( + f"{key} {' '.join(map(str.upper, props))} NOT NULL" + for key, props in self.structure.items() + ) + command = f"CREATE TABLE {self.name} ({params})" + + logger.debug(f"executing {command}") + + conn.execute(command) + + def keys(self): + """Get the column names of the table.""" + return self.structure.keys() + + def contains(self, **items) -> bool: + """Check whether items matches an entry in the table. + + :param items: a dict of column-name + expected value + :rtype: bool + """ + if self.is_dummy: + return False + + allowed_keys = set(self.structure.keys()) + assert all( + key in allowed_keys for key in items.keys() + ), f"Invalid key. Valid keys: {allowed_keys}" + + items = {k: str(v) for k, v in items.items()} + + with sqlite3.connect(self.path) as conn: + conditions = " AND ".join(f"{key}=?" for key in items.keys()) + command = f"SELECT EXISTS(SELECT 1 FROM {self.name} WHERE {conditions})" + + logger.debug(f"executing {command}") + + return bool(conn.execute(command, tuple(items.values())).fetchone()[0]) + + def __contains__(self, keys: dict) -> bool: + return self.contains(**keys) + + def add(self, items: List[str]): + """Add a row to the table. + + :param items: Column-name + value. Values must be provided for all cols. + :type items: List[str] + """ + if self.is_dummy: + return + + assert len(items) == len(self.structure) + + params = ", ".join(self.structure.keys()) + question_marks = ", ".join("?" for _ in items) + command = f"INSERT INTO {self.name} ({params}) VALUES ({question_marks})" + + logger.debug(f"executing {command}") + + with sqlite3.connect(self.path) as conn: + try: + conn.execute(command, tuple(items)) + except sqlite3.IntegrityError as e: + # tried to insert an item that was already there + logger.debug(e) + + def remove(self, **items): + # not in use currently + if self.is_dummy: + return + + conditions = " AND ".join(f"{key}=?" for key in items.keys()) + command = f"DELETE FROM {self.name} WHERE {conditions}" + + with sqlite3.connect(self.path) as conn: + logger.debug(command) + print(command) + conn.execute(command, tuple(items.values())) + + def __iter__(self): + if self.is_dummy: + return () + + with sqlite3.connect(self.path) as conn: + return conn.execute(f"SELECT * FROM {self.name}") + + def reset(self): + try: + os.remove(self.path) + except FileNotFoundError: + pass + + +class Downloads(Database): + name = "downloads" + structure = { + "id": ["text", "unique"], + } + + +class FailedDownloads(Database): + name = "failed_downloads" + structure = { + "source": ["text"], + "media_type": ["text"], + "id": ["text", "unique"], + } + + +CLASS_MAP = {db.name: db for db in (Downloads, FailedDownloads)} diff --git a/rip/utils.py b/rip/utils.py new file mode 100644 index 0000000..ac91776 --- /dev/null +++ b/rip/utils.py @@ -0,0 +1,46 @@ +from streamrip.utils import gen_threadsafe_session +from streamrip.constants import AGENT +from typing import Tuple +import re + +interpreter_artist_regex = re.compile(r"getSimilarArtist\(\s*'(\w+)'") + + +def extract_interpreter_url(url: str) -> str: + """Extract artist ID from a Qobuz interpreter url. + + :param url: Urls of the form "https://www.qobuz.com/us-en/interpreter/{artist}/download-streaming-albums" + :type url: str + :rtype: str + """ + session = gen_threadsafe_session({"User-Agent": AGENT}) + r = session.get(url) + match = interpreter_artist_regex.search(r.text) + if match: + return match.group(1) + + raise Exception( + "Unable to extract artist id from interpreter url. Use a " + "url that contains an artist id." + ) + + +deezer_id_link_regex = re.compile( + r"https://www\.deezer\.com/[a-z]{2}/(album|artist|playlist|track)/(\d+)" +) + + +def extract_deezer_dynamic_link(url: str) -> Tuple[str, str]: + """Extract a deezer url that includes an ID from a deezer.page.link url. + + :param url: + :type url: str + :rtype: Tuple[str, str] + """ + session = gen_threadsafe_session({"User-Agent": AGENT}) + r = session.get(url) + match = deezer_id_link_regex.search(r.text) + if match: + return match.group(1), match.group(2) + + raise Exception("Unable to extract Deezer dynamic link.") diff --git a/streamrip/__init__.py b/streamrip/__init__.py index 8551bca..7a69b4c 100644 --- a/streamrip/__init__.py +++ b/streamrip/__init__.py @@ -1,3 +1,5 @@ """streamrip: the all in one music downloader.""" -__version__ = "0.6.7" +__version__ = "0.7" + +from . import clients, converter, media, constants diff --git a/streamrip/clients.py b/streamrip/clients.py index aa6aec3..ccfd043 100644 --- a/streamrip/clients.py +++ b/streamrip/clients.py @@ -268,12 +268,14 @@ class QobuzClient(Client): extras = { "artist": "albums", "playlist": "tracks", - "label": "albums", # not tested + "label": "albums", } if media_type in extras: params.update({"extra": extras[media_type]}) + logger.debug("request params: %s", params) + epoint = f"{media_type}/get" response, status_code = self._api_request(epoint, params) @@ -660,15 +662,14 @@ class TidalClient(Client): # pending time.sleep(4) continue - elif status == 1: - # error checking - raise Exception elif status == 0: # successful break else: raise Exception + self._update_authorization() + def _get_device_code(self): """Get the device code that will be used to log in on the browser.""" data = { diff --git a/streamrip/constants.py b/streamrip/constants.py index c97818e..7f6fdd4 100644 --- a/streamrip/constants.py +++ b/streamrip/constants.py @@ -1,25 +1,12 @@ """Constants that are kept in one place.""" -import os -from pathlib import Path - -import click import mutagen.id3 as id3 - -APPNAME = "streamrip" - -CACHE_DIR = click.get_app_dir(APPNAME) -CONFIG_DIR = click.get_app_dir(APPNAME) -CONFIG_PATH = os.path.join(CONFIG_DIR, "config.toml") -LOG_DIR = click.get_app_dir(APPNAME) -DB_PATH = os.path.join(LOG_DIR, "downloads.db") - -HOME = Path.home() -DOWNLOADS_DIR = os.path.join(HOME, "StreamripDownloads") +import re AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0" TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg" +SOUNDCLOUD_CLIENT_ID = re.compile("a3e059563d7fd3372b49b37f00a00bcf") QUALITY_DESC = { @@ -147,20 +134,6 @@ FOLDER_FORMAT = ( TRACK_FORMAT = "{tracknumber}. {artist} - {title}" -# ------------------ Regexes ------------------- # -URL_REGEX = ( - r"https?://(?:www|open|play|listen)?\.?(qobuz|tidal|deezer)\.com(?:(?:/" - r"(album|artist|track|playlist|video|label))|(?:\/[-\w]+?))+\/([-\w]+)" -) -SOUNDCLOUD_URL_REGEX = r"https://soundcloud.com/[-\w:/]+" -SOUNDCLOUD_CLIENT_ID = "a3e059563d7fd3372b49b37f00a00bcf" -LASTFM_URL_REGEX = r"https://www.last.fm/user/\w+/playlists/\w+" -QOBUZ_INTERPRETER_URL_REGEX = ( - r"https?://www\.qobuz\.com/\w\w-\w\w/interpreter/[-\w]+/[-\w]+" -) -DEEZER_DYNAMIC_LINK_REGEX = r"https://deezer\.page\.link/\w+" -YOUTUBE_URL_REGEX = r"https://www\.youtube\.com/watch\?v=[-\w]+" - TIDAL_MAX_Q = 7 TIDAL_Q_MAP = { diff --git a/streamrip/converter.py b/streamrip/converter.py index 9024363..31752b9 100644 --- a/streamrip/converter.py +++ b/streamrip/converter.py @@ -11,7 +11,7 @@ from .exceptions import ConversionError logger = logging.getLogger("streamrip") -SAMPLING_RATES = (44100, 48000, 88200, 96000, 176400, 192000) +SAMPLING_RATES = {44100, 48000, 88200, 96000, 176400, 192000} class Converter: diff --git a/streamrip/db.py b/streamrip/db.py deleted file mode 100644 index 3102eb2..0000000 --- a/streamrip/db.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Wrapper over a database that stores item IDs.""" - -import logging -import os -import sqlite3 -from typing import Union - -logger = logging.getLogger("streamrip") - - -class MusicDB: - """Simple interface for the downloaded track database.""" - - def __init__(self, db_path: Union[str, os.PathLike]): - """Create a MusicDB object. - - :param db_path: filepath of the database - :type db_path: Union[str, os.PathLike] - """ - self.path = db_path - if not os.path.exists(self.path): - self.create() - - def create(self): - """Create a database at `self.path`.""" - with sqlite3.connect(self.path) as conn: - try: - conn.execute("CREATE TABLE downloads (id TEXT UNIQUE NOT NULL);") - logger.debug("Download-IDs database created: %s", self.path) - except sqlite3.OperationalError: - pass - - return self.path - - def __contains__(self, item_id: Union[str, int]) -> bool: - """Check whether the database contains an id. - - :param item_id: the id to check - :type item_id: str - :rtype: bool - """ - logger.debug("Checking database for ID %s", item_id) - with sqlite3.connect(self.path) as conn: - return ( - conn.execute( - "SELECT id FROM downloads where id=?", (item_id,) - ).fetchone() - is not None - ) - - def add(self, item_id: str): - """Add an id to the database. - - :param item_id: - :type item_id: str - """ - logger.debug("Adding ID %s", item_id) - with sqlite3.connect(self.path) as conn: - try: - conn.execute( - "INSERT INTO downloads (id) VALUES (?)", - (item_id,), - ) - conn.commit() - except sqlite3.Error as err: - if "UNIQUE" not in str(err): - raise diff --git a/streamrip/exceptions.py b/streamrip/exceptions.py index a6b0520..ad8e1d6 100644 --- a/streamrip/exceptions.py +++ b/streamrip/exceptions.py @@ -1,3 +1,7 @@ +from typing import List +import click + + class AuthenticationError(Exception): pass @@ -23,7 +27,21 @@ class InvalidQuality(Exception): class NonStreamable(Exception): - pass + def __init__(self, message=None): + self.message = message + super().__init__(self.message) + + def print(self, item): + print(self.print_msg(item)) + + def print_msg(self, item) -> str: + base_msg = click.style(f"Unable to stream {item!s}.", fg="yellow") + if self.message: + base_msg += click.style(" Message: ", fg="yellow") + click.style( + self.message, fg="red" + ) + + return base_msg class InvalidContainerError(Exception): @@ -52,3 +70,13 @@ class ConversionError(Exception): class NoResultsFound(Exception): pass + + +class ItemExists(Exception): + pass + + +class PartialFailure(Exception): + def __init__(self, failed_items: List): + self.failed_items = failed_items + super().__init__() diff --git a/streamrip/bases.py b/streamrip/media.py similarity index 51% rename from streamrip/bases.py rename to streamrip/media.py index 845559d..90d9404 100644 --- a/streamrip/bases.py +++ b/streamrip/media.py @@ -8,24 +8,27 @@ as a single track. import concurrent.futures import logging import os +import abc import re import shutil import subprocess from tempfile import gettempdir -from typing import Any, Optional, Union +from typing import Any, Optional, Union, Iterable, Generator, Dict, Tuple, List import click import tqdm from mutagen.flac import FLAC, Picture from mutagen.id3 import APIC, ID3, ID3NoHeaderError from mutagen.mp4 import MP4, MP4Cover -from pathvalidate import sanitize_filepath +from pathvalidate import sanitize_filepath, sanitize_filename from . import converter from .clients import Client -from .constants import FLAC_MAX_BLOCKSIZE, FOLDER_FORMAT, TRACK_FORMAT +from .constants import FLAC_MAX_BLOCKSIZE, FOLDER_FORMAT, TRACK_FORMAT, ALBUM_KEYS from .exceptions import ( InvalidQuality, + PartialFailure, + ItemExists, InvalidSourceError, NonStreamable, TooLargeCoverArt, @@ -33,9 +36,12 @@ from .exceptions import ( from .metadata import TrackMetadata from .utils import ( clean_format, - decho, + downsize_image, + get_cover_urls, decrypt_mqa_file, + get_container, ext, + get_stats_from_quality, safe_get, tidal_cover_url, tqdm_download, @@ -49,7 +55,38 @@ TYPE_REGEXES = { } -class Track: +class Media(abc.ABC): + @abc.abstractmethod + def download(self, **kwargs): + pass + + @abc.abstractmethod + def load_meta(self, **kwargs): + pass + + @abc.abstractmethod + def tag(self, **kwargs): + pass + + @property + @abc.abstractmethod + def type(self): + pass + + @abc.abstractmethod + def convert(self, **kwargs): + pass + + @abc.abstractmethod + def __repr__(self): + pass + + @abc.abstractmethod + def __str__(self): + pass + + +class Track(Media): """Represents a downloadable track. Loading metadata as a single track: @@ -104,7 +141,7 @@ class Track: logger.debug("Cover url: %s", u) self.cover_url = u - def load_meta(self): + def load_meta(self, **kwargs): """Send a request to the client to get metadata for this Track. Usually only called for single tracks and last.fm playlists. @@ -163,26 +200,22 @@ class Track: os.makedirs(self.folder, exist_ok=True) - if self.id in kwargs.get("database", []): - self.downloaded = True - self.tagged = True - self.path = self.final_path - decho( - f"{self['title']} already logged in database, skipping.", - fg="magenta", - ) - return False # because the track was not downloaded - if os.path.isfile(self.final_path): # track already exists self.downloaded = True self.tagged = True self.path = self.final_path - decho(f"Track already exists: {self.final_path}", fg="magenta") - return False + raise ItemExists(self.final_path) + + if hasattr(self, "cover_url"): + try: + self.download_cover( + width=kwargs.get("max_artwork_width", 999999), + height=kwargs.get("max_artwork_height", 999999), + ) # only downloads for playlists and singles + except ItemExists as e: + logger.debug(e) - self.download_cover() # only downloads for playlists and singles self.path = os.path.join(gettempdir(), f"{hash(self.id)}_{self.quality}.tmp") - return True def download( self, @@ -190,7 +223,7 @@ class Track: parent_folder: str = "StreamripDownloads", progress_bar: bool = True, **kwargs, - ) -> bool: + ): """Download the track. :param quality: (0, 1, 2, 3, 4) @@ -200,13 +233,12 @@ class Track: :param progress_bar: turn on/off progress bar :type progress_bar: bool """ - if not self._prepare_download( + self._prepare_download( quality=quality, parent_folder=parent_folder, progress_bar=progress_bar, **kwargs, - ): - return False + ) if self.client.source == "soundcloud": # soundcloud client needs whole dict to get file url @@ -217,14 +249,14 @@ class Track: try: dl_info = self.client.get_file_url(url_id, self.quality) except Exception as e: - click.secho(f"Unable to download track. {e}", fg="red") - return False + # click.secho(f"Unable to download track. {e}", fg="red") + raise NonStreamable(e) if self.client.source == "qobuz": assert isinstance(dl_info, dict) # for typing if not self.__validate_qobuz_dl_info(dl_info): - click.secho("Track is not available for download", fg="red") - return False + # click.secho("Track is not available for download", fg="red") + raise NonStreamable("Track is not available for download") self.sampling_rate = dl_info.get("sampling_rate") self.bit_depth = dl_info.get("bit_depth") @@ -233,19 +265,12 @@ class Track: if self.client.source in ("qobuz", "tidal", "deezer"): assert isinstance(dl_info, dict) logger.debug("Downloadable URL found: %s", dl_info.get("url")) - try: - tqdm_download( - dl_info["url"], self.path, desc=self._progress_desc - ) # downloads file - except NonStreamable: - click.secho( - f"Track {self!s} is not available for download, skipping.", - fg="red", - ) - return False + tqdm_download( + dl_info["url"], self.path, desc=self._progress_desc + ) # downloads file elif self.client.source == "soundcloud": - assert isinstance(dl_info, dict) + assert isinstance(dl_info, dict) # for typing self._soundcloud_download(dl_info) else: @@ -257,17 +282,13 @@ class Track: and dl_info.get("enc_key", False) ): out_path = f"{self.path}_dec" + logger.debug("Decrypting MQA file") decrypt_mqa_file(self.path, out_path, dl_info["enc_key"]) self.path = out_path if not kwargs.get("stay_temp", False): self.move(self.final_path) - database = kwargs.get("database") - if database: - database.add(self.id) - logger.debug(f"{self.id} added to database") - logger.debug("Downloaded: %s -> %s", self.path, self.final_path) self.downloaded = True @@ -275,8 +296,6 @@ class Track: if not kwargs.get("keep_cover", True) and hasattr(self, "cover_path"): os.remove(self.cover_path) - return True - def __validate_qobuz_dl_info(self, info: dict) -> bool: """Check if the download info dict returned by Qobuz is downloadable. @@ -343,6 +362,10 @@ class Track: self.final_path = self.final_path.replace(".mp3", ".flac") self.quality = 2 + @property + def type(self) -> str: + return "track" + @property def _progress_desc(self) -> str: """Get the description that is used on the progress bar. @@ -351,11 +374,8 @@ class Track: """ return click.style(f"Track {int(self.meta.tracknumber):02}", fg="blue") - def download_cover(self): + def download_cover(self, width=999999, height=999999): """Download the cover art, if cover_url is given.""" - if not hasattr(self, "cover_url"): - return False - self.cover_path = os.path.join(gettempdir(), f"cover{hash(self.cover_url)}.jpg") logger.debug(f"Downloading cover from {self.cover_url}") # click.secho(f"\nDownloading cover art for {self!s}", fg="blue") @@ -366,8 +386,10 @@ class Track: self.cover_path, desc=click.style("Cover", fg="cyan"), ) + downsize_image(self.cover_path, width, height) else: logger.debug("Cover already exists, skipping download") + raise ItemExists(self.cover_path) def format_final_path(self) -> str: """Return the final filepath of the downloaded file. @@ -437,11 +459,12 @@ class Track: cover_url=cover_url, ) - def tag( # noqa + def tag( self, album_meta: dict = None, cover: Union[Picture, APIC, MP4Cover] = None, embed_cover: bool = True, + **kwargs, ): """Tag the track using the stored metadata. @@ -666,7 +689,7 @@ class Track: return True -class Video: +class Video(Media): """Only for Tidal.""" def __init__(self, client: Client, id: str, **kwargs): @@ -684,7 +707,7 @@ class Video: self.explicit = kwargs.get("explicit", False) self.tracknumber = kwargs.get("tracknumber", None) - def load_meta(self): + def load_meta(self, **kwargs): """Given an id at contruction, get the metadata of the video.""" resp = self.client.get(self.id, "video") self.title = resp["title"] @@ -716,8 +739,6 @@ class Video: p = subprocess.Popen(command) p.wait() # remove this? - return False # so that it is not tagged - def tag(self, *args, **kwargs): """Return False. @@ -745,6 +766,9 @@ class Video: tracknumber=track["trackNumber"], ) + def convert(self, *args, **kwargs): + pass + @property def path(self) -> str: """Get path to download the mp4 file. @@ -760,6 +784,10 @@ class Video: return os.path.join(self.parent_folder, f"{fname}.mp4") + @property + def type(self) -> str: + return "video" + def __str__(self) -> str: """Return the title. @@ -778,6 +806,101 @@ class Video: return True +class YoutubeVideo(Media): + """Dummy class implemented for consistency with the Media API.""" + + class DummyClient: + """Used because YouTube downloads use youtube-dl, not a client.""" + + source = "youtube" + + def __init__(self, url: str): + """Create a YoutubeVideo object. + + :param url: URL to the youtube video. + :type url: str + """ + self.url = url + self.client = self.DummyClient() + + def download( + self, + parent_folder: str = "StreamripDownloads", + download_youtube_videos: bool = False, + youtube_video_downloads_folder: str = "StreamripDownloads", + **kwargs, + ): + """Download the video using 'youtube-dl'. + + :param parent_folder: + :type parent_folder: str + :param download_youtube_videos: True if the video should be downloaded. + :type download_youtube_videos: bool + :param youtube_video_downloads_folder: Folder to put videos if + downloaded. + :type youtube_video_downloads_folder: str + :param kwargs: + """ + click.secho(f"Downloading url {self.url}", fg="blue") + filename_formatter = "%(track_number)s.%(track)s.%(container)s" + filename = os.path.join(parent_folder, filename_formatter) + + p = subprocess.Popen( + [ + "youtube-dl", + "-x", # audio only + "-q", # quiet mode + "--add-metadata", + "--audio-format", + "mp3", + "--embed-thumbnail", + "-o", + filename, + self.url, + ] + ) + + if download_youtube_videos: + click.secho("Downloading video stream", fg="blue") + pv = subprocess.Popen( + [ + "youtube-dl", + "-q", + "-o", + os.path.join( + youtube_video_downloads_folder, + "%(title)s.%(container)s", + ), + self.url, + ] + ) + pv.wait() + p.wait() + + def load_meta(self, *args, **kwargs): + """Return None. + + Dummy method. + + :param args: + :param kwargs: + """ + pass + + def tag(self, *args, **kwargs): + """Return None. + + Dummy method. + + :param args: + :param kwargs: + """ + pass + + def __bool__(self): + return True + + class Booklet: """Only for Qobuz.""" @@ -807,6 +930,9 @@ class Booklet: filepath = os.path.join(parent_folder, f"{self.description}.pdf") tqdm_download(self.url, filepath) + def type(self) -> str: + return "booklet" + def __bool__(self): return True @@ -840,15 +966,29 @@ class Tracklist(list): else: target = self._download_item + # TODO: make this function return the items that have not been downloaded + failed_downloads: List[Tuple[str, str, str]] = [] if kwargs.get("concurrent_downloads", True): - # Tidal errors out with unlimited concurrency with concurrent.futures.ThreadPoolExecutor(15) as executor: - futures = [executor.submit(target, item, **kwargs) for item in self] + future_map = { + executor.submit(target, item, **kwargs): item for item in self + } + # futures = [executor.submit(target, item, **kwargs) for item in self] try: - concurrent.futures.wait(futures) + concurrent.futures.wait(future_map.keys()) + for future in future_map.keys(): + try: + future.result() + except NonStreamable as e: + item = future_map[future] + e.print(item) + failed_downloads.append( + (item.client.source, item.type, item.id) + ) + except (KeyboardInterrupt, SystemExit): executor.shutdown() - tqdm.write("Aborted! May take some time to shutdown.") + click.echo("Aborted! May take some time to shutdown.") raise click.Abort else: @@ -857,20 +997,29 @@ class Tracklist(list): # soundcloud only gets metadata after `target` is called # message will be printed in `target` click.secho(f'\nDownloading "{item!s}"', fg="blue") - target(item, **kwargs) + try: + target(item, **kwargs) + except ItemExists: + click.secho(f"{item!s} exists. Skipping.", fg="yellow") + except NonStreamable as e: + e.print(item) + failed_downloads.append((item.client.source, item.type, item.id)) self.downloaded = True - def _download_and_convert_item(self, item, **kwargs): + if failed_downloads: + raise PartialFailure(failed_downloads) + + def _download_and_convert_item(self, item: Media, **kwargs): """Download and convert an item. :param item: :param kwargs: should contain a `conversion` dict. """ - if self._download_item(item, **kwargs): - item.convert(**kwargs["conversion"]) + self._download_item(item, **kwargs) + item.convert(**kwargs["conversion"]) - def _download_item(item, *args: Any, **kwargs: Any) -> bool: + def _download_item(self, item: Media, **kwargs: Any): """Abstract method. :param item: @@ -1024,6 +1173,10 @@ class Tracklist(list): return album + @property + def type(self) -> str: + return self.__class__.__name__.lower() + def __getitem__(self, key): """Get an item if key is int, otherwise get an attr. @@ -1051,96 +1204,849 @@ class Tracklist(list): return True -class YoutubeVideo: - """Dummy class implemented for consistency with the Media API.""" +class Album(Tracklist): + """Represents a downloadable album. - class DummyClient: - """Used because YouTube downloads use youtube-dl, not a client.""" + Usage: - source = "youtube" + >>> resp = client.get('fleetwood mac rumours', 'album') + >>> album = Album.from_api(resp['items'][0], client) + >>> album.load_meta() + >>> album.download() + """ - def __init__(self, url: str): - """Create a YoutubeVideo object. + def __init__(self, client: Client, **kwargs): + """Create a new Album object. - :param url: URL to the youtube video. - :type url: str + :param client: a qopy client instance + :param album_id: album id returned by qobuz api + :type album_id: Union[str, int] + :param kwargs: """ - self.url = url - self.client = self.DummyClient() + self.client = client - def download( - self, - parent_folder: str = "StreamripDownloads", - download_youtube_videos: bool = False, - youtube_video_downloads_folder: str = "StreamripDownloads", - **kwargs, - ): - """Download the video using 'youtube-dl'. + self.sampling_rate = None + self.bit_depth = None + self.container: Optional[str] = None + + self.disctotal: int + self.tracktotal: int + self.albumartist: str + + # usually an unpacked TrackMetadata.asdict() + self.__dict__.update(kwargs) + + # to improve from_api method speed + if kwargs.get("load_on_init", False): + self.load_meta() + + self.loaded = False + self.downloaded = False + + def load_meta(self, **kwargs): + """Load detailed metadata from API using the id.""" + assert hasattr(self, "id"), "id must be set to load metadata" + resp = self.client.get(self.id, media_type="album") + + # update attributes based on response + self.meta = self._parse_get_resp(resp, self.client) + self.__dict__.update(self.meta.asdict()) # used for identification + + if not self.get("streamable", False): + raise NonStreamable(f"This album is not streamable ({self.id} ID)") + + self._load_tracks(resp) + self.loaded = True + + @classmethod + def from_api(cls, resp: dict, client: Client): + """Create an Album object from an API response. + + :param resp: + :type resp: dict + :param client: + :type client: Client + """ + if client.source == "soundcloud": + return Playlist.from_api(resp, client) + + info = cls._parse_get_resp(resp, client) + return cls(client, **info.asdict()) + + def _prepare_download(self, **kwargs): + """Prepare the download of the album. + + :param kwargs: + """ + # Generate the folder name + self.folder_format = kwargs.get("folder_format", FOLDER_FORMAT) + self.quality = min(kwargs.get("quality", 3), self.client.max_quality) + + self.folder = self._get_formatted_folder( + kwargs.get("parent_folder", "StreamripDownloads"), self.quality + ) + os.makedirs(self.folder, exist_ok=True) + + self.download_message() + + # choose optimal cover size and download it + click.secho("Downloading cover art", fg="magenta") + cover_path = os.path.join(gettempdir(), f"cover_{hash(self)}.jpg") + embed_cover_size = kwargs.get("embed_cover_size", "large") + + assert ( + embed_cover_size in self.cover_urls + ), f"Invalid cover size. Must be in {self.cover_urls.keys()}" + + embed_cover_url = self.cover_urls[embed_cover_size] + if embed_cover_url is not None: + tqdm_download(embed_cover_url, cover_path) + else: # sometimes happens with Deezer + cover_url = [u for u in self.cover_urls.values() if u][0] + tqdm_download(cover_url, cover_path) + + hires_cov_path = os.path.join(self.folder, "cover.jpg") + if kwargs.get("keep_hires_cover", True) and not os.path.exists(hires_cov_path): + tqdm_download(self.cover_urls["original"], hires_cov_path) + + cover_size = os.path.getsize(cover_path) + if cover_size > FLAC_MAX_BLOCKSIZE: # 16.77 MB + click.secho( + "Downgrading embedded cover size, too large ({cover_size}).", + fg="bright_yellow", + ) + # large is about 600x600px which is guaranteed < 16.7 MB + tqdm_download(self.cover_urls["large"], cover_path) + + downsize_image( + cover_path, + kwargs.get("max_artwork_width", 999999), + kwargs.get("max_artwork_height", 999999), + ) + + embed_cover = kwargs.get("embed_cover", True) # embed by default + if self.client.source != "deezer" and embed_cover: + # container generated when formatting folder name + self.cover_obj = self.get_cover_obj( + cover_path, self.container, self.client.source + ) + else: + self.cover_obj = None + + # Download the booklet if applicable + if ( + self.get("booklets") + and kwargs.get("download_booklets", True) + and not any(f.endswith(".pdf") for f in os.listdir(self.folder)) + ): + click.secho("\nDownloading booklets", fg="blue") + for item in self.booklets: + Booklet(item).download(parent_folder=self.folder) + + def _download_item(self, item: Media, **kwargs: Any): + """Download an item. + + :param track: The item. + :type track: Union[Track, Video] + :param quality: + :type quality: int + :param kwargs: + :rtype: bool + """ + logger.debug("Downloading track to %s", self.folder) + if self.disctotal > 1 and isinstance(item, Track): + disc_folder = os.path.join(self.folder, f"Disc {item.meta.discnumber}") + kwargs["parent_folder"] = disc_folder + else: + kwargs["parent_folder"] = self.folder + + quality = kwargs.get("quality", 3) + kwargs.pop("quality") + item.download(quality=min(self.quality, quality), **kwargs) + + logger.debug("tagging tracks") + # deezer tracks come tagged + if kwargs.get("tag_tracks", True) and self.client.source != "deezer": + item.tag( + cover=self.cover_obj, + embed_cover=kwargs.get("embed_cover", True), + ) + + @staticmethod + def _parse_get_resp(resp: dict, client: Client) -> TrackMetadata: + """Parse information from a client.get(query, 'album') call. + + :param resp: + :type resp: dict + :rtype: dict + """ + meta = TrackMetadata(album=resp, source=client.source) + meta.id = resp["id"] + return meta + + def _load_tracks(self, resp): + """Load the tracks into self from an API response. + + This uses a classmethod to convert an item into a Track object, which + stores the metadata inside a TrackMetadata object. + """ + logging.debug("Loading %d tracks to album", self.tracktotal) + for track in _get_tracklist(resp, self.client.source): + if track.get("type") == "Music Video": + self.append(Video.from_album_meta(track, self.client)) + else: + self.append( + Track.from_album_meta( + album=self.meta, track=track, client=self.client + ) + ) + + def _get_formatter(self) -> dict: + """Get a formatter that is used for naming folders and previews. + + :rtype: dict + """ + fmt = {key: self.get(key) for key in ALBUM_KEYS} + + stats = tuple( + min(bd, sr) + for bd, sr in zip( + (self.meta.bit_depth, self.meta.sampling_rate), + get_stats_from_quality(self.quality), + ) + ) + + # The quality chosen is not the maximum available quality + if stats != (fmt.get("sampling_rate"), fmt.get("bit_depth")): + fmt["bit_depth"] = stats[0] + fmt["sampling_rate"] = stats[1] + + if sr := fmt.get("sampling_rate"): + if sr % 1000 == 0: + # truncate the decimal .0 when converting to str + fmt["sampling_rate"] = int(sr / 1000) + else: + fmt["sampling_rate"] = sr / 1000 + + return fmt + + def _get_formatted_folder(self, parent_folder: str, quality: int) -> str: + """Generate the folder name for this album. :param parent_folder: :type parent_folder: str - :param download_youtube_videos: True if the video should be downloaded. - :type download_youtube_videos: bool - :param youtube_video_downloads_folder: Folder to put videos if - downloaded. - :type youtube_video_downloads_folder: str + :param quality: + :type quality: int + :rtype: str + """ + # necessary to format the folder + self.container = get_container(quality, self.client.source) + if self.container in ("AAC", "MP3"): + # lossy codecs don't have these metrics + self.bit_depth = self.sampling_rate = None + + formatted_folder = clean_format(self.folder_format, self._get_formatter()) + + return os.path.join(parent_folder, formatted_folder) + + @property + def title(self) -> str: + """Get the title of the album. + + :rtype: str + """ + return self.album + + @title.setter + def title(self, val: str): + """Set the title of the Album. + + :param val: + :type val: str + """ + self.album = val + + def __repr__(self) -> str: + """Return a string representation of this Album object. + + :rtype: str + """ + # Avoid AttributeError if load_on_init key is not set + if hasattr(self, "albumartist"): + return f"" + + return f"" + + def __str__(self) -> str: + """Return a readable string representation of this album. + + :rtype: str + """ + return f"{self['albumartist']} - {self['title']}" + + def __len__(self) -> int: + """Get the length of the album. + + :rtype: int + """ + return self.tracktotal + + def __hash__(self): + """Hash the album.""" + return hash(self.id) + + +class Playlist(Tracklist): + """Represents a downloadable playlist. + + Usage: + >>> resp = client.search('hip hop', 'playlist') + >>> pl = Playlist.from_api(resp['items'][0], client) + >>> pl.load_meta() + >>> pl.download() + """ + + def __init__(self, client: Client, **kwargs): + """Create a new Playlist object. + + :param client: a qopy client instance + :param album_id: playlist id returned by qobuz api + :type album_id: Union[str, int] :param kwargs: """ - click.secho(f"Downloading url {self.url}", fg="blue") - filename_formatter = "%(track_number)s.%(track)s.%(container)s" - filename = os.path.join(parent_folder, filename_formatter) + self.name: str + self.client = client - p = subprocess.Popen( - [ - "youtube-dl", - "-x", # audio only - "-q", # quiet mode - "--add-metadata", - "--audio-format", - "mp3", - "--embed-thumbnail", - "-o", - filename, - self.url, - ] + for k, v in kwargs.items(): + setattr(self, k, v) + + # to improve from_api method speed + if kwargs.get("load_on_init"): + self.load_meta() + + self.loaded = False + + @classmethod + def from_api(cls, resp: dict, client: Client): + """Return a Playlist object from an API response. + + :param resp: a single search result entry of a playlist + :type resp: dict + :param client: + :type client: Client + """ + info = cls._parse_get_resp(resp, client) + return cls(client, **info) + + def load_meta(self, **kwargs): + """Send a request to fetch the tracklist from the api. + + :param new_tracknumbers: replace the tracknumber with playlist position + :type new_tracknumbers: bool + :param kwargs: + """ + self.meta = self.client.get(self.id, media_type="playlist") + logger.debug(self.meta) + self._load_tracks(**kwargs) + self.loaded = True + + def _load_tracks(self, new_tracknumbers: bool = True, **kwargs): + """Parse the tracklist returned by the API. + + :param new_tracknumbers: replace tracknumber tag with playlist position + :type new_tracknumbers: bool + """ + if self.client.source == "qobuz": + self.name = self.meta["name"] + self.image = self.meta["images"] + self.creator = safe_get(self.meta, "owner", "name", default="Qobuz") + + tracklist = self.meta["tracks"]["items"] + + def meta_args(track): + return {"track": track, "album": track["album"]} + + elif self.client.source == "tidal": + self.name = self.meta["title"] + self.image = tidal_cover_url(self.meta["image"], 640) + self.creator = safe_get(self.meta, "creator", "name", default="TIDAL") + + tracklist = self.meta["tracks"] + + def meta_args(track): + return { + "track": track, + "source": self.client.source, + } + + elif self.client.source == "deezer": + self.name = self.meta["title"] + self.image = self.meta["picture_big"] + self.creator = safe_get(self.meta, "creator", "name", default="Deezer") + + tracklist = self.meta["tracks"] + + elif self.client.source == "soundcloud": + self.name = self.meta["title"] + # self.image = self.meta.get("artwork_url").replace("large", "t500x500") + self.creator = self.meta["user"]["username"] + tracklist = self.meta["tracks"] + + else: + raise NotImplementedError + + self.tracktotal = len(tracklist) + if self.client.source == "soundcloud": + # No meta is included in soundcloud playlist + # response, so it is loaded at download time + for track in tracklist: + self.append(Track(self.client, id=track["id"])) + else: + for track in tracklist: + # TODO: This should be managed with .m3u files and alike. Arbitrary + # tracknumber tags might cause conflicts if the playlist files are + # inside of a library folder + meta = TrackMetadata(track=track, source=self.client.source) + cover_url = get_cover_urls(track["album"], self.client.source)[ + kwargs.get("embed_cover_size", "large") + ] + + self.append( + Track( + self.client, + id=track.get("id"), + meta=meta, + cover_url=cover_url, + part_of_tracklist=True, + ) + ) + + logger.debug(f"Loaded {len(self)} tracks from playlist {self.name}") + + def _prepare_download(self, parent_folder: str = "StreamripDownloads", **kwargs): + fname = sanitize_filename(self.name) + self.folder = os.path.join(parent_folder, fname) + + # Used for safe concurrency with tracknumbers instead of an object + # level that stores an index + self.__indices = iter(range(1, len(self) + 1)) + self.download_message() + + def _download_item(self, item: Media, **kwargs): + assert isinstance(item, Track) + + kwargs["parent_folder"] = self.folder + if self.client.source == "soundcloud": + item.load_meta() + click.secho(f"Downloading {item!s}", fg="blue") + + if playlist_to_album := kwargs.get("set_playlist_to_album", False): + item.meta.album = self.name + item.meta.albumartist = self.creator + + if kwargs.get("new_tracknumbers", True): + item.meta.tracknumber = next(self.__indices) + item.meta.discnumber = 1 + + item.download(**kwargs) + + if self.client.source != "deezer": + item.tag(embed_cover=kwargs.get("embed_cover", True)) + + if playlist_to_album and self.client.source == "deezer": + # Because Deezer tracks come pre-tagged, the `set_playlist_to_album` + # option is never set. Here, we manually do this + from mutagen.flac import FLAC + + audio = FLAC(item.path) + audio["ALBUM"] = self.name + audio["ALBUMARTIST"] = self.creator + audio["TRACKNUMBER"] = f"{item['tracknumber']:02}" + audio.save() + + @staticmethod + def _parse_get_resp(item: dict, client: Client) -> dict: + """Parse information from a search result returned by a client.search call. + + :param item: + :type item: dict + :param client: + :type client: Client + """ + if client.source == "qobuz": + return { + "name": item["name"], + "id": item["id"], + } + elif client.source == "tidal": + return { + "name": item["title"], + "id": item["uuid"], + } + elif client.source == "deezer": + return { + "name": item["title"], + "id": item["id"], + } + elif client.source == "soundcloud": + return { + "name": item["title"], + "id": item["permalink_url"], + "description": item["description"], + "popularity": f"{item['likes_count']} likes", + "tracktotal": len(item["tracks"]), + } + + raise InvalidSourceError(client.source) + + @property + def title(self) -> str: + """Get the title. + + :rtype: str + """ + return self.name + + def __repr__(self) -> str: + """Return a string representation of this Playlist object. + + :rtype: str + """ + return f"" + + def __str__(self) -> str: + """Return a readable string representation of this track. + + :rtype: str + """ + return f"{self.name} ({len(self)} tracks)" + + +class Artist(Tracklist): + """Represents a downloadable artist. + + Usage: + >>> resp = client.get('fleetwood mac', 'artist') + >>> artist = Artist.from_api(resp['items'][0], client) + >>> artist.load_meta() + >>> artist.download() + """ + + def __init__(self, client: Client, **kwargs): + """Create a new Artist object. + + :param client: a qopy client instance + :param album_id: artist id returned by qobuz api + :type album_id: Union[str, int] + :param kwargs: + """ + self.client = client + + for k, v in kwargs.items(): + setattr(self, k, v) + + # to improve from_api method speed + if kwargs.get("load_on_init"): + self.load_meta() + + self.loaded = False + + def load_meta(self, **kwargs): + """Send an API call to get album info based on id.""" + self.meta = self.client.get(self.id, media_type="artist") + self._load_albums() + self.loaded = True + + # override + def download(self, **kwargs): + """Download all items in self. + + :param kwargs: + """ + iterator = self._prepare_download(**kwargs) + for item in iterator: + self._download_item(item, **kwargs) + + def _load_albums(self): + """Load Album objects to self. + + This parses the response of client.get(query, 'artist') responses. + """ + if self.client.source == "qobuz": + self.name = self.meta["name"] + albums = self.meta["albums"]["items"] + + elif self.client.source == "tidal": + self.name = self.meta["name"] + albums = self.meta["albums"] + + elif self.client.source == "deezer": + self.name = self.meta["name"] + albums = self.meta["albums"] + + else: + raise InvalidSourceError(self.client.source) + + for album in albums: + logger.debug("Appending album: %s", album.get("title")) + self.append(Album.from_api(album, self.client)) + + def _prepare_download( + self, + parent_folder: str = "StreamripDownloads", + filters: tuple = (), + **kwargs, + ) -> Iterable: + """Prepare the download. + + :param parent_folder: + :type parent_folder: str + :param filters: + :type filters: tuple + :param kwargs: + :rtype: Iterable + """ + folder = sanitize_filename(self.name) + self.folder = os.path.join(parent_folder, folder) + + logger.debug("Artist folder: %s", folder) + logger.debug(f"Length of tracklist {len(self)}") + logger.debug(f"Filters: {filters}") + + final: Iterable + if "repeats" in filters: + final = self._remove_repeats(bit_depth=max, sampling_rate=min) + filters = tuple(f for f in filters if f != "repeats") + else: + final = self + + if isinstance(filters, tuple) and self.client.source == "qobuz": + filter_funcs = (getattr(self, f"_{filter_}") for filter_ in filters) + for func in filter_funcs: + final = filter(func, final) + + self.download_message() + return final + + def _download_item(self, item: Media, **kwargs): + """Download an item. + + :param item: + :param parent_folder: + :type parent_folder: str + :param quality: + :type quality: int + :param kwargs: + :rtype: bool + """ + item.load_meta() + + kwargs.pop("parent_folder") + # always an Album + item.download( + parent_folder=self.folder, + **kwargs, ) - if download_youtube_videos: - click.secho("Downloading video stream", fg="blue") - pv = subprocess.Popen( - [ - "youtube-dl", - "-q", - "-o", - os.path.join( - youtube_video_downloads_folder, - "%(title)s.%(container)s", - ), - self.url, - ] - ) - pv.wait() - p.wait() + @property + def title(self) -> str: + """Get the artist name. - def load_meta(self, *args, **kwargs): - """Return None. + Implemented for consistency. - Dummy method. - - :param args: - :param kwargs: + :rtype: str """ - pass + return self.name - def tag(self, *args, **kwargs): - """Return None. + @classmethod + def from_api(cls, item: dict, client: Client, source: str = "qobuz"): + """Create an Artist object from the api response of Qobuz, Tidal, or Deezer. - Dummy method. - - :param args: - :param kwargs: + :param resp: response dict + :type resp: dict + :param source: in ('qobuz', 'deezer', 'tidal') + :type source: str """ - pass + logging.debug("Loading item from API") + info = cls._parse_get_resp(item, client) - def __bool__(self): - return True + # equivalent to Artist(client=client, **info) + return cls(client=client, **info) + + @staticmethod + def _parse_get_resp(item: dict, client: Client) -> dict: + """Parse a result from a client.search call. + + :param item: the item to parse + :type item: dict + :param client: + :type client: Client + """ + if client.source in ("qobuz", "deezer"): + info = { + "name": item.get("name"), + "id": item.get("id"), + } + elif client.source == "tidal": + info = { + "name": item["name"], + "id": item["id"], + } + else: + raise InvalidSourceError(client.source) + + return info + + # ----------- Filters -------------- + + TYPE_REGEXES = { + "remaster": re.compile(r"(?i)(re)?master(ed)?"), + "extra": re.compile( + r"(?i)(anniversary|deluxe|live|collector|demo|expanded|remix)" + ), + } + + def _remove_repeats(self, bit_depth=max, sampling_rate=max) -> Generator: + """Remove the repeated albums from self. + + May remove different versions of the same album. + + :param bit_depth: either max or min functions + :param sampling_rate: either max or min functions + """ + groups: Dict[str, list] = {} + for album in self: + if (t := self.essence(album.title)) not in groups: + groups[t] = [] + groups[t].append(album) + + for group in groups.values(): + assert bit_depth in (min, max) and sampling_rate in (min, max) + best_bd = bit_depth(a["bit_depth"] for a in group) + best_sr = sampling_rate(a["sampling_rate"] for a in group) + for album in group: + if album["bit_depth"] == best_bd and album["sampling_rate"] == best_sr: + yield album + break + + def _non_studio_albums(self, album: Album) -> bool: + """Filter non-studio-albums. + + :param artist: usually self + :param album: the album to check + :type album: Album + :rtype: bool + """ + return ( + album["albumartist"] != "Various Artists" + and self.TYPE_REGEXES["extra"].search(album.title) is None + ) + + def _features(self, album: Album) -> bool: + """Filter features. + + This will download only albums where the requested + artist is the album artist. + + :param artist: usually self + :param album: the album to check + :type album: Album + :rtype: bool + """ + return self["name"] == album["albumartist"] + + def _extras(self, album: Album) -> bool: + """Filter extras. + + :param artist: usually self + :param album: the album to check + :type album: Album + :rtype: bool + """ + return self.TYPE_REGEXES["extra"].search(album.title) is None + + def _non_remasters(self, album: Album) -> bool: + """Filter non remasters. + + :param artist: usually self + :param album: the album to check + :type album: Album + :rtype: bool + """ + return self.TYPE_REGEXES["remaster"].search(album.title) is not None + + def _non_albums(self, album: Album) -> bool: + """Filter releases that are not albums. + + :param artist: usually self + :param album: the album to check + :type album: Album + :rtype: bool + """ + return len(album) > 1 + + # --------- Magic Methods -------- + + def __repr__(self) -> str: + """Return a string representation of this Artist object. + + :rtype: str + """ + return f"" + + def __str__(self) -> str: + """Return a readable string representation of this Artist. + + :rtype: str + """ + return self.name + + def __hash__(self): + """Hash self.""" + return hash(self.id) + + +class Label(Artist): + """Represents a downloadable Label.""" + + def load_meta(self, **kwargs): + """Load metadata given an id.""" + assert self.client.source == "qobuz", "Label source must be qobuz" + + resp = self.client.get(self.id, "label") + self.name = resp["name"] + for album in resp["albums"]["items"]: + self.append(Album.from_api(album, client=self.client)) + + self.loaded = True + + def __repr__(self): + """Return a string representation of the Label.""" + return f"