diff --git a/streamrip/constants.py b/streamrip/constants.py index 55767ba..8bf1ca5 100644 --- a/streamrip/constants.py +++ b/streamrip/constants.py @@ -144,6 +144,17 @@ LASTFM_URL_REGEX = r"https://www.last.fm/user/\w+/playlists/\w+" TIDAL_MAX_Q = 7 + +TIDAL_Q_MAP = { + "LOW": 0, + "HIGH": 1, + "LOSSLESS": 2, + "HI_RES": 3, +} + DEEZER_MAX_Q = 6 AVAILABLE_QUALITY_IDS = (0, 1, 2, 3, 4) MEDIA_TYPES = ("track", "album", "artist", "label", "playlist") + +# used to homogenize cover size keys +COVER_SIZES = ("thumbnail", "small", "large", "original") diff --git a/streamrip/downloader.py b/streamrip/downloader.py index 47dec54..d96dfc7 100644 --- a/streamrip/downloader.py +++ b/streamrip/downloader.py @@ -2,13 +2,13 @@ downloadable form. """ +import concurrent.futures import logging import os import re import shutil import subprocess -import concurrent.futures -from pprint import pformat +from pprint import pformat, pprint from tempfile import gettempdir from typing import Any, Generator, Iterable, Union @@ -23,6 +23,7 @@ from . import converter from .clients import ClientInterface from .constants import ( ALBUM_KEYS, + COVER_SIZES, FLAC_MAX_BLOCKSIZE, FOLDER_FORMAT, TRACK_FORMAT, @@ -48,15 +49,6 @@ from .utils import ( logger = logging.getLogger(__name__) urllib3.disable_warnings() -TIDAL_Q_MAP = { - "LOW": 0, - "HIGH": 1, - "LOSSLESS": 2, - "HI_RES": 3, -} - -# used to homogenize cover size keys -COVER_SIZES = ("thumbnail", "small", "large", "original") TYPE_REGEXES = { "remaster": re.compile(r"(?i)(re)?master(ed)?"), @@ -606,6 +598,7 @@ class Tracklist(list): the tracklist. """ + # anything not in parentheses or brackets essence_regex = re.compile(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*") def download(self, **kwargs): @@ -623,7 +616,7 @@ class Tracklist(list): if kwargs.get("concurrent_downloads", True): # Tidal errors out with unlimited concurrency - max_workers = 15 if self.client.source == 'tidal' else None + max_workers = 15 if self.client.source == "tidal" else None with concurrent.futures.ThreadPoolExecutor(max_workers) as executor: futures = [executor.submit(target, item, **kwargs) for item in self] try: @@ -658,7 +651,7 @@ class Tracklist(list): if isinstance(key, int): if 0 <= key < len(self): - return super().__getitem__(key) + return self[key] return default @@ -915,97 +908,7 @@ class Album(Tracklist): :type resp: dict :rtype: dict """ - if client.source == "qobuz": - if resp.get("maximum_sampling_rate", False): - sampling_rate = resp["maximum_sampling_rate"] * 1000 - else: - sampling_rate = None - - resp["image"]["original"] = resp["image"]["large"].replace("600", "org") - - # TODO: combine these with TrackMetadata objects - return { - "id": resp.get("id"), - "title": resp.get("title"), - "_artist": resp.get("artist") or resp.get("performer"), - "albumartist": safe_get(resp, "artist", "name"), - "year": str(resp.get("release_date_original"))[:4], - "version": resp.get("version"), - "composer": safe_get(resp, "composer", "name"), - "release_type": resp.get("release_type", "album"), - "cover_urls": resp.get("image"), - "streamable": resp.get("streamable"), - "genre": safe_get(resp, 'genre', 'name'), - "quality": get_quality_id( - resp.get("maximum_bit_depth"), resp.get("maximum_sampling_rate") - ), - "bit_depth": resp.get("maximum_bit_depth"), - "sampling_rate": sampling_rate, - "tracktotal": resp.get("tracks_count"), - "description": resp.get("description"), - "disctotal": max( - track.get("media_number", 1) - for track in safe_get(resp, "tracks", "items", default=[{}]) - ) - or 1, - "explicit": resp.get("parental_warning", False), - } - elif client.source == "tidal": - return { - "id": resp.get("id"), - "title": resp.get("title"), - "_artist": safe_get(resp, "artist", "name"), - "albumartist": safe_get(resp, "artist", "name"), - "year": resp.get("releaseDate")[:4], - "version": resp.get("version"), - "cover_urls": { - size: tidal_cover_url(resp.get("cover"), x) - for size, x in zip(COVER_SIZES, (160, 320, 640, 1280)) - }, - "streamable": resp.get("allowStreaming"), - "quality": TIDAL_Q_MAP[resp.get("audioQuality")], - "bit_depth": 24 if resp.get("audioQuality") == "HI_RES" else 16, - "sampling_rate": 48000 - if resp.get("audioQuality") == "HI_RES" - else 41000, - "tracktotal": resp.get("numberOfTracks"), - "disctotal": resp.get("numberOfVolumes"), - "explicit": resp.get("explicit", False), - } - elif client.source == "deezer": - if resp.get("release_date", False): - year = resp["release_date"][:4] - else: - year = None - - return { - "id": resp.get("id"), - "title": resp.get("title"), - "_artist": safe_get(resp, "artist", "name"), - "albumartist": safe_get(resp, "artist", "name"), - "year": year, - # version not given by API - "cover_urls": { - sk: resp.get(rk) # size key, resp key - for sk, rk in zip( - COVER_SIZES, - ("cover", "cover_medium", "cover_large", "cover_xl"), - ) - }, - "url": resp.get("link"), - "streamable": True, # api only returns streamables - "quality": 2, # all tracks are 16/44.1 streamable - "bit_depth": 16, - "sampling_rate": 44100, - "tracktotal": resp.get("track_total") or resp.get("nb_tracks"), - "disctotal": max( - track.get("disk_number") for track in resp.get("tracks", [{}]) - ) - or 1, - "explicit": bool(resp.get("explicit_content_lyrics")), - } - - raise InvalidSourceError(client.source) + return TrackMetadata(album=resp, source=client.source).asdict() def _load_tracks(self): """Given an album metadata dict returned by the API, append all of its diff --git a/streamrip/metadata.py b/streamrip/metadata.py index ad0d609..39fff33 100644 --- a/streamrip/metadata.py +++ b/streamrip/metadata.py @@ -1,20 +1,22 @@ """Manages the information that will be embeded in the audio file. """ -import json import logging import re -from typing import Generator, Optional, Tuple, Union +from functools import cache +from typing import Generator, Hashable, Optional, Tuple, Union from .constants import ( COPYRIGHT, + COVER_SIZES, FLAC_KEY, MP3_KEY, MP4_KEY, PHON_COPYRIGHT, + TIDAL_Q_MAP, TRACK_KEYS, ) -from .exceptions import InvalidContainerError -from .utils import safe_get +from .exceptions import InvalidContainerError, InvalidSourceError +from .utils import get_quality_id, safe_get logger = logging.getLogger(__name__) @@ -57,6 +59,7 @@ class TrackMetadata: :param album: album dict from API :type album: Optional[dict] """ + # embedded information self.title = None self.album = None self.albumartist = None @@ -75,12 +78,21 @@ class TrackMetadata: # not included in tags self.explicit = False + self.quality = None + self.sampling_rate = None + self.bit_depth = None + + # Internals + self._artist = None + self._copyright = None + self._genres = None self.__source = source - if track is None and album is None: - logger.debug("No params passed, returning") - return + if isinstance(track, TrackMetadata): + self.update(track) + if isinstance(album, TrackMetadata): + self.update(album) if track is not None: self.add_track_meta(track) @@ -90,20 +102,19 @@ class TrackMetadata: def add_album_meta(self, resp: dict): """Parse the metadata from an resp dict returned by the - Qobuz API. + API. :param dict resp: from API """ if self.__source == "qobuz": + # Tags self.album = resp.get("title") self.tracktotal = resp.get("tracks_count", 1) self.genre = resp.get("genres_list", []) self.date = resp.get("release_date_original") or resp.get("release_date") - if self.date: - self.year = self.date[:4] - self.copyright = resp.get("copyright") self.albumartist = safe_get(resp, "artist", "name") + self.composer = safe_get(resp, "composer", "name") self.label = resp.get("label") self.description = resp.get("description") self.disctotal = ( @@ -118,35 +129,72 @@ class TrackMetadata: if isinstance(self.label, dict): self.label = self.label.get("name") + # Non-embedded information + self.version = resp.get("version") + self.cover_urls = resp.get("image") + self.cover_urls["original"] = self.cover_urls["large"].replace("600", "org") + self.streamable = resp.get("streamable", False) + self.bit_depth = resp.get("maximum_bit_depth") + self.sampling_rate = resp.get("maximum_sampling_rate") + self.quality = get_quality_id(self.bit_depth, self.sampling_rate) + + if self.sampling_rate is not None: + self.sampling_rate *= 1000 + elif self.__source == "tidal": self.album = resp.get("title") self.tracktotal = resp.get("numberOfTracks") # genre not returned by API self.date = resp.get("releaseDate") - if self.date: - self.year = self.date[:4] self.copyright = resp.get("copyright") self.albumartist = safe_get(resp, "artist", "name") self.disctotal = resp.get("numberOfVolumes") self.isrc = resp.get("isrc") - self.explicit = resp.get("explicit", False) # label not returned by API + # non-embedded + self.explicit = resp.get("explicit", False) + self.cover_urls = { + sk: resp.get(rk) # size key, resp key + for sk, rk in zip( + COVER_SIZES, + ("cover", "cover_medium", "cover_large", "cover_xl"), + ) + } + self.streamable = resp.get("allowStreaming", False) + self.quality = TIDAL_Q_MAP[resp["audioQuality"]] + elif self.__source == "deezer": self.album = resp.get("title") - self.tracktotal = resp.get("track_total") + self.tracktotal = resp.get("track_total") or resp.get("nb_tracks") + self.disctotal = ( + max(track.get("disk_number") for track in resp.get("tracks", [{}])) or 1 + ) self.genre = safe_get(resp, "genres", "data") self.date = resp.get("release_date") self.albumartist = safe_get(resp, "artist", "name") self.label = resp.get("label") - # either 0 or 1 + self.url = resp.get("link") + + # not embedded self.explicit = bool(resp.get("parental_warning")) + self.quality = 2 + self.bit_depth = 16 + self.cover_urls = { + sk: resp.get(rk) # size key, resp key + for sk, rk in zip( + COVER_SIZES, + ("cover", "cover_medium", "cover_large", "cover_xl"), + ) + } + self.sampling_rate = 44100 + self.streamable = True elif self.__source == "soundcloud": raise NotImplementedError else: - raise ValueError(self.__source) + raise InvalidSourceError(self.__source) def add_track_meta(self, track: dict): """Parse the metadata from a track dict returned by an @@ -230,8 +278,8 @@ class TrackMetadata: @property def genre(self) -> Union[str, None]: """Formats the genre list returned by the Qobuz API. - >>> g = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé'] - >>> _format_genres(g) + >>> meta.genre = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé'] + >>> meta.genre 'Pop, Rock, Alternatif et Indé' :rtype: str @@ -240,14 +288,11 @@ class TrackMetadata: return None if isinstance(self._genres, list): - genres = re.findall(r"([^\u2192\/]+)", "/".join(self._genres)) - no_repeats = [] + if self.__source == "qobuz": + genres = re.findall(r"([^\u2192\/]+)", "/".join(self._genres)) + genres = set(genres) - for genre in genres: - if genre not in no_repeats: - no_repeats.append(genre) - - return ", ".join(no_repeats) + return ", ".join(genres) elif isinstance(self._genres, str): return self._genres @@ -400,6 +445,9 @@ class TrackMetadata: if v is not None and text is not None: yield (v, text) + def asdict(self) -> dict: + return {k: getattr(self, k) for k in dir(self) if not k.startswith("_")} + def __setitem__(self, key, val): """Dict-like access for tags. @@ -441,10 +489,13 @@ class TrackMetadata: """ return self.__setitem__(key, val) + def __hash__(self) -> int: + return sum(hash(v) for v in self.asdict().values() if isinstance(v, Hashable)) + def __repr__(self) -> str: """Returns the string representation of the metadata object. :rtype: str """ # TODO: make a more readable repr - return json.dumps(self.__dict__, indent=2) + return f""