From e6a5d2cd393e8c2a4009214a5338bed08ce524fe Mon Sep 17 00:00:00 2001 From: nathom Date: Wed, 28 Apr 2021 00:24:17 -0700 Subject: [PATCH 01/14] Start comprehensive typing --- .mypy.ini | 20 +++++++++ streamrip/bases.py | 27 ++++++------ streamrip/clients.py | 32 +++++++------- streamrip/config.py | 7 +-- streamrip/converter.py | 10 ++--- streamrip/core.py | 78 ++++++++++++++++++++------------- streamrip/metadata.py | 96 +++++++++++++++++++++-------------------- streamrip/tracklists.py | 2 +- 8 files changed, 157 insertions(+), 115 deletions(-) create mode 100644 .mypy.ini diff --git a/.mypy.ini b/.mypy.ini new file mode 100644 index 0000000..33356af --- /dev/null +++ b/.mypy.ini @@ -0,0 +1,20 @@ +[mypy-mutagen.*] +ignore_missing_imports = True + +[mypy-tqdm.*] +ignore_missing_imports = True + +[mypy-pathvalidate.*] +ignore_missing_imports = True + +[mypy-packaging.*] +ignore_missing_imports = True + +[mypy-ruamel.yaml.*] +ignore_missing_imports = True + +[mypy-pick.*] +ignore_missing_imports = True + +[mypy-simple_term_menu.*] +ignore_missing_imports = True diff --git a/streamrip/bases.py b/streamrip/bases.py index 8f191da..2330175 100644 --- a/streamrip/bases.py +++ b/streamrip/bases.py @@ -86,6 +86,10 @@ class Track: self.downloaded = False self.tagged = False self.converted = False + + self.final_path: str + self.container: str + # TODO: find better solution for attr in ("quality", "folder", "meta"): setattr(self, attr, None) @@ -236,12 +240,10 @@ class Track: if not kwargs.get("stay_temp", False): self.move(self.final_path) - try: - database = kwargs.get("database") + database = kwargs.get("database") + if database is not None: database.add(self.id) logger.debug(f"{self.id} added to database") - except AttributeError: # assume database=None was passed - pass logger.debug("Downloaded: %s -> %s", self.path, self.final_path) @@ -273,7 +275,7 @@ class Track: shutil.move(self.path, path) self.path = path - def _soundcloud_download(self, dl_info: dict) -> str: + def _soundcloud_download(self, dl_info: dict): """Downloads a soundcloud track. This requires a seperate function because there are three methods that can be used to download a track: * original file downloads @@ -708,6 +710,9 @@ class Booklet: :param resp: :type resp: dict """ + self.url: str + self.description: str + self.__dict__.update(resp) def download(self, parent_folder: str, **kwargs): @@ -861,9 +866,7 @@ class Tracklist(list): return cls(client=client, **info) @staticmethod - def get_cover_obj( - cover_path: str, container: str, source: str - ) -> Union[Picture, APIC]: + def get_cover_obj(cover_path: str, container: str, source: str): """Given the path to an image and a quality id, return an initialized cover object that can be used for every track in the album. @@ -907,7 +910,7 @@ class Tracklist(list): with open(cover_path, "rb") as img: return cover(img.read(), imageformat=MP4Cover.FORMAT_JPEG) - def download_message(self) -> str: + def download_message(self): """The message to display after calling `Tracklist.download`. :rtype: str @@ -938,14 +941,14 @@ class Tracklist(list): return album - def __getitem__(self, key: Union[str, int]): + def __getitem__(self, key): if isinstance(key, str): return getattr(self, key) if isinstance(key, int): return super().__getitem__(key) - def __setitem__(self, key: Union[str, int], val: Any): + def __setitem__(self, key, val): if isinstance(key, str): setattr(self, key, val) @@ -990,7 +993,7 @@ class YoutubeVideo: ) if download_youtube_videos: - click.secho("Downloading video stream", fg='blue') + click.secho("Downloading video stream", fg="blue") pv = subprocess.Popen( [ "youtube-dl", diff --git a/streamrip/clients.py b/streamrip/clients.py index 7c275c9..4d05e0c 100644 --- a/streamrip/clients.py +++ b/streamrip/clients.py @@ -13,6 +13,7 @@ from .constants import ( AGENT, AVAILABLE_QUALITY_IDS, DEEZER_BASE, + DEEZER_DL, DEEZER_MAX_Q, QOBUZ_BASE, QOBUZ_FEATURED_KEYS, @@ -43,6 +44,10 @@ class Client(ABC): it is merely a template. """ + source: str + max_quality: int + logged_in: bool + @abstractmethod def login(self, **kwargs): """Authenticate the client. @@ -71,25 +76,13 @@ class Client(ABC): pass @abstractmethod - def get_file_url(self, track_id, quality=3) -> Union[dict]: + def get_file_url(self, track_id, quality=3) -> Union[dict, str]: """Get the direct download url dict for a file. :param track_id: id of the track """ pass - @property - @abstractmethod - def source(self): - """Source from which the Client retrieves data.""" - pass - - @property - @abstractmethod - def max_quality(self): - """The maximum quality that the Client supports.""" - pass - class QobuzClient(Client): source = "qobuz" @@ -99,7 +92,7 @@ class QobuzClient(Client): def __init__(self): self.logged_in = False - def login(self, email: str, pwd: str, **kwargs): + def login(self, **kwargs): """Authenticate the QobuzClient. Must have a paid membership. If `app_id` and `secrets` are not provided, this will run the @@ -113,6 +106,8 @@ class QobuzClient(Client): :param kwargs: app_id: str, secrets: list, return_secrets: bool """ click.secho(f"Logging into {self.source}", fg="green") + email: str = kwargs["email"] + pwd: str = kwargs["pwd"] if self.logged_in: logger.debug("Already logged in") return @@ -184,7 +179,7 @@ class QobuzClient(Client): # ---------- Private Methods --------------- - def _gen_pages(self, epoint: str, params: dict) -> dict: + def _gen_pages(self, epoint: str, params: dict) -> Generator: """When there are multiple pages of results, this lazily yields them. @@ -352,7 +347,7 @@ class QobuzClient(Client): else: raise InvalidAppSecretError("Cannot find app secret") - quality = get_quality(quality, self.source) + quality = int(get_quality(quality, self.source)) r_sig = f"trackgetFileUrlformat_id{quality}intentstreamtrack_id{track_id}{unix_ts}{secret}" logger.debug("Raw request signature: %s", r_sig) r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest() @@ -857,7 +852,7 @@ class SoundCloudClient(Client): return resp - def get_file_url(self, track: dict, quality) -> dict: + def get_file_url(self, track, quality): """Get the streamable file url from soundcloud. It will most likely be an hls stream, which will have to be manually @@ -868,6 +863,9 @@ class SoundCloudClient(Client): :param quality: :rtype: dict """ + # TODO: find better solution for typing + assert isinstance(track, dict) + if not track["streamable"] or track["policy"] == "BLOCK": raise Exception diff --git a/streamrip/config.py b/streamrip/config.py index 86fe08e..a454d3d 100644 --- a/streamrip/config.py +++ b/streamrip/config.py @@ -5,6 +5,7 @@ import os import re from functools import cache from pprint import pformat +from typing import Any, Dict from ruamel.yaml import YAML @@ -45,7 +46,7 @@ class Config: values. """ - defaults = { + defaults: Dict[str, Any] = { "qobuz": { "quality": 3, "download_booklets": True, @@ -107,8 +108,8 @@ class Config: def __init__(self, path: str = None): # to access settings loaded from yaml file - self.file = copy.deepcopy(self.defaults) - self.session = copy.deepcopy(self.defaults) + self.file: Dict[str, Any] = copy.deepcopy(self.defaults) + self.session: Dict[str, Any] = copy.deepcopy(self.defaults) if path is None: self._path = CONFIG_PATH diff --git a/streamrip/converter.py b/streamrip/converter.py index ff1a8d1..52ecde8 100644 --- a/streamrip/converter.py +++ b/streamrip/converter.py @@ -15,11 +15,11 @@ SAMPLING_RATES = (44100, 48000, 88200, 96000, 176400, 192000) class Converter: """Base class for audio codecs.""" - codec_name = None - codec_lib = None - container = None - lossless = False - default_ffmpeg_arg = "" + codec_name: str + codec_lib: str + container: str + lossless: bool = False + default_ffmpeg_arg: str = "" def __init__( self, diff --git a/streamrip/core.py b/streamrip/core.py index 8075bed..924e6af 100644 --- a/streamrip/core.py +++ b/streamrip/core.py @@ -6,7 +6,7 @@ import sys from getpass import getpass from hashlib import md5 from string import Formatter -from typing import Generator, Optional, Tuple, Union +from typing import Dict, Generator, List, Optional, Tuple, Type, Union import click import requests @@ -19,11 +19,11 @@ from .constants import ( CONFIG_PATH, DB_PATH, LASTFM_URL_REGEX, - YOUTUBE_URL_REGEX, MEDIA_TYPES, QOBUZ_INTERPRETER_URL_REGEX, SOUNDCLOUD_URL_REGEX, URL_REGEX, + YOUTUBE_URL_REGEX, ) from .db import MusicDB from .exceptions import ( @@ -38,7 +38,10 @@ from .utils import extract_interpreter_url logger = logging.getLogger(__name__) -MEDIA_CLASS = { +Media = Union[ + Type[Album], Type[Playlist], Type[Artist], Type[Track], Type[Label], Type[Video] +] +MEDIA_CLASS: Dict[str, Media] = { "album": Album, "playlist": Playlist, "artist": Artist, @@ -46,7 +49,6 @@ MEDIA_CLASS = { "label": Label, "video": Video, } -Media = Union[Album, Playlist, Artist, Track] class MusicDL(list): @@ -61,9 +63,11 @@ class MusicDL(list): self.interpreter_url_parse = re.compile(QOBUZ_INTERPRETER_URL_REGEX) self.youtube_url_parse = re.compile(YOUTUBE_URL_REGEX) - self.config = config - if self.config is None: + self.config: Config + if config is None: self.config = Config(CONFIG_PATH) + else: + self.config = config self.clients = { "qobuz": QobuzClient(), @@ -72,13 +76,14 @@ class MusicDL(list): "soundcloud": SoundCloudClient(), } - if config.session["database"]["enabled"]: - if config.session["database"]["path"] is not None: - self.db = MusicDB(config.session["database"]["path"]) + self.db: Union[MusicDB, list] + if self.config.session["database"]["enabled"]: + if self.config.session["database"]["path"] is not None: + self.db = MusicDB(self.config.session["database"]["path"]) else: self.db = MusicDB(DB_PATH) - config.file["database"]["path"] = DB_PATH - config.save() + self.config.file["database"]["path"] = DB_PATH + self.config.save() else: self.db = [] @@ -175,7 +180,7 @@ class MusicDL(list): ) click.secho("rip config --reset ", fg="yellow", nl=False) click.secho("to reset it. You will need to log in again.", fg="red") - click.secho(err, fg='red') + click.secho(err, fg="red") exit() logger.debug("Arguments from config: %s", arguments) @@ -247,7 +252,7 @@ class MusicDL(list): self.config.file["tidal"].update(client.get_tokens()) self.config.save() - def parse_urls(self, url: str) -> Tuple[str, str]: + def parse_urls(self, url: str) -> List[Tuple[str, str, str]]: """Returns the type of the url and the id. Compatible with urls of the form: @@ -262,7 +267,7 @@ class MusicDL(list): :raises exceptions.ParsingError """ - parsed = [] + parsed: List[Tuple[str, str, str]] = [] interpreter_urls = self.interpreter_url_parse.findall(url) if interpreter_urls: @@ -291,14 +296,15 @@ class MusicDL(list): return parsed def handle_lastfm_urls(self, urls): + # For testing: # https://www.last.fm/user/nathan3895/playlists/12058911 user_regex = re.compile(r"https://www\.last\.fm/user/([^/]+)/playlists/\d+") lastfm_urls = self.lastfm_url_parse.findall(urls) lastfm_source = self.config.session["lastfm"]["source"] - tracks_not_found = 0 - def search_query(query: str, playlist: Playlist): - global tracks_not_found + def search_query(query: str, playlist: Playlist) -> bool: + """Search for a query and add the first result to the given + Playlist object.""" try: track = next(self.search(lastfm_source, query, media_type="track")) if self.config.session["metadata"]["set_playlist_to_album"]: @@ -307,29 +313,33 @@ class MusicDL(list): track.meta.version = track.meta.work = None playlist.append(track) + return True except NoResultsFound: - tracks_not_found += 1 - return + return False for purl in lastfm_urls: click.secho(f"Fetching playlist at {purl}", fg="blue") title, queries = self.get_lastfm_playlist(purl) pl = Playlist(client=self.get_client(lastfm_source), name=title) - pl.creator = user_regex.search(purl).group(1) + creator_match = user_regex.search(purl) + if creator_match is not None: + pl.creator = creator_match.group(1) + tracks_not_found: int = 0 with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: futures = [ executor.submit(search_query, f"{title} {artist}", pl) for title, artist in queries ] # only for the progress bar - for f in tqdm( + for search_attempt in tqdm( concurrent.futures.as_completed(futures), total=len(futures), desc="Searching", ): - pass + if not search_attempt.result(): + tracks_not_found += 1 pl.loaded = True click.secho(f"{tracks_not_found} tracks not found.", fg="yellow") @@ -362,7 +372,7 @@ class MusicDL(list): else page["albums"]["items"] ) for item in tracklist: - yield MEDIA_CLASS[ + yield MEDIA_CLASS[ # type: ignore media_type if media_type != "featured" else "album" ].from_api(item, client) i += 1 @@ -376,7 +386,7 @@ class MusicDL(list): raise NoResultsFound(query) for item in items: - yield MEDIA_CLASS[media_type].from_api(item, client) + yield MEDIA_CLASS[media_type].from_api(item, client) # type: ignore i += 1 if i > limit: return @@ -408,7 +418,7 @@ class MusicDL(list): ret = fmt.format(**{k: media.get(k, default="Unknown") for k in fields}) return ret - def interactive_search( + def interactive_search( # noqa self, query: str, source: str = "qobuz", media_type: str = "album" ): results = tuple(self.search(source, query, media_type, limit=50)) @@ -506,13 +516,21 @@ class MusicDL(list): r = requests.get(url) get_titles(r.text) - remaining_tracks = ( - int(re.search(r'data-playlisting-entry-count="(\d+)"', r.text).group(1)) - - 50 + remaining_tracks_match = re.search( + r'data-playlisting-entry-count="(\d+)"', r.text ) - playlist_title = re.search( + if remaining_tracks_match is not None: + remaining_tracks = int(remaining_tracks_match.group(1)) - 50 + else: + raise Exception("Error parsing lastfm page") + + playlist_title_match = re.search( r'

([^<]+)

', r.text - ).group(1) + ) + if playlist_title_match is not None: + playlist_title = playlist_title_match.group(1) + else: + raise Exception("Error finding title from response") page = 1 while remaining_tracks > 0: diff --git a/streamrip/metadata.py b/streamrip/metadata.py index be0f5ef..993ad49 100644 --- a/streamrip/metadata.py +++ b/streamrip/metadata.py @@ -2,7 +2,7 @@ import logging import re from collections import OrderedDict -from typing import Generator, Hashable, Optional, Tuple, Union +from typing import Generator, Hashable, Iterable, Optional, Union from .constants import ( COPYRIGHT, @@ -59,34 +59,37 @@ class TrackMetadata: :type album: Optional[dict] """ # embedded information - self.title = None - self.album = None - self.albumartist = None - self.composer = None - self.comment = None - self.description = None - self.purchase_date = None - self.grouping = None - self.lyrics = None - self.encoder = None - self.compilation = None - self.cover = None - self.tracktotal = None - self.tracknumber = None - self.discnumber = None - self.disctotal = None + self.title: str + self.album: str + self.albumartist: str + self.composer: str + self.comment: Optional[str] + self.description: Optional[str] + self.purchase_date: Optional[str] + self.grouping: Optional[str] + self.lyrics: Optional[str] + self.encoder: Optional[str] + self.compilation: Optional[str] + self.cover: str + self.tracktotal: int + self.tracknumber: int + self.discnumber: int + self.disctotal: int # not included in tags - self.explicit = False - self.quality = None - self.sampling_rate = None - self.bit_depth = None + self.explicit: Optional[bool] = False + self.quality: Optional[int] = None + self.sampling_rate: Optional[int] = None + self.bit_depth: Optional[int] = None self.booklets = None + self.cover_urls = Optional[OrderedDict] + self.work: Optional[str] + self.id: Optional[str] # Internals - self._artist = None - self._copyright = None - self._genres = None + self._artist: Optional[str] = None + self._copyright: Optional[str] = None + self._genres: Optional[Iterable] = None self.__source = source @@ -121,7 +124,7 @@ class TrackMetadata: """ if self.__source == "qobuz": # Tags - self.album = resp.get("title") + self.album = resp.get("title", "Unknown Album") self.tracktotal = resp.get("tracks_count", 1) self.genre = resp.get("genres_list") or resp.get("genre") self.date = resp.get("release_date_original") or resp.get("release_date") @@ -144,7 +147,7 @@ class TrackMetadata: # Non-embedded information self.version = resp.get("version") - self.cover_urls = OrderedDict(resp.get("image")) + self.cover_urls = OrderedDict(resp["image"]) self.cover_urls["original"] = self.cover_urls["large"].replace("600", "org") self.streamable = resp.get("streamable", False) self.bit_depth = resp.get("maximum_bit_depth") @@ -156,14 +159,14 @@ class TrackMetadata: self.sampling_rate *= 1000 elif self.__source == "tidal": - self.album = resp.get("title") + self.album = resp.get("title", "Unknown Album") self.tracktotal = resp.get("numberOfTracks", 1) # genre not returned by API self.date = resp.get("releaseDate") self.copyright = resp.get("copyright") self.albumartist = safe_get(resp, "artist", "name") - self.disctotal = resp.get("numberOfVolumes") + self.disctotal = resp.get("numberOfVolumes", 1) self.isrc = resp.get("isrc") # label not returned by API @@ -185,8 +188,8 @@ class TrackMetadata: self.sampling_rate = 44100 elif self.__source == "deezer": - self.album = resp.get("title") - self.tracktotal = resp.get("track_total") or resp.get("nb_tracks") + self.album = resp.get("title", "Unknown Album") + self.tracktotal = resp.get("track_total", 0) or resp.get("nb_tracks", 0) self.disctotal = ( max(track.get("disk_number") for track in resp.get("tracks", [{}])) or 1 ) @@ -224,7 +227,7 @@ class TrackMetadata: :param track: """ if self.__source == "qobuz": - self.title = track.get("title").strip() + self.title = track["title"].strip() self._mod_title(track.get("version"), track.get("work")) self.composer = track.get("composer", {}).get("name") @@ -235,24 +238,23 @@ class TrackMetadata: self.artist = self.get("albumartist") elif self.__source == "tidal": - self.title = track.get("title").strip() + self.title = track["title"].strip() self._mod_title(track.get("version"), None) self.tracknumber = track.get("trackNumber", 1) - self.discnumber = track.get("volumeNumber") + self.discnumber = track.get("volumeNumber", 1) self.artist = track.get("artist", {}).get("name") elif self.__source == "deezer": - self.title = track.get("title").strip() + self.title = track["title"].strip() self._mod_title(track.get("version"), None) self.tracknumber = track.get("track_position", 1) - self.discnumber = track.get("disk_number") + self.discnumber = track.get("disk_number", 1) self.artist = track.get("artist", {}).get("name") elif self.__source == "soundcloud": self.title = track["title"].strip() self.genre = track["genre"] - self.artist = track["user"]["username"] - self.albumartist = self.artist + self.artist = self.albumartist = track["user"]["username"] self.year = track["created_at"][:4] self.label = track["label_name"] self.description = track["description"] @@ -287,7 +289,7 @@ class TrackMetadata: return album @album.setter - def album(self, val) -> str: + def album(self, val): self._album = val @property @@ -331,7 +333,7 @@ class TrackMetadata: if isinstance(self._genres, list): if self.__source == "qobuz": - genres = re.findall(r"([^\u2192\/]+)", "/".join(self._genres)) + genres: Iterable = re.findall(r"([^\u2192\/]+)", "/".join(self._genres)) genres = set(genres) return ", ".join(genres) @@ -342,7 +344,7 @@ class TrackMetadata: raise TypeError(f"Genre must be list or str, not {type(self._genres)}") @genre.setter - def genre(self, val: Union[str, list]): + def genre(self, val: Union[Iterable, dict]): """Sets the internal `genre` field to the given list. It is not formatted until it is requested with `meta.genre`. @@ -352,7 +354,7 @@ class TrackMetadata: self._genres = val @property - def copyright(self) -> Union[str, None]: + def copyright(self) -> Optional[str]: """Formats the copyright string to use nice-looking unicode characters. @@ -361,11 +363,11 @@ class TrackMetadata: if hasattr(self, "_copyright"): if self._copyright is None: return None - copyright = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, self._copyright) + copyright: str = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, self._copyright) copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, copyright) return copyright - logger.debug("Accessed copyright tag before setting, return None") + logger.debug("Accessed copyright tag before setting, returning None") return None @copyright.setter @@ -440,7 +442,7 @@ class TrackMetadata: raise InvalidContainerError(f"Invalid container {container}") - def __gen_flac_tags(self) -> Tuple[str, str]: + def __gen_flac_tags(self) -> Generator: """Generate key, value pairs to tag FLAC files. :rtype: Tuple[str, str] @@ -454,7 +456,7 @@ class TrackMetadata: logger.debug("Adding tag %s: %s", v, tag) yield (v, str(tag)) - def __gen_mp3_tags(self) -> Tuple[str, str]: + def __gen_mp3_tags(self) -> Generator: """Generate key, value pairs to tag MP3 files. :rtype: Tuple[str, str] @@ -470,7 +472,7 @@ class TrackMetadata: if text is not None and v is not None: yield (v.__name__, v(encoding=3, text=text)) - def __gen_mp4_tags(self) -> Tuple[str, Union[str, int, tuple]]: + def __gen_mp4_tags(self) -> Generator: """Generate key, value pairs to tag ALAC or AAC files in an MP4 container. @@ -510,7 +512,7 @@ class TrackMetadata: """ return getattr(self, key) - def get(self, key, default=None) -> str: + def get(self, key, default=None): """Returns the requested attribute of the object, with a default value. diff --git a/streamrip/tracklists.py b/streamrip/tracklists.py index 452e38a..2fe6634 100644 --- a/streamrip/tracklists.py +++ b/streamrip/tracklists.py @@ -171,7 +171,7 @@ class Album(Tracklist): return True @staticmethod - def _parse_get_resp(resp: dict, client: Client) -> dict: + def _parse_get_resp(resp: dict, client: Client) -> TrackMetadata: """Parse information from a client.get(query, 'album') call. :param resp: From d6830adcda341f8499762b50c36371c3d56d5b29 Mon Sep 17 00:00:00 2001 From: nathom Date: Sat, 1 May 2021 18:05:34 -0700 Subject: [PATCH 02/14] Misc tying fixes --- streamrip/bases.py | 2 +- streamrip/metadata.py | 16 ++++++++-------- streamrip/tracklists.py | 3 +++ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/streamrip/bases.py b/streamrip/bases.py index 2330175..89f3c8a 100644 --- a/streamrip/bases.py +++ b/streamrip/bases.py @@ -241,7 +241,7 @@ class Track: self.move(self.final_path) database = kwargs.get("database") - if database is not None: + if database: database.add(self.id) logger.debug(f"{self.id} added to database") diff --git a/streamrip/metadata.py b/streamrip/metadata.py index 993ad49..0a4e2f5 100644 --- a/streamrip/metadata.py +++ b/streamrip/metadata.py @@ -63,14 +63,14 @@ class TrackMetadata: self.album: str self.albumartist: str self.composer: str - self.comment: Optional[str] - self.description: Optional[str] - self.purchase_date: Optional[str] - self.grouping: Optional[str] - self.lyrics: Optional[str] - self.encoder: Optional[str] - self.compilation: Optional[str] - self.cover: str + self.comment: Optional[str] = None + self.description: Optional[str] = None + self.purchase_date: Optional[str] = None + self.grouping: Optional[str] = None + self.lyrics: Optional[str] = None + self.encoder: Optional[str] = None + self.compilation: Optional[str] = None + self.cover: Optional[str] = None self.tracktotal: int self.tracknumber: int self.discnumber: int diff --git a/streamrip/tracklists.py b/streamrip/tracklists.py index 2fe6634..ebcb0b9 100644 --- a/streamrip/tracklists.py +++ b/streamrip/tracklists.py @@ -2,6 +2,8 @@ downloadable form. """ +from __future__ import annotations + import functools import logging import os @@ -561,6 +563,7 @@ class Artist(Tracklist): logger.debug(f"Length of tracklist {len(self)}") logger.debug(f"Filters: {filters}") + final: Iterable if "repeats" in filters: final = self._remove_repeats(bit_depth=max, sampling_rate=min) filters = tuple(f for f in filters if f != "repeats") From 4518944efcb16cfc142bd9bba92ab90b491a914c Mon Sep 17 00:00:00 2001 From: nathom Date: Sat, 1 May 2021 18:13:07 -0700 Subject: [PATCH 03/14] Increase Deezer track limit; #63 --- streamrip/clients.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/streamrip/clients.py b/streamrip/clients.py index 4d05e0c..8b52730 100644 --- a/streamrip/clients.py +++ b/streamrip/clients.py @@ -1,4 +1,5 @@ import base64 +from pprint import pprint import hashlib import json import logging @@ -418,11 +419,11 @@ class DeezerClient(Client): :param limit: :type limit: int """ - # TODO: more robust url sanitize - query = query.replace(" ", "+") # TODO: use limit parameter - response = self.session.get(f"{DEEZER_BASE}/search/{media_type}?q={query}") + response = self.session.get( + f"{DEEZER_BASE}/search/{media_type}", params={"q": query} + ) response.raise_for_status() return response.json() @@ -441,7 +442,7 @@ class DeezerClient(Client): url = f"{DEEZER_BASE}/{media_type}/{meta_id}" item = self.session.get(url).json() if media_type in ("album", "playlist"): - tracks = self.session.get(f"{url}/tracks").json() + tracks = self.session.get(f"{url}/tracks", params={"limit": 1000}).json() item["tracks"] = tracks["data"] item["track_total"] = len(tracks["data"]) elif media_type == "artist": @@ -452,6 +453,13 @@ class DeezerClient(Client): @staticmethod def get_file_url(meta_id: Union[str, int], quality: int = 6): + """Get downloadable url for a track. + + :param meta_id: The track ID. + :type meta_id: Union[str, int] + :param quality: + :type quality: int + """ quality = min(DEEZER_MAX_Q, quality) url = f"{DEEZER_DL}/{get_quality(quality, 'deezer')}/{DEEZER_BASE}/track/{meta_id}" logger.debug(f"Download url {url}") From a06336c48acd3bbea2d707f1afdef9999d6eb119 Mon Sep 17 00:00:00 2001 From: nathom Date: Sat, 1 May 2021 18:17:59 -0700 Subject: [PATCH 04/14] Create subdirectory for Label downloads --- streamrip/tracklists.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streamrip/tracklists.py b/streamrip/tracklists.py index ebcb0b9..9a95095 100644 --- a/streamrip/tracklists.py +++ b/streamrip/tracklists.py @@ -557,7 +557,7 @@ class Artist(Tracklist): self, parent_folder: str = "StreamripDownloads", filters: tuple = (), **kwargs ) -> Iterable: folder = sanitize_filename(self.name) - folder = os.path.join(parent_folder, folder) + self.folder = os.path.join(parent_folder, folder) logger.debug("Artist folder: %s", folder) logger.debug(f"Length of tracklist {len(self)}") @@ -590,11 +590,11 @@ class Artist(Tracklist): item.load_meta() except NonStreamable: logger.info("Skipping album, not available to stream.") - return + return False # always an Album status = item.download( - parent_folder=parent_folder, + parent_folder=self.folder, quality=quality, database=database, **kwargs, From cfa6b35eb01037e16d26b0d573fba16e37011438 Mon Sep 17 00:00:00 2001 From: nathom Date: Sat, 1 May 2021 18:38:55 -0700 Subject: [PATCH 05/14] Fix misc typing bugs. --- streamrip/bases.py | 10 +++++++++- streamrip/metadata.py | 2 +- streamrip/tracklists.py | 13 ++++++++----- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/streamrip/bases.py b/streamrip/bases.py index 89f3c8a..fd402d0 100644 --- a/streamrip/bases.py +++ b/streamrip/bases.py @@ -656,6 +656,14 @@ class Video: return False # so that it is not tagged + def tag(self, *args, **kwargs): + """Dummy method. + + :param args: + :param kwargs: + """ + return False + @classmethod def from_album_meta(cls, track: dict, client: Client): """Given an video response dict from an album, return a new @@ -787,7 +795,7 @@ class Tracklist(list): if self._download_item(item, **kwargs): item.convert(**kwargs["conversion"]) - def _download_item(item, **kwargs): + def _download_item(item, *args: Any, **kwargs: Any) -> bool: """Abstract method. :param item: diff --git a/streamrip/metadata.py b/streamrip/metadata.py index 0a4e2f5..7fb7d16 100644 --- a/streamrip/metadata.py +++ b/streamrip/metadata.py @@ -62,7 +62,7 @@ class TrackMetadata: self.title: str self.album: str self.albumartist: str - self.composer: str + self.composer: Optional[str] = None self.comment: Optional[str] = None self.description: Optional[str] = None self.purchase_date: Optional[str] = None diff --git a/streamrip/tracklists.py b/streamrip/tracklists.py index 9a95095..11444fd 100644 --- a/streamrip/tracklists.py +++ b/streamrip/tracklists.py @@ -9,7 +9,7 @@ import logging import os import re from tempfile import gettempdir -from typing import Dict, Generator, Iterable, Union +from typing import Dict, Generator, Iterable, Union, Optional import click from pathvalidate import sanitize_filename @@ -54,8 +54,11 @@ class Album(Tracklist): self.sampling_rate = None self.bit_depth = None - self.container = None + self.container: Optional[str] = None + self.disctotal: int + self.tracktotal: int + self.albumartist: str # usually an unpacked TrackMetadata.asdict() self.__dict__.update(kwargs) @@ -148,7 +151,7 @@ class Album(Tracklist): for item in self.booklets: Booklet(item).download(parent_folder=self.folder) - def _download_item( + def _download_item( # type: ignore self, track: Union[Track, Video], quality: int = 3, @@ -411,7 +414,7 @@ class Playlist(Tracklist): self.__download_index = 1 # used for tracknumbers self.download_message() - def _download_item(self, item: Track, **kwargs): + def _download_item(self, item: Track, **kwargs) -> bool: # type: ignore kwargs["parent_folder"] = self.folder if self.client.source == "soundcloud": item.load_meta() @@ -578,7 +581,7 @@ class Artist(Tracklist): self.download_message() return final - def _download_item( + def _download_item( # type: ignore self, item, parent_folder: str = "StreamripDownloads", From fc8f4cee83a04b396e2f172f23d39e345caf36f0 Mon Sep 17 00:00:00 2001 From: nathom Date: Sat, 1 May 2021 19:00:36 -0700 Subject: [PATCH 06/14] Run isort, fix typing --- streamrip/bases.py | 7 +++++-- streamrip/clients.py | 1 - streamrip/metadata.py | 7 ++++++- streamrip/tracklists.py | 3 ++- streamrip/utils.py | 19 +++++++++++++++---- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/streamrip/bases.py b/streamrip/bases.py index fd402d0..eadbd0a 100644 --- a/streamrip/bases.py +++ b/streamrip/bases.py @@ -202,6 +202,7 @@ class Track: return False if self.client.source == "qobuz": + assert isinstance(dl_info, dict) # for typing if not self.__validate_qobuz_dl_info(dl_info): click.secho("Track is not available for download", fg="red") return False @@ -211,6 +212,7 @@ class Track: # --------- Download Track ---------- if self.client.source in ("qobuz", "tidal", "deezer"): + assert isinstance(dl_info, dict) logger.debug("Downloadable URL found: %s", dl_info.get("url")) try: tqdm_download( @@ -223,6 +225,7 @@ class Track: return False elif self.client.source == "soundcloud": + assert isinstance(dl_info, dict) self._soundcloud_download(dl_info) else: @@ -403,7 +406,7 @@ class Track: cover_url=cover_url, ) - def tag( + def tag( # noqa self, album_meta: dict = None, cover: Union[Picture, APIC, MP4Cover] = None, @@ -871,7 +874,7 @@ class Tracklist(list): info = cls._parse_get_resp(item, client=client) # equivalent to Album(client=client, **info) - return cls(client=client, **info) + return cls(client=client, **info) # type: ignore @staticmethod def get_cover_obj(cover_path: str, container: str, source: str): diff --git a/streamrip/clients.py b/streamrip/clients.py index 8b52730..51f80bf 100644 --- a/streamrip/clients.py +++ b/streamrip/clients.py @@ -1,5 +1,4 @@ import base64 -from pprint import pprint import hashlib import json import logging diff --git a/streamrip/metadata.py b/streamrip/metadata.py index 7fb7d16..f14a941 100644 --- a/streamrip/metadata.py +++ b/streamrip/metadata.py @@ -1,4 +1,6 @@ """Manages the information that will be embeded in the audio file. """ +from __future__ import annotations + import logging import re from collections import OrderedDict @@ -48,7 +50,10 @@ class TrackMetadata: """ def __init__( - self, track: Optional[dict] = None, album: Optional[dict] = None, source="qobuz" + self, + track: Optional[Union[TrackMetadata, dict]] = None, + album: Optional[Union[TrackMetadata, dict]] = None, + source="qobuz", ): """Creates a TrackMetadata object optionally initialized with dicts returned by the Qobuz API. diff --git a/streamrip/tracklists.py b/streamrip/tracklists.py index 11444fd..307efbd 100644 --- a/streamrip/tracklists.py +++ b/streamrip/tracklists.py @@ -9,7 +9,7 @@ import logging import os import re from tempfile import gettempdir -from typing import Dict, Generator, Iterable, Union, Optional +from typing import Dict, Generator, Iterable, Optional, Union import click from pathvalidate import sanitize_filename @@ -59,6 +59,7 @@ class Album(Tracklist): self.disctotal: int self.tracktotal: int self.albumartist: str + # usually an unpacked TrackMetadata.asdict() self.__dict__.update(kwargs) diff --git a/streamrip/utils.py b/streamrip/utils.py index ab8bb43..d643014 100644 --- a/streamrip/utils.py +++ b/streamrip/utils.py @@ -5,7 +5,7 @@ import os import re import sys from string import Formatter -from typing import Hashable, Optional, Union +from typing import Dict, Hashable, Optional, Union import click import requests @@ -52,6 +52,7 @@ def get_quality(quality_id: int, source: str) -> Union[str, int]: :type source: str :rtype: Union[str, int] """ + q_map: Dict[int, Union[int, str]] if source == "qobuz": q_map = { 1: 5, @@ -89,7 +90,8 @@ def get_quality_id(bit_depth: Optional[int], sampling_rate: Optional[int]): :param sampling_rate: :type sampling_rate: Optional[int] """ - if not (bit_depth or sampling_rate): # is lossy + # XXX: Should `0` quality be supported? + if bit_depth is None or sampling_rate is None: # is lossy return 1 if bit_depth == 16: @@ -266,6 +268,9 @@ def decho(message, fg=None): logger.debug(message) +interpreter_artist_regex = re.compile(r"getSimilarArtist\(\s*'(\w+)'") + + def extract_interpreter_url(url: str) -> str: """Extract artist ID from a Qobuz interpreter url. @@ -275,8 +280,14 @@ def extract_interpreter_url(url: str) -> str: """ session = gen_threadsafe_session({"User-Agent": AGENT}) r = session.get(url) - artist_id = re.search(r"getSimilarArtist\(\s*'(\w+)'", r.text).group(1) - return artist_id + match = interpreter_artist_regex.search(r.text) + if match: + return match.group(1) + + raise Exception( + "Unable to extract artist id from interpreter url. Use a " + "url that contains an artist id." + ) def get_container(quality: int, source: str) -> str: From 75312da1c9f68e8962fe847121f7c8e907e4e777 Mon Sep 17 00:00:00 2001 From: nathom Date: Mon, 3 May 2021 12:23:11 -0700 Subject: [PATCH 07/14] =?UTF-8?q?Fix=20=E2=80=9CNot=20available=20for=20do?= =?UTF-8?q?wnload=20message=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- streamrip/bases.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streamrip/bases.py b/streamrip/bases.py index eadbd0a..b7d0f3a 100644 --- a/streamrip/bases.py +++ b/streamrip/bases.py @@ -220,7 +220,7 @@ class Track: ) # downloads file except NonStreamable: click.secho( - "Track {self!s} is not available for download, skipping.", fg="red" + f"Track {self!s} is not available for download, skipping.", fg="red" ) return False From afdd3b919f5838d6c7fff5286ba3f4a355edb853 Mon Sep 17 00:00:00 2001 From: nathom Date: Mon, 3 May 2021 22:34:26 -0700 Subject: [PATCH 08/14] Update README --- README.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c77fd82..9900567 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,9 @@ # streamrip +[![Downloads](https://static.pepy.tech/personalized-badge/streamrip?period=total&units=international_system&left_color=black&right_color=green&left_text=Downloads)](https://pepy.tech/project/streamrip) + A scriptable stream downloader for Qobuz, Tidal, Deezer and SoundCloud. -## Attention - -The Deezloader server is currently down. This means Deezer downloads are currently not working. -Stay posted for updates. ## Features From cc5bc604ba4bd3cdb732b12ffd437fb9379f5f55 Mon Sep 17 00:00:00 2001 From: nathom Date: Tue, 4 May 2021 01:16:48 -0700 Subject: [PATCH 09/14] Update README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9900567..ac19326 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ pip3 install streamrip windows-curses --upgrade -If you would like to use `streamrip`'s conversion capabilities, or download music from SoundCloud, install [ffmpeg](https://ffmpeg.org/download.html). +If you would like to use `streamrip`'s conversion capabilities, download TIDAL videos, or download music from SoundCloud, install [ffmpeg](https://ffmpeg.org/download.html). ## Example Usage From e8a72b62b1d00873e981ad274dfbaeef8b56d69b Mon Sep 17 00:00:00 2001 From: nathom Date: Tue, 4 May 2021 01:17:17 -0700 Subject: [PATCH 10/14] Add custom YAML reader --- .gitignore | 1 + streamrip/config.py | 93 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 43f2052..460a58a 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ StreamripDownloads *.pyc *test.py /.mypy_cache +/streamrip/test.yaml diff --git a/streamrip/config.py b/streamrip/config.py index a454d3d..45a492e 100644 --- a/streamrip/config.py +++ b/streamrip/config.py @@ -1,11 +1,12 @@ """A config class that manages arguments between the config file and CLI.""" import copy +from collections import OrderedDict import logging import os import re from functools import cache from pprint import pformat -from typing import Any, Dict +from typing import Any, Dict, List from ruamel.yaml import YAML @@ -331,3 +332,93 @@ class ConfigDocumentation: with open(path, "w") as f: f.write("".join(lines)) + + +# ------------- ~~ Experimental ~~ ----------------- # + +def load_yaml(path: str): + """A custom YAML parser optimized for use with streamrip. + + Warning: this is not fully compliant with YAML. + + :param path: + :type path: str + """ + with open(path) as f: + lines = f.readlines() + + settings = OrderedDict() + type_dict = {t.__name__: t for t in (list, dict, str)} + for line in lines: + key_l: List[str] = [] + val_l: List[str] = [] + + chars = StringWalker(line) + level = 0 + + # get indent level of line + while next(chars).isspace(): + level += 1 + + chars.prev() + if (c := next(chars)) == '#': + # is a comment + continue + + elif c == '-': + # is an item in a list + next(chars) + val_l = list(chars) + level += 2 # it is a child of the previous key + item_type = 'list' + else: + # undo char read + chars.prev() + + if not val_l: + while (c := next(chars)) != ':': + key_l.append(c) + val_l = list(''.join(chars).strip()) + + if val_l: + val = ''.join(val_l) + else: + # start of a section + item_type = 'dict' + val = type_dict[item_type]() + + key = ''.join(key_l) + if level == 0: + settings[key] = val + elif level == 2: + parent = settings[tuple(settings.keys())[-1]] + if isinstance(parent, dict): + parent[key] = val + elif isinstance(parent, list): + parent.append(val) + else: + raise Exception(f"level too high: {level}") + + return settings + + +class StringWalker: + """A fancier str iterator.""" + + def __init__(self, s: str): + self.__val = s.replace('\n', '') + self.__pos = 0 + + def __next__(self) -> str: + try: + c = self.__val[self.__pos] + self.__pos += 1 + return c + except IndexError: + raise StopIteration + + def __iter__(self): + return self + + def prev(self, step: int = 1): + self.__pos -= step From 498e8b2f9c02e1b81dd858badb270d7a47b1cf9d Mon Sep 17 00:00:00 2001 From: nathom Date: Tue, 4 May 2021 01:52:01 -0700 Subject: [PATCH 11/14] Unescape html artifacts from last.fm playlist title --- streamrip/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streamrip/core.py b/streamrip/core.py index 924e6af..6782d9c 100644 --- a/streamrip/core.py +++ b/streamrip/core.py @@ -1,4 +1,5 @@ import concurrent.futures +import html import logging import os import re @@ -528,7 +529,7 @@ class MusicDL(list): r'

([^<]+)

', r.text ) if playlist_title_match is not None: - playlist_title = playlist_title_match.group(1) + playlist_title = html.unescape(playlist_title_match.group(1)) else: raise Exception("Error finding title from response") From 42ce6c6de35d6629a26d509bdd8d883d7bd15f3e Mon Sep 17 00:00:00 2001 From: nathom Date: Tue, 4 May 2021 12:57:00 -0700 Subject: [PATCH 12/14] Fix docstrings with pydocstring --- streamrip/bases.py | 141 ++++++++++++++++++++++++++------------ streamrip/cli.py | 11 +-- streamrip/clients.py | 89 +++++++++++++++++------- streamrip/config.py | 60 +++++++++++----- streamrip/constants.py | 2 + streamrip/converter.py | 33 +++++---- streamrip/core.py | 104 +++++++++++++++++++++++++--- streamrip/db.py | 12 ++-- streamrip/metadata.py | 110 ++++++++++++++++++------------ streamrip/spoofbuz.py | 22 ++++-- streamrip/tracklists.py | 147 ++++++++++++++++++++++++++++++---------- streamrip/utils.py | 81 ++++++++++++++-------- 12 files changed, 584 insertions(+), 228 deletions(-) diff --git a/streamrip/bases.py b/streamrip/bases.py index b7d0f3a..f532985 100644 --- a/streamrip/bases.py +++ b/streamrip/bases.py @@ -1,4 +1,6 @@ -"""These are the lower level classes that are handled by Album, Playlist, +"""Bases that handle parsing and downloading media. + +These are the lower level classes that are handled by Album, Playlist, and the other objects. They can also be downloaded individually, for example, as a single track. """ @@ -103,7 +105,6 @@ class Track: def load_meta(self): """Send a request to the client to get metadata for this Track.""" - assert self.id is not None, "id must be set before loading metadata" self.resp = self.client.get(self.id, media_type="track") @@ -128,7 +129,8 @@ class Track: self.cover_url = None def _prepare_download(self, **kwargs): - """This function does preprocessing to prepare for downloading tracks. + """Do preprocessing before downloading items. + It creates the directories, downloads cover art, and (optionally) downloads booklets. @@ -269,7 +271,7 @@ class Track: ) def move(self, path: str): - """Moves the Track and sets self.path to the new path. + """Move the Track and set self.path to the new path. :param path: :type path: str @@ -279,8 +281,10 @@ class Track: self.path = path def _soundcloud_download(self, dl_info: dict): - """Downloads a soundcloud track. This requires a seperate function - because there are three methods that can be used to download a track: + """Download a soundcloud track. + + This requires a seperate function because there are three methods that + can be used to download a track: * original file downloads * direct mp3 downloads * hls stream ripping @@ -319,15 +323,14 @@ class Track: @property def _progress_desc(self) -> str: - """The description that is used on the progress bar. + """Get the description that is used on the progress bar. :rtype: str """ return click.style(f"Track {int(self.meta.tracknumber):02}", fg="blue") def download_cover(self): - """Downloads the cover art, if cover_url is given.""" - + """Download the cover art, if cover_url is given.""" if not hasattr(self, "cover_url"): return False @@ -362,8 +365,7 @@ class Track: @classmethod def from_album_meta(cls, album: TrackMetadata, track: dict, client: Client): - """Return a new Track object initialized with info from the album dicts - returned by client.get calls. + """Return a new Track object initialized with info. :param album: album metadata returned by API :param pos: index of the track @@ -371,14 +373,12 @@ class Track: :type client: Client :raises IndexError """ - meta = TrackMetadata(album=album, track=track, source=client.source) return cls(client=client, meta=meta, id=track["id"]) @classmethod def from_api(cls, item: dict, client: Client): - """Given a track dict from an API, return a new Track object - initialized with the proper values. + """Return a new Track initialized from search result. :param item: :type item: dict @@ -501,7 +501,7 @@ class Track: self.tagged = True def convert(self, codec: str = "ALAC", **kwargs): - """Converts the track to another codec. + """Convert the track to another codec. Valid values for codec: * FLAC @@ -565,7 +565,7 @@ class Track: @property def title(self) -> str: - """The title of the track. + """Get the title of the track. :rtype: str """ @@ -586,8 +586,9 @@ class Track: return safe_get(self.meta, *keys, default=default) def set(self, key, val): - """Equivalent to __setitem__. Implemented only for - consistency. + """Set attribute `key` to `val`. + + Equivalent to __setitem__. Implemented only for consistency. :param key: :param val: @@ -617,8 +618,7 @@ class Track: return f"" def __str__(self) -> str: - """Return a readable string representation of - this track. + """Return a readable string representation of this track. :rtype: str """ @@ -629,6 +629,14 @@ class Video: """Only for Tidal.""" def __init__(self, client: Client, id: str, **kwargs): + """Initialize a Video object. + + :param client: + :type client: Client + :param id: The TIDAL Video ID + :type id: str + :param kwargs: title, explicit, and tracknumber + """ self.id = id self.client = client self.title = kwargs.get("title", "MusicVideo") @@ -660,7 +668,9 @@ class Video: return False # so that it is not tagged def tag(self, *args, **kwargs): - """Dummy method. + """Return False. + + This is a dummy method. :param args: :param kwargs: @@ -669,10 +679,9 @@ class Video: @classmethod def from_album_meta(cls, track: dict, client: Client): - """Given an video response dict from an album, return a new - Video object from the information. + """Return a new Video object given an album API response. - :param track: + :param track: track dict from album :type track: dict :param client: :type client: Client @@ -687,7 +696,7 @@ class Video: @property def path(self) -> str: - """The path to download the mp4 file. + """Get path to download the mp4 file. :rtype: str """ @@ -701,9 +710,17 @@ class Video: return os.path.join(self.parent_folder, f"{fname}.mp4") def __str__(self) -> str: + """Return the title. + + :rtype: str + """ return self.title def __repr__(self) -> str: + """Return a string representation of self. + + :rtype: str + """ return f"