From b0f6e7b197450a266f49eda019267f8a230b8491 Mon Sep 17 00:00:00 2001 From: nathom Date: Sat, 17 Apr 2021 12:53:29 -0700 Subject: [PATCH] Move bases to bases.py, add support for Qobuz booklets; #53 --- streamrip/bases.py | 826 ++++++++++++++++++++++++++++++++++++++++ streamrip/downloader.py | 802 +------------------------------------- streamrip/metadata.py | 5 +- 3 files changed, 842 insertions(+), 791 deletions(-) create mode 100644 streamrip/bases.py diff --git a/streamrip/bases.py b/streamrip/bases.py new file mode 100644 index 0000000..c8519c2 --- /dev/null +++ b/streamrip/bases.py @@ -0,0 +1,826 @@ +"""These are the lower level classes that are handled by Album, Playlist, +and the other objects. They can also be downloaded individually, for example, +as a single track. +""" + +import concurrent.futures +import logging +import os +import re +import shutil +import subprocess +from tempfile import gettempdir +from typing import Any, Union + +import click +import tqdm +from mutagen.flac import FLAC, Picture +from mutagen.id3 import APIC, ID3, ID3NoHeaderError +from mutagen.mp4 import MP4, MP4Cover +from pathvalidate import sanitize_filepath + +from . import converter +from .clients import Client +from .constants import FLAC_MAX_BLOCKSIZE, TRACK_FORMAT +from .exceptions import ( + InvalidQuality, + InvalidSourceError, + NonStreamable, + TooLargeCoverArt, +) +from .metadata import TrackMetadata +from .utils import ( + clean_format, + decho, + decrypt_mqa_file, + ext, + safe_get, + tidal_cover_url, + tqdm_download, +) + +logger = logging.getLogger(__name__) + +TYPE_REGEXES = { + "remaster": re.compile(r"(?i)(re)?master(ed)?"), + "extra": re.compile(r"(?i)(anniversary|deluxe|live|collector|demo|expanded)"), +} + + +class Track: + """Represents a downloadable track. + + Loading metadata as a single track: + >>> t = Track(client, id='20252078') + >>> t.load_meta() # load metadata from api + + Loading metadata as part of an Album: + >>> t = Track.from_album_meta(api_track_dict, client) + + where `api_track_dict` is a track entry in an album tracklist. + + Downloading and tagging: + >>> t.download() + >>> t.tag() + """ + + def __init__(self, client: Client, **kwargs): + """Create a track object. + + The only required parameter is client, but passing at an id is + highly recommended. Every value in kwargs will be set as an attribute + of the object. (TODO: make this safer) + + :param track_id: track id returned by Qobuz API + :type track_id: Optional[Union[str, int]] + :param client: qopy client + :type client: Client + :param meta: TrackMetadata object + :type meta: Optional[TrackMetadata] + :param kwargs: id, filepath_format, meta, quality, folder + """ + self.client = client + self.id = None + self.__dict__.update(kwargs) + + # TODO: remove these + self.container = "FLAC" + self.sampling_rate = 44100 + self.bit_depth = 16 + + self.downloaded = False + self.tagged = False + # TODO: find better solution + for attr in ("quality", "folder", "meta"): + setattr(self, attr, None) + + if isinstance(kwargs.get("meta"), TrackMetadata): + self.meta = kwargs["meta"] + + if (u := kwargs.get("cover_url")) is not None: + logger.debug(f"Cover url: {u}") + self.cover_url = u + + def load_meta(self): + """Send a request to the client to get metadata for this Track.""" + + assert self.id is not None, "id must be set before loading metadata" + + self.resp = self.client.get(self.id, media_type="track") + self.meta = TrackMetadata( + track=self.resp, source=self.client.source + ) # meta dict -> TrackMetadata object + try: + if self.client.source == "qobuz": + self.cover_url = self.resp["album"]["image"]["large"] + elif self.client.source == "tidal": + self.cover_url = tidal_cover_url(self.resp["album"]["cover"], 320) + elif self.client.source == "deezer": + self.cover_url = self.resp["album"]["cover_medium"] + elif self.client.source == "soundcloud": + self.cover_url = ( + self.resp["artwork_url"] or self.resp["user"].get("avatar_url") + ).replace("large", "t500x500") + else: + raise InvalidSourceError(self.client.source) + except KeyError: + logger.debug("No cover found") + self.cover_url = None + + def _prepare_download(self, **kwargs): + # args override attributes + self.quality = min(kwargs["quality"], self.client.max_quality) + self.folder = kwargs["parent_folder"] or self.folder + + self.file_format = kwargs.get("track_format", TRACK_FORMAT) + self.folder = sanitize_filepath(self.folder, platform="auto") + self.format_final_path() + + os.makedirs(self.folder, exist_ok=True) + + if self.id in kwargs.get("database", []): + self.downloaded = True + self.tagged = True + self.path = self.final_path + decho( + f"{self['title']} already logged in database, skipping.", + fg="magenta", + ) + return False # because the track was not downloaded + + if os.path.isfile(self.final_path): # track already exists + self.downloaded = True + self.tagged = True + self.path = self.final_path + decho(f"Track already exists: {self.final_path}", fg="magenta") + return False + + self.download_cover() # only downloads for playlists and singles + self.path = os.path.join(gettempdir(), f"{hash(self.id)}_{self.quality}.tmp") + return True + + def download( + self, + quality: int = 3, + parent_folder: str = "StreamripDownloads", + progress_bar: bool = True, + **kwargs, + ) -> bool: + """ + Download the track. + + :param quality: (0, 1, 2, 3, 4) + :type quality: int + :param folder: folder to download the files to + :type folder: Optional[Union[str, os.PathLike]] + :param progress_bar: turn on/off progress bar + :type progress_bar: bool + """ + if not self._prepare_download( + quality=quality, + parent_folder=parent_folder, + progress_bar=progress_bar, + **kwargs, + ): + return False + + if self.client.source == "soundcloud": + # soundcloud client needs whole dict to get file url + url_id = self.resp + else: + url_id = self.id + + try: + dl_info = self.client.get_file_url(url_id, self.quality) + except Exception as e: + click.secho(f"Unable to download track. {e}", fg="red") + return False + + if self.client.source == "qobuz": + if not self.__validate_qobuz_dl_info(dl_info): + click.secho("Track is not available for download", fg="red") + return False + + self.sampling_rate = dl_info.get("sampling_rate") + self.bit_depth = dl_info.get("bit_depth") + + # --------- Download Track ---------- + if self.client.source in ("qobuz", "tidal", "deezer"): + logger.debug("Downloadable URL found: %s", dl_info.get("url")) + try: + tqdm_download( + dl_info["url"], self.path, desc=self._progress_desc + ) # downloads file + except NonStreamable: + click.secho( + "Track {self!s} is not available for download, skipping.", fg="red" + ) + return False + + elif self.client.source == "soundcloud": + self._soundcloud_download(dl_info, self.path) + + else: + raise InvalidSourceError(self.client.source) + + if ( + self.client.source == "tidal" + and isinstance(dl_info, dict) + and dl_info.get("enc_key", False) + ): + out_path = f"{self.path}_dec" + decrypt_mqa_file(self.path, out_path, dl_info["enc_key"]) + self.path = out_path + + if not kwargs.get("stay_temp", False): + self.move(self.final_path) + + try: + database = kwargs.get("database") + database.add(self.id) + logger.debug(f"{self.id} added to database") + except AttributeError: # assume database=None was passed + pass + + logger.debug("Downloaded: %s -> %s", self.path, self.final_path) + + self.downloaded = True + + if not kwargs.get("keep_cover", True) and hasattr(self, "cover_path"): + os.remove(self.cover_path) + + return True + + def __validate_qobuz_dl_info(self, info: dict) -> bool: + return all( + (info.get("sampling_rate"), info.get("bit_depth"), not info.get("sample")) + ) + + def move(self, path: str): + os.makedirs(os.path.dirname(path), exist_ok=True) + shutil.move(self.path, path) + self.path = path + + def _soundcloud_download(self, dl_info: dict) -> str: + if dl_info["type"] == "mp3": + self.path += ".mp3" + # convert hls stream to mp3 + subprocess.call( + [ + "ffmpeg", + "-i", + dl_info["url"], + "-c", + "copy", + "-y", + self.path, + "-loglevel", + "fatal", + ] + ) + elif dl_info["type"] == "original": + tqdm_download(dl_info["url"], self.path, desc=self._progress_desc) + + # if a wav is returned, convert to flac + engine = converter.FLAC(self.path) + self.path = f"{self.path}.flac" + engine.convert(custom_fn=self.path) + + self.final_path = self.final_path.replace(".mp3", ".flac") + self.quality = 2 + + @property + def _progress_desc(self): + return click.style(f"Track {int(self.meta.tracknumber):02}", fg="blue") + + def download_cover(self): + """Downloads the cover art, if cover_url is given.""" + + if not hasattr(self, "cover_url"): + return False + + self.cover_path = os.path.join(gettempdir(), f"cover{hash(self.cover_url)}.jpg") + logger.debug(f"Downloading cover from {self.cover_url}") + # click.secho(f"\nDownloading cover art for {self!s}", fg="blue") + + if not os.path.exists(self.cover_path): + tqdm_download( + self.cover_url, self.cover_path, desc=click.style("Cover", fg="cyan") + ) + else: + logger.debug("Cover already exists, skipping download") + + def format_final_path(self) -> str: + """Return the final filepath of the downloaded file. + + This uses the `get_formatter` method of TrackMetadata, which returns + a dict with the keys allowed in formatter strings, and their values in + the TrackMetadata object. + """ + formatter = self.meta.get_formatter() + logger.debug("Track meta formatter %s", formatter) + filename = clean_format(self.file_format, formatter) + self.final_path = os.path.join(self.folder, filename)[:250].strip() + ext( + self.quality, self.client.source + ) + + logger.debug("Formatted path: %s", self.final_path) + + return self.final_path + + @classmethod + def from_album_meta(cls, album: TrackMetadata, track: dict, client: Client): + """Return a new Track object initialized with info from the album dicts + returned by client.get calls. + + :param album: album metadata returned by API + :param pos: index of the track + :param client: qopy client object + :type client: Client + :raises IndexError + """ + + meta = TrackMetadata(album=album, track=track, source=client.source) + return cls(client=client, meta=meta, id=track["id"]) + + @classmethod + def from_api(cls, item: dict, client: Client): + meta = TrackMetadata(track=item, source=client.source) + try: + if client.source == "qobuz": + cover_url = item["album"]["image"]["small"] + elif client.source == "tidal": + cover_url = tidal_cover_url(item["album"]["cover"], 320) + elif client.source == "deezer": + cover_url = item["album"]["cover_medium"] + else: + raise InvalidSourceError(client.source) + except KeyError: + logger.debug("No cover found") + cover_url = None + + return cls( + client=client, + meta=meta, + id=item["id"], + cover_url=cover_url, + ) + + def tag( + self, + album_meta: dict = None, + cover: Union[Picture, APIC, MP4Cover] = None, + embed_cover: bool = True, + ): + """Tag the track using the stored metadata. + + The info stored in the TrackMetadata object (self.meta) can be updated + with album metadata if necessary. The cover must be a mutagen cover-type + object that already has the bytes loaded. + + :param album_meta: album metadata to update Track with + :type album_meta: dict + :param cover: initialized mutagen cover object + :type cover: Union[Picture, APIC] + :param embed_cover: Embed cover art into file + :type embed_cover: bool + """ + assert isinstance(self.meta, TrackMetadata), "meta must be TrackMetadata" + if not self.downloaded: + logger.info( + "Track %s not tagged because it was not downloaded", self["title"] + ) + return + + if self.tagged: + logger.info( + "Track %s not tagged because it is already tagged", self["title"] + ) + return + + if album_meta is not None: + self.meta.add_album_meta(album_meta) # extend meta with album info + + if self.quality in (2, 3, 4): + self.container = "FLAC" + logger.debug("Tagging file with %s container", self.container) + audio = FLAC(self.path) + elif self.quality <= 1: + if self.client.source == "tidal": + self.container = "AAC" + audio = MP4(self.path) + else: + self.container = "MP3" + try: + audio = ID3(self.path) + except ID3NoHeaderError: + audio = ID3() + + logger.debug("Tagging file with %s container", self.container) + else: + raise InvalidQuality(f'Invalid quality: "{self.quality}"') + + # automatically generate key, value pairs based on container + tags = self.meta.tags(self.container) + for k, v in tags: + audio[k] = v + + if embed_cover and cover is None: + assert hasattr(self, "cover_path") + cover = Tracklist.get_cover_obj( + self.cover_path, self.quality, self.client.source + ) + + if isinstance(audio, FLAC): + if embed_cover: + audio.add_picture(cover) + audio.save() + elif isinstance(audio, ID3): + if embed_cover: + audio.add(cover) + audio.save(self.path, "v2_version=3") + elif isinstance(audio, MP4): + audio["covr"] = [cover] + audio.save() + else: + raise ValueError(f"Unknown container type: {audio}") + + self.tagged = True + + def convert(self, codec: str = "ALAC", **kwargs): + """Converts the track to another codec. + + Valid values for codec: + * FLAC + * ALAC + * MP3 + * OPUS + * OGG + * VORBIS + * AAC + * M4A + + :param codec: the codec to convert the track to + :type codec: str + :param kwargs: + """ + if not self.downloaded: + logger.debug("Track not downloaded, skipping conversion") + click.secho("Track not downloaded, skipping conversion", fg="magenta") + return + + CONV_CLASS = { + "FLAC": converter.FLAC, + "ALAC": converter.ALAC, + "MP3": converter.LAME, + "OPUS": converter.OPUS, + "OGG": converter.Vorbis, + "VORBIS": converter.Vorbis, + "AAC": converter.AAC, + "M4A": converter.AAC, + } + + self.container = codec.upper() + if not hasattr(self, "final_path"): + self.format_final_path() + + if not os.path.isfile(self.path): + logger.info("File %s does not exist. Skipping conversion.", self.path) + click.secho(f"{self!s} does not exist. Skipping conversion.", fg="red") + return + + assert ( + self.container in CONV_CLASS + ), f"Invalid codec {codec}. Must be in {CONV_CLASS.keys()}" + + engine = CONV_CLASS[self.container]( + filename=self.path, + sampling_rate=kwargs.get("sampling_rate"), + remove_source=kwargs.get("remove_source", True), + ) + # click.secho(f"Converting {self!s}", fg="blue") + engine.convert() + self.path = engine.final_fn + self.final_path = self.final_path.replace( + ext(self.quality, self.client.source), f".{engine.container}" + ) + + if not kwargs.get("stay_temp", False): + self.move(self.final_path) + + @property + def title(self) -> str: + if hasattr(self, "meta"): + _title = self.meta.title + if self.meta.explicit: + _title = f"{_title} (Explicit)" + return _title + else: + raise Exception("Track must be loaded before accessing title") + + def get(self, *keys, default=None) -> Any: + """Safe get method that allows for layered access. + + :param keys: + :param default: + """ + return safe_get(self.meta, *keys, default=default) + + def set(self, key, val): + """Equivalent to __setitem__. Implemented only for + consistency. + + :param key: + :param val: + """ + self.__setitem__(key, val) + + def __getitem__(self, key: str) -> Any: + """Dict-like interface for Track metadata. + + :param key: + """ + return getattr(self.meta, key) + + def __setitem__(self, key: str, val: Any): + """Dict-like interface for Track metadata. + + :param key: + :param val: + """ + setattr(self.meta, key, val) + + def __repr__(self) -> str: + """Return a string representation of the track. + + :rtype: str + """ + return f"" + + def __str__(self) -> str: + """Return a readable string representation of + this track. + + :rtype: str + """ + return f"{self['artist']} - {self['title']}" + + +class Video: + """Only for Tidal.""" + + def __init__(self, client: Client, id: str, **kwargs): + self.id = id + self.client = client + self.title = kwargs.get("title", "MusicVideo") + self.explicit = kwargs.get("explicit", False) + self.tracknumber = kwargs.get("tracknumber", None) + + def load_meta(self): + resp = self.client.get(self.id, "video") + self.title = resp["title"] + self.explicit = resp["explicit"] + print(resp) + + def download(self, **kwargs): + click.secho( + f"Downloading {self.title} (Video). This may take a while.", fg="blue" + ) + + self.parent_folder = kwargs.get("parent_folder", "StreamripDownloads") + url = self.client.get_file_url(self.id, video=True) + # it's more convenient to have ffmpeg download the hls + command = ["ffmpeg", "-i", url, "-c", "copy", "-loglevel", "panic", self.path] + p = subprocess.Popen(command) + p.wait() # remove this? + + @classmethod + def from_album_meta(cls, track: dict, client: Client): + return cls( + client, + id=track["id"], + title=track["title"], + explicit=track["explicit"], + tracknumber=track["trackNumber"], + ) + + @property + def path(self) -> str: + os.makedirs(self.parent_folder, exist_ok=True) + fname = self.title + if self.explicit: + fname = f"{fname} (Explicit)" + if self.tracknumber is not None: + fname = f"{self.tracknumber:02}. {fname}" + + return os.path.join(self.parent_folder, f"{fname}.mp4") + + def __str__(self) -> str: + return self.title + + def __repr__(self) -> str: + return f"