diff --git a/rip/cli.py b/rip/cli.py index a560bc9..06e8b5d 100644 --- a/rip/cli.py +++ b/rip/cli.py @@ -2,6 +2,7 @@ import concurrent.futures import logging import os import threading +from typing import Optional import requests from cleo.application import Application as BaseApplication @@ -85,7 +86,9 @@ class DownloadCommand(Command): if len(core) > 0: core.download() elif not urls and path is None: - self.line("Must pass arguments. See rip url -h.") + self.line( + "Must pass arguments. See rip url -h." + ) update_check.join() if outdated: @@ -112,10 +115,16 @@ class DownloadCommand(Command): "https://api.github.com/repos/nathom/streamrip/releases/latest" ).json()["body"] - release_notes = md_header.sub(r"
\1
", release_notes) - release_notes = bullet_point.sub(r"• \1", release_notes) + release_notes = md_header.sub( + r"
\1
", release_notes + ) + release_notes = bullet_point.sub( + r"• \1", release_notes + ) release_notes = code.sub(r"\1", release_notes) - release_notes = issue_reference.sub(r"\1", release_notes) + release_notes = issue_reference.sub( + r"\1", release_notes + ) self.line(release_notes) @@ -145,7 +154,9 @@ class SearchCommand(Command): def handle(self): query = self.argument("query") - source, type = clean_options(self.option("source"), self.option("type")) + source, type = clean_options( + self.option("source"), self.option("type") + ) config = Config() core = RipCore(config) @@ -197,7 +208,9 @@ class DiscoverCommand(Command): max_items = self.option("max-items") if chosen_list not in QOBUZ_FEATURED_KEYS: - self.line(f'Error: list "{chosen_list}" not available') + self.line( + f'Error: list "{chosen_list}" not available' + ) self.line(self.help) return 1 @@ -259,29 +272,34 @@ class ConfigCommand(Command): {--qobuz : Set the credentials for Qobuz} {--tidal : Log into Tidal} {--deezer : Set the Deezer ARL} + {--music-app : Configure the config file for usage with the macOS Music App} {--reset : Reset the config file} {--update : Reset the config file, keeping the credentials} """ + _config: Optional[Config] + def handle(self): import shutil from .constants import CONFIG_DIR, CONFIG_PATH - config = Config() + self._config = Config() if self.option("path"): self.line(f"{CONFIG_PATH}") if self.option("open"): - self.line(f"Opening {CONFIG_PATH} in default application") + self.line( + f"Opening {CONFIG_PATH} in default application" + ) launch(CONFIG_PATH) if self.option("reset"): - config.reset() + self._config.reset() if self.option("update"): - config.update() + self._config.update() if self.option("open-vim"): if shutil.which("nvim") is not None: @@ -298,8 +316,8 @@ class ConfigCommand(Command): client = TidalClient() client.login() - config.file["tidal"].update(client.get_tokens()) - config.save() + self._config.file["tidal"].update(client.get_tokens()) + self._config.save() self.line("Credentials saved to config.") if self.option("deezer"): @@ -316,22 +334,71 @@ class ConfigCommand(Command): try: DeezerClient().login(arl=given_arl) - config.file["deezer"]["arl"] = given_arl - config.save() + self._config.file["deezer"]["arl"] = given_arl + self._config.save() self.line("Sucessfully logged in!") except AuthenticationError: - self.line("Could not log in. Double check your ARL") + self.line( + "Could not log in. Double check your ARL" + ) if self.option("qobuz"): import getpass import hashlib - config.file["qobuz"]["email"] = self.ask("Qobuz email:") - config.file["qobuz"]["password"] = hashlib.md5( - getpass.getpass("Qobuz password (won't show on screen): ").encode() + self._config.file["qobuz"]["email"] = self.ask("Qobuz email:") + self._config.file["qobuz"]["password"] = hashlib.md5( + getpass.getpass( + "Qobuz password (won't show on screen): " + ).encode() ).hexdigest() - config.save() + self._config.save() + + if self.option("music-app"): + self._conf_music_app() + + def _conf_music_app(self): + import subprocess + import xml.etree.ElementTree as ET + from pathlib import Path + from tempfile import mktemp + + # Find the Music library folder + temp_file = mktemp() + music_pref_plist = Path(Path.home()) / Path( + "Library/Preferences/com.apple.Music.plist" + ) + # copy preferences to tempdir + subprocess.run(["cp", music_pref_plist, temp_file]) + # convert binary to xml for parsing + subprocess.run(["plutil", "-convert", "xml1", temp_file]) + items = iter(ET.parse(temp_file).getroot()[0]) + + for item in items: + if item.text == "NSNavLastRootDirectory": + break + + library_folder = Path(next(items).text) + os.remove(temp_file) + + # cp ~/library/preferences/com.apple.music.plist music.plist + # plutil -convert xml1 music.plist + # cat music.plist | pbcopy + + self._config.file["downloads"]["folder"] = os.path.join( + library_folder, "Automatically Add to Music.localized" + ) + + conversion_config = self._config.file["conversion"] + conversion_config["enabled"] = True + conversion_config["codec"] = "ALAC" + conversion_config["sampling_rate"] = 48000 + conversion_config["bit_depth"] = 24 + + self._config.file["filepaths"]["folder_format"] = "" + self._config.file["artwork"]["keep_hires_cover"] = False + self._config.save() class ConvertCommand(Command): @@ -409,7 +476,8 @@ class ConvertCommand(Command): futures.append( executor.submit( ConverterCls( - filename=os.path.join(dirname, file), **converter_args + filename=os.path.join(dirname, file), + **converter_args, ).convert ) ) @@ -428,7 +496,8 @@ class ConvertCommand(Command): ConverterCls(filename=path, **converter_args).convert() else: self.line( - f'Path "{path}" does not exist.', fg="red" + f'Path "{path}" does not exist.', + fg="red", ) @@ -534,7 +603,9 @@ class Application(BaseApplication): formatter.set_style("path", Style("green", options=["bold"])) formatter.set_style("cmd", Style("magenta")) formatter.set_style("title", Style("yellow", options=["bold"])) - formatter.set_style("header", Style("yellow", options=["bold", "underline"])) + formatter.set_style( + "header", Style("yellow", options=["bold", "underline"]) + ) io.output.set_formatter(formatter) io.error_output.set_formatter(formatter) diff --git a/rip/config.toml b/rip/config.toml index 26ee7cf..98e138f 100644 --- a/rip/config.toml +++ b/rip/config.toml @@ -130,6 +130,9 @@ keep_hires_cover = true set_playlist_to_album = true # Replaces the original track's tracknumber with it's position in the playlist new_playlist_tracknumbers = true +# The following metadata tags won't be applied +# See https://github.com/nathom/streamrip/wiki/Metadata-Tag-Names for more info +exclude = [] # Changes the folder and file names generated by streamrip. [filepaths] @@ -157,4 +160,4 @@ progress_bar = "dainty" [misc] # Metadata to identify this config file. Do not change. -version = "1.3" +version = "1.4" diff --git a/rip/core.py b/rip/core.py index b9ca3cf..9bc59d9 100644 --- a/rip/core.py +++ b/rip/core.py @@ -111,14 +111,18 @@ class RipCore(list): else: self.config = config - if (theme := self.config.file["theme"]["progress_bar"]) != TQDM_DEFAULT_THEME: + if ( + theme := self.config.file["theme"]["progress_bar"] + ) != TQDM_DEFAULT_THEME: set_progress_bar_theme(theme.lower()) def get_db(db_type: str) -> db.Database: db_settings = self.config.session["database"] db_class = db.CLASS_MAP[db_type] - if db_settings[db_type]["enabled"] and db_settings.get("enabled", True): + if db_settings[db_type]["enabled"] and db_settings.get( + "enabled", True + ): default_db_path = DB_PATH_MAP[db_type] path = db_settings[db_type]["path"] @@ -212,8 +216,9 @@ class RipCore(list): session = self.config.session logger.debug(session) # So that the dictionary isn't searched for the same keys multiple times - artwork, conversion, filepaths = tuple( - session[key] for key in ("artwork", "conversion", "filepaths") + artwork, conversion, filepaths, metadata = ( + session[key] + for key in ("artwork", "conversion", "filepaths", "metadata") ) concurrency = session["downloads"]["concurrency"] return { @@ -223,12 +228,12 @@ class RipCore(list): "embed_cover": artwork["embed"], "embed_cover_size": artwork["size"], "keep_hires_cover": artwork["keep_hires_cover"], - "set_playlist_to_album": session["metadata"]["set_playlist_to_album"], + "set_playlist_to_album": metadata["set_playlist_to_album"], "stay_temp": conversion["enabled"], "conversion": conversion, "concurrent_downloads": concurrency["enabled"], "max_connections": concurrency["max_connections"], - "new_tracknumbers": session["metadata"]["new_playlist_tracknumbers"], + "new_tracknumbers": metadata["new_playlist_tracknumbers"], "download_videos": session["tidal"]["download_videos"], "download_booklets": session["qobuz"]["download_booklets"], "download_youtube_videos": session["youtube"]["download_videos"], @@ -238,6 +243,7 @@ class RipCore(list): "add_singles_to_folder": filepaths["add_singles_to_folder"], "max_artwork_width": int(artwork["max_width"]), "max_artwork_height": int(artwork["max_height"]), + "exclude_tags": metadata["exclude"], } def repair(self, max_items=None): @@ -257,7 +263,9 @@ class RipCore(list): ) exit() - for counter, (source, media_type, item_id) in enumerate(self.failed_db): + for counter, (source, media_type, item_id) in enumerate( + self.failed_db + ): if counter >= max_items: break @@ -280,7 +288,9 @@ class RipCore(list): logger.debug("Arguments from config: %s", arguments) - source_subdirs = self.config.session["downloads"]["source_subdirectories"] + source_subdirs = self.config.session["downloads"][ + "source_subdirectories" + ] for item in self: # Item already checked in database in handle_urls if source_subdirs: @@ -292,20 +302,26 @@ class RipCore(list): item.download(**arguments) continue - arguments["quality"] = self.config.session[item.client.source]["quality"] + arguments["quality"] = self.config.session[item.client.source][ + "quality" + ] if isinstance(item, Artist): filters_ = tuple( k for k, v in self.config.session["filters"].items() if v ) arguments["filters"] = filters_ - logger.debug("Added filter argument for artist/label: %s", filters_) + logger.debug( + "Added filter argument for artist/label: %s", filters_ + ) if not isinstance(item, Tracklist) or not item.loaded: logger.debug("Loading metadata") try: item.load_meta(**arguments) except NonStreamable: - self.failed_db.add((item.client.source, item.type, item.id)) + self.failed_db.add( + (item.client.source, item.type, item.id) + ) secho(f"{item!s} is not available, skipping.", fg="red") continue @@ -332,7 +348,7 @@ class RipCore(list): self.db.add(str(item_id)) if isinstance(item, Track): - item.tag() + item.tag(exclude_tags=arguments["exclude_tags"]) if arguments["conversion"]["enabled"]: item.convert(**arguments["conversion"]) @@ -342,7 +358,9 @@ class RipCore(list): :param featured_list: The name of the list. See `rip discover --help`. :type featured_list: str """ - self.extend(self.search("qobuz", featured_list, "featured", limit=max_items)) + self.extend( + self.search("qobuz", featured_list, "featured", limit=max_items) + ) def get_client(self, source: str) -> Client: """Get a client given the source and log in. @@ -448,12 +466,15 @@ class RipCore(list): fg="yellow", ) parsed.extend( - ("deezer", *extract_deezer_dynamic_link(url)) for url in dynamic_urls + ("deezer", *extract_deezer_dynamic_link(url)) + for url in dynamic_urls ) parsed.extend(URL_REGEX.findall(url)) # Qobuz, Tidal, Dezer soundcloud_urls = SOUNDCLOUD_URL_REGEX.findall(url) - soundcloud_items = [self.clients["soundcloud"].get(u) for u in soundcloud_urls] + soundcloud_items = [ + self.clients["soundcloud"].get(u) for u in soundcloud_urls + ] parsed.extend( ("soundcloud", item["kind"], url) @@ -484,11 +505,15 @@ class RipCore(list): # For testing: # https://www.last.fm/user/nathan3895/playlists/12058911 - user_regex = re.compile(r"https://www\.last\.fm/user/([^/]+)/playlists/\d+") + user_regex = re.compile( + r"https://www\.last\.fm/user/([^/]+)/playlists/\d+" + ) lastfm_urls = LASTFM_URL_REGEX.findall(urls) try: lastfm_source = self.config.session["lastfm"]["source"] - lastfm_fallback_source = self.config.session["lastfm"]["fallback_source"] + lastfm_fallback_source = self.config.session["lastfm"][ + "fallback_source" + ] except KeyError: self._config_updating_message() self.config.update() @@ -522,12 +547,16 @@ class RipCore(list): ) query_is_clean = banned_words_plain.search(query) is None - search_results = self.search(source, query, media_type="track") + search_results = self.search( + source, query, media_type="track" + ) track = next(search_results) if query_is_clean: while banned_words.search(track["title"]) is not None: - logger.debug("Track title banned for query=%s", query) + logger.debug( + "Track title banned for query=%s", query + ) track = next(search_results) # Because the track is searched as a single we need to set @@ -537,7 +566,9 @@ class RipCore(list): except (NoResultsFound, StopIteration): return None - track = try_search(lastfm_source) or try_search(lastfm_fallback_source) + track = try_search(lastfm_source) or try_search( + lastfm_fallback_source + ) if track is None: return False @@ -561,7 +592,9 @@ class RipCore(list): pl.creator = creator_match.group(1) tracks_not_found = 0 - with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=15 + ) as executor: futures = [ executor.submit(search_query, title, artist, pl) for title, artist in queries @@ -639,7 +672,9 @@ class RipCore(list): else: logger.debug("Not generator") items = ( - results.get("data") or results.get("items") or results.get("collection") + results.get("data") + or results.get("items") + or results.get("collection") ) if items is None: raise NoResultsFound(query) @@ -679,7 +714,9 @@ class RipCore(list): raise NotImplementedError fields = (fname for _, fname, _, _ in Formatter().parse(fmt) if fname) - ret = fmt.format(**{k: media.get(k, default="Unknown") for k in fields}) + ret = fmt.format( + **{k: media.get(k, default="Unknown") for k in fields} + ) return ret def interactive_search( @@ -817,7 +854,9 @@ class RipCore(list): playlist_title = html.unescape(playlist_title_match.group(1)) if remaining_tracks > 0: - with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=15 + ) as executor: last_page = int(remaining_tracks // 50) + int( remaining_tracks % 50 != 0 ) @@ -872,7 +911,9 @@ class RipCore(list): fg="blue", ) - self.config.file["deezer"]["arl"] = input(style("ARL: ", fg="green")) + self.config.file["deezer"]["arl"] = input( + style("ARL: ", fg="green") + ) self.config.save() secho( f'Credentials saved to config file at "{self.config._path}"', diff --git a/rip/db.py b/rip/db.py index 6125241..fbbe35c 100644 --- a/rip/db.py +++ b/rip/db.py @@ -71,11 +71,15 @@ class Database: with sqlite3.connect(self.path) as conn: conditions = " AND ".join(f"{key}=?" for key in items.keys()) - command = f"SELECT EXISTS(SELECT 1 FROM {self.name} WHERE {conditions})" + command = ( + f"SELECT EXISTS(SELECT 1 FROM {self.name} WHERE {conditions})" + ) logger.debug("Executing %s", command) - return bool(conn.execute(command, tuple(items.values())).fetchone()[0]) + return bool( + conn.execute(command, tuple(items.values())).fetchone()[0] + ) def __contains__(self, keys: Union[str, dict]) -> bool: """Check whether a key-value pair exists in the database. @@ -119,7 +123,9 @@ class Database: params = ", ".join(self.structure.keys()) question_marks = ", ".join("?" for _ in items) - command = f"INSERT INTO {self.name} ({params}) VALUES ({question_marks})" + command = ( + f"INSERT INTO {self.name} ({params}) VALUES ({question_marks})" + ) logger.debug("Executing %s", command) logger.debug("Items to add: %s", items) diff --git a/streamrip/clients.py b/streamrip/clients.py index 4abc308..ab9869e 100644 --- a/streamrip/clients.py +++ b/streamrip/clients.py @@ -132,7 +132,10 @@ class QobuzClient(Client): if not kwargs.get("app_id") or not kwargs.get("secrets"): self._get_app_id_and_secrets() # can be async else: - self.app_id, self.secrets = str(kwargs["app_id"]), kwargs["secrets"] + self.app_id, self.secrets = ( + str(kwargs["app_id"]), + kwargs["secrets"], + ) self.session = gen_threadsafe_session( headers={"User-Agent": AGENT, "X-App-Id": self.app_id} ) @@ -215,7 +218,10 @@ class QobuzClient(Client): def _get_app_id_and_secrets(self): if not hasattr(self, "app_id") or not hasattr(self, "secrets"): spoofer = Spoofer() - self.app_id, self.secrets = str(spoofer.get_app_id()), spoofer.get_secrets() + self.app_id, self.secrets = ( + str(spoofer.get_app_id()), + spoofer.get_secrets(), + ) if not hasattr(self, "sec"): if not hasattr(self, "session"): @@ -234,7 +240,9 @@ class QobuzClient(Client): :rtype: dict """ page, status_code = self._api_request(epoint, params) - logger.debug("Keys returned from _gen_pages: %s", ", ".join(page.keys())) + logger.debug( + "Keys returned from _gen_pages: %s", ", ".join(page.keys()) + ) key = epoint.split("/")[0] + "s" total = page.get(key, {}) total = total.get("total") or total.get("items") @@ -257,7 +265,8 @@ class QobuzClient(Client): """Check if the secrets are usable.""" with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ - executor.submit(self._test_secret, secret) for secret in self.secrets + executor.submit(self._test_secret, secret) + for secret in self.secrets ] for future in concurrent.futures.as_completed(futures): @@ -300,11 +309,15 @@ class QobuzClient(Client): response, status_code = self._api_request(epoint, params) if status_code != 200: - raise Exception(f'Error fetching metadata. "{response["message"]}"') + raise Exception( + f'Error fetching metadata. "{response["message"]}"' + ) return response - def _api_search(self, query: str, media_type: str, limit: int = 500) -> Generator: + def _api_search( + self, query: str, media_type: str, limit: int = 500 + ) -> Generator: """Send a search request to the API. :param query: @@ -356,7 +369,9 @@ class QobuzClient(Client): resp, status_code = self._api_request(epoint, params) if status_code == 401: - raise AuthenticationError(f"Invalid credentials from params {params}") + raise AuthenticationError( + f"Invalid credentials from params {params}" + ) elif status_code == 400: logger.debug(resp) raise InvalidAppIdError(f"Invalid app id from params {params}") @@ -364,7 +379,9 @@ class QobuzClient(Client): logger.info("Logged in to Qobuz") if not resp["user"]["credential"]["parameters"]: - raise IneligibleError("Free accounts are not eligible to download tracks.") + raise IneligibleError( + "Free accounts are not eligible to download tracks." + ) self.uat = resp["user_auth_token"] self.session.headers.update({"X-User-Auth-Token": self.uat}) @@ -413,7 +430,9 @@ class QobuzClient(Client): } response, status_code = self._api_request("track/getFileUrl", params) if status_code == 400: - raise InvalidAppSecretError("Invalid app secret from params %s" % params) + raise InvalidAppSecretError( + "Invalid app secret from params %s" % params + ) return response @@ -432,7 +451,9 @@ class QobuzClient(Client): logger.debug(r.text) return r.json(), r.status_code except Exception: - logger.error("Problem getting JSON. Status code: %s", r.status_code) + logger.error( + "Problem getting JSON. Status code: %s", r.status_code + ) raise def _test_secret(self, secret: str) -> Optional[str]: @@ -464,7 +485,9 @@ class DeezerClient(Client): # no login required self.logged_in = False - def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict: + def search( + self, query: str, media_type: str = "album", limit: int = 200 + ) -> dict: """Search API for query. :param query: @@ -550,9 +573,9 @@ class DeezerClient(Client): format_no, format_str = format_info dl_info["size_to_quality"] = { - int(track_info.get(f"FILESIZE_{format}")): self._quality_id_from_filetype( - format - ) + int( + track_info.get(f"FILESIZE_{format}") + ): self._quality_id_from_filetype(format) for format in DEEZER_FORMATS } @@ -593,7 +616,9 @@ class DeezerClient(Client): logger.debug("Info bytes: %s", info_bytes) path = self._gen_url_path(info_bytes) logger.debug(path) - return f"https://e-cdns-proxy-{track_hash[0]}.dzcdn.net/mobile/1/{path}" + return ( + f"https://e-cdns-proxy-{track_hash[0]}.dzcdn.net/mobile/1/{path}" + ) def _gen_url_path(self, data): return binascii.hexlify( @@ -623,7 +648,9 @@ class DeezloaderClient(Client): # no login required self.logged_in = True - def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict: + def search( + self, query: str, media_type: str = "album", limit: int = 200 + ) -> dict: """Search API for query. :param query: @@ -660,7 +687,9 @@ class DeezloaderClient(Client): url = f"{DEEZER_BASE}/{media_type}/{meta_id}" item = self.session.get(url).json() if media_type in ("album", "playlist"): - tracks = self.session.get(f"{url}/tracks", params={"limit": 1000}).json() + tracks = self.session.get( + f"{url}/tracks", params={"limit": 1000} + ).json() item["tracks"] = tracks["data"] item["track_total"] = len(tracks["data"]) elif media_type == "artist": @@ -756,7 +785,9 @@ class TidalClient(Client): logger.debug(resp) return resp - def search(self, query: str, media_type: str = "album", limit: int = 100) -> dict: + def search( + self, query: str, media_type: str = "album", limit: int = 100 + ) -> dict: """Search for a query. :param query: @@ -785,13 +816,19 @@ class TidalClient(Client): return self._get_video_stream_url(track_id) params = { - "audioquality": get_quality(min(quality, TIDAL_MAX_Q), self.source), + "audioquality": get_quality( + min(quality, TIDAL_MAX_Q), self.source + ), "playbackmode": "STREAM", "assetpresentation": "FULL", } - resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params) + resp = self._api_request( + f"tracks/{track_id}/playbackinfopostpaywall", params + ) try: - manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8")) + manifest = json.loads( + base64.b64decode(resp["manifest"]).decode("utf-8") + ) except KeyError: raise Exception(resp["userMessage"]) @@ -996,7 +1033,9 @@ class TidalClient(Client): offset += 100 tracks_left -= 100 resp["items"].extend( - self._api_request(f"{url}/items", {"offset": offset})["items"] + self._api_request(f"{url}/items", {"offset": offset})[ + "items" + ] ) item["tracks"] = [item["item"] for item in resp["items"]] @@ -1048,7 +1087,9 @@ class TidalClient(Client): r'#EXT-X-STREAM-INF:BANDWIDTH=\d+,AVERAGE-BANDWIDTH=\d+,CODECS="[^"]+"' r",RESOLUTION=\d+x\d+\n(.+)" ) - manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8")) + manifest = json.loads( + base64.b64decode(resp["manifest"]).decode("utf-8") + ) available_urls = self.session.get(manifest["urls"][0]) url_info = re.findall(stream_url_regex, available_urls.text) @@ -1138,7 +1179,10 @@ class SoundCloudClient(Client): url = None for tc in track["media"]["transcodings"]: fmt = tc["format"] - if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg": + if ( + fmt["protocol"] == "hls" + and fmt["mime_type"] == "audio/mpeg" + ): url = tc["url"] break diff --git a/streamrip/constants.py b/streamrip/constants.py index 6b3bd3a..fb10724 100644 --- a/streamrip/constants.py +++ b/streamrip/constants.py @@ -4,7 +4,9 @@ import mutagen.id3 as id3 AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0" -TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg" +TIDAL_COVER_URL = ( + "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg" +) SOUNDCLOUD_CLIENT_ID = "QFciLWLC1GS4P3EZvXIjA3jKhKO5pKB3" SOUNDCLOUD_USER_ID = "672320-86895-162383-801513" SOUNDCLOUD_APP_VERSION = "1626941202" @@ -136,9 +138,7 @@ ALBUM_KEYS = ( "albumcomposer", ) # TODO: rename these to DEFAULT_FOLDER_FORMAT etc -FOLDER_FORMAT = ( - "{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]" -) +FOLDER_FORMAT = "{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]" TRACK_FORMAT = "{tracknumber}. {artist} - {title}" diff --git a/streamrip/converter.py b/streamrip/converter.py index f390471..e31edfe 100644 --- a/streamrip/converter.py +++ b/streamrip/converter.py @@ -52,7 +52,9 @@ class Converter: self.filename = filename self.final_fn = f"{os.path.splitext(filename)[0]}.{self.container}" - self.tempfile = os.path.join(gettempdir(), os.path.basename(self.final_fn)) + self.tempfile = os.path.join( + gettempdir(), os.path.basename(self.final_fn) + ) self.remove_source = remove_source self.sampling_rate = sampling_rate self.bit_depth = bit_depth @@ -117,9 +119,13 @@ class Converter: if self.lossless: if isinstance(self.sampling_rate, int): sampling_rates = "|".join( - str(rate) for rate in SAMPLING_RATES if rate <= self.sampling_rate + str(rate) + for rate in SAMPLING_RATES + if rate <= self.sampling_rate + ) + command.extend( + ["-af", f"aformat=sample_rates={sampling_rates}"] ) - command.extend(["-af", f"aformat=sample_rates={sampling_rates}"]) elif self.sampling_rate is not None: raise TypeError( @@ -134,7 +140,9 @@ class Converter: else: raise ValueError("Bit depth must be 16, 24, or 32") elif self.bit_depth is not None: - raise TypeError(f"Bit depth must be int, not {type(self.bit_depth)}") + raise TypeError( + f"Bit depth must be int, not {type(self.bit_depth)}" + ) # automatically overwrite command.extend(["-y", self.tempfile]) @@ -199,7 +207,9 @@ class Vorbis(Converter): codec_name = "vorbis" codec_lib = "libvorbis" container = "ogg" - default_ffmpeg_arg = "-q:a 6" # 160, aka the "high" quality profile from Spotify + default_ffmpeg_arg = ( + "-q:a 6" # 160, aka the "high" quality profile from Spotify + ) class OPUS(Converter): diff --git a/streamrip/media.py b/streamrip/media.py index 5f09b2e..fd09e03 100644 --- a/streamrip/media.py +++ b/streamrip/media.py @@ -13,7 +13,17 @@ import re import shutil import subprocess from tempfile import gettempdir -from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Union +from typing import ( + Any, + Dict, + Generator, + Iterable, + List, + Optional, + Sequence, + Tuple, + Union, +) from click import echo, secho, style from mutagen.flac import FLAC, Picture @@ -56,7 +66,9 @@ logger = logging.getLogger("streamrip") TYPE_REGEXES = { "remaster": re.compile(r"(?i)(re)?master(ed)?"), - "extra": re.compile(r"(?i)(anniversary|deluxe|live|collector|demo|expanded)"), + "extra": re.compile( + r"(?i)(anniversary|deluxe|live|collector|demo|expanded)" + ), } @@ -198,12 +210,15 @@ class Track(Media): if source == "qobuz": self.cover_url = self.resp["album"]["image"]["large"] elif source == "tidal": - self.cover_url = tidal_cover_url(self.resp["album"]["cover"], 320) + self.cover_url = tidal_cover_url( + self.resp["album"]["cover"], 320 + ) elif source == "deezer": self.cover_url = self.resp["album"]["cover_medium"] elif source == "soundcloud": self.cover_url = ( - self.resp["artwork_url"] or self.resp["user"].get("avatar_url") + self.resp["artwork_url"] + or self.resp["user"].get("avatar_url") ).replace("large", "t500x500") else: raise InvalidSourceError(source) @@ -251,7 +266,9 @@ class Track(Media): except ItemExists as e: logger.debug(e) - self.path = os.path.join(gettempdir(), f"{hash(self.id)}_{self.quality}.tmp") + self.path = os.path.join( + gettempdir(), f"{hash(self.id)}_{self.quality}.tmp" + ) def download( # noqa self, @@ -306,9 +323,14 @@ class Track(Media): except KeyError as e: if restrictions := dl_info["restrictions"]: # Turn CamelCase code into a readable sentence - words = re.findall(r"([A-Z][a-z]+)", restrictions[0]["code"]) + words = re.findall( + r"([A-Z][a-z]+)", restrictions[0]["code"] + ) raise NonStreamable( - words[0] + " " + " ".join(map(str.lower, words[1:])) + "." + words[0] + + " " + + " ".join(map(str.lower, words[1:])) + + "." ) secho(f"Panic: {e} dl_info = {dl_info}", fg="red") @@ -317,7 +339,9 @@ class Track(Media): _quick_download(download_url, self.path, desc=self._progress_desc) elif isinstance(self.client, DeezloaderClient): - _quick_download(dl_info["url"], self.path, desc=self._progress_desc) + _quick_download( + dl_info["url"], self.path, desc=self._progress_desc + ) elif self.client.source == "deezer": # We can only find out if the requested quality is available @@ -437,7 +461,9 @@ class Track(Media): ] ) elif dl_info["type"] == "original": - _quick_download(dl_info["url"], self.path, desc=self._progress_desc) + _quick_download( + dl_info["url"], self.path, desc=self._progress_desc + ) # if a wav is returned, convert to flac engine = converter.FLAC(self.path) @@ -465,7 +491,9 @@ class Track(Media): def download_cover(self, width=999999, height=999999): """Download the cover art, if cover_url is given.""" - self.cover_path = os.path.join(gettempdir(), f"cover{hash(self.cover_url)}.jpg") + self.cover_path = os.path.join( + gettempdir(), f"cover{hash(self.cover_url)}.jpg" + ) logger.debug("Downloading cover from %s", self.cover_url) if not os.path.exists(self.cover_path): @@ -485,9 +513,9 @@ class Track(Media): formatter = self.meta.get_formatter(max_quality=self.quality) logger.debug("Track meta formatter %s", formatter) filename = clean_format(self.file_format, formatter) - self.final_path = os.path.join(self.folder, filename)[:250].strip() + ext( - self.quality, self.client.source - ) + self.final_path = os.path.join(self.folder, filename)[ + :250 + ].strip() + ext(self.quality, self.client.source) logger.debug("Formatted path: %s", self.final_path) @@ -500,7 +528,9 @@ class Track(Media): return self.final_path @classmethod - def from_album_meta(cls, album: TrackMetadata, track: dict, client: Client): + def from_album_meta( + cls, album: TrackMetadata, track: dict, client: Client + ): """Return a new Track object initialized with info. :param album: album metadata returned by API @@ -510,7 +540,9 @@ class Track(Media): :raises: IndexError """ meta = TrackMetadata(album=album, track=track, source=client.source) - return cls(client=client, meta=meta, id=track["id"], part_of_tracklist=True) + return cls( + client=client, meta=meta, id=track["id"], part_of_tracklist=True + ) @classmethod def from_api(cls, item: dict, client: Client): @@ -554,6 +586,7 @@ class Track(Media): album_meta: dict = None, cover: Union[Picture, APIC, MP4Cover] = None, embed_cover: bool = True, + exclude_tags: Optional[Sequence] = None, **kwargs, ): """Tag the track using the stored metadata. @@ -569,7 +602,9 @@ class Track(Media): :param embed_cover: Embed cover art into file :type embed_cover: bool """ - assert isinstance(self.meta, TrackMetadata), "meta must be TrackMetadata" + assert isinstance( + self.meta, TrackMetadata + ), "meta must be TrackMetadata" if not self.downloaded: logger.info( "Track %s not tagged because it was not downloaded", @@ -620,7 +655,10 @@ class Track(Media): raise InvalidQuality(f'Invalid quality: "{self.quality}"') # automatically generate key, value pairs based on container - tags = self.meta.tags(self.container) + tags = self.meta.tags( + self.container, + set(exclude_tags) if exclude_tags is not None else None, + ) for k, v in tags: logger.debug("Setting %s tag to %s", k, v) audio[k] = v @@ -690,7 +728,9 @@ class Track(Media): self.format_final_path() if not os.path.isfile(self.path): - logger.info("File %s does not exist. Skipping conversion.", self.path) + logger.info( + "File %s does not exist. Skipping conversion.", self.path + ) secho(f"{self!s} does not exist. Skipping conversion.", fg="red") return @@ -1093,7 +1133,8 @@ class Tracklist(list): kwargs.get("max_connections", 3) ) as executor: future_map = { - executor.submit(target, item, **kwargs): item for item in self + executor.submit(target, item, **kwargs): item + for item in self } try: concurrent.futures.wait(future_map.keys()) @@ -1124,7 +1165,9 @@ class Tracklist(list): secho(f"{item!s} exists. Skipping.", fg="yellow") except NonStreamable as e: e.print(item) - failed_downloads.append((item.client.source, item.type, item.id)) + failed_downloads.append( + (item.client.source, item.type, item.id) + ) self.downloaded = True @@ -1453,7 +1496,9 @@ class Album(Tracklist, Media): _cover_download(embed_cover_url, cover_path) hires_cov_path = os.path.join(self.folder, "cover.jpg") - if kwargs.get("keep_hires_cover", True) and not os.path.exists(hires_cov_path): + if kwargs.get("keep_hires_cover", True) and not os.path.exists( + hires_cov_path + ): logger.debug("Downloading hires cover") _cover_download(self.cover_urls["original"], hires_cov_path) @@ -1507,7 +1552,9 @@ class Album(Tracklist, Media): and isinstance(item, Track) and kwargs.get("folder_format") ): - disc_folder = os.path.join(self.folder, f"Disc {item.meta.discnumber}") + disc_folder = os.path.join( + self.folder, f"Disc {item.meta.discnumber}" + ) kwargs["parent_folder"] = disc_folder else: kwargs["parent_folder"] = self.folder @@ -1522,6 +1569,7 @@ class Album(Tracklist, Media): item.tag( cover=self.cover_obj, embed_cover=kwargs.get("embed_cover", True), + exclude_tags=kwargs.get("exclude_tags"), ) self.downloaded_ids.add(item.id) @@ -1601,7 +1649,9 @@ class Album(Tracklist, Media): :rtype: str """ - formatted_folder = clean_format(self.folder_format, self._get_formatter()) + formatted_folder = clean_format( + self.folder_format, self._get_formatter() + ) return os.path.join(parent_folder, formatted_folder) @@ -1719,7 +1769,9 @@ class Playlist(Tracklist, Media): if self.client.source == "qobuz": self.name = self.meta["name"] self.image = self.meta["images"] - self.creator = safe_get(self.meta, "owner", "name", default="Qobuz") + self.creator = safe_get( + self.meta, "owner", "name", default="Qobuz" + ) tracklist = self.meta["tracks"]["items"] @@ -1729,7 +1781,9 @@ class Playlist(Tracklist, Media): elif self.client.source == "tidal": self.name = self.meta["title"] self.image = tidal_cover_url(self.meta["image"], 640) - self.creator = safe_get(self.meta, "creator", "name", default="TIDAL") + self.creator = safe_get( + self.meta, "creator", "name", default="TIDAL" + ) tracklist = self.meta["tracks"] @@ -1742,7 +1796,9 @@ class Playlist(Tracklist, Media): elif self.client.source == "deezer": self.name = self.meta["title"] self.image = self.meta["picture_big"] - self.creator = safe_get(self.meta, "creator", "name", default="Deezer") + self.creator = safe_get( + self.meta, "creator", "name", default="Deezer" + ) tracklist = self.meta["tracks"] @@ -1783,7 +1839,9 @@ class Playlist(Tracklist, Media): logger.debug("Loaded %d tracks from playlist %s", len(self), self.name) - def _prepare_download(self, parent_folder: str = "StreamripDownloads", **kwargs): + def _prepare_download( + self, parent_folder: str = "StreamripDownloads", **kwargs + ): if kwargs.get("folder_format"): fname = sanitize_filename(self.name) self.folder = os.path.join(parent_folder, fname) @@ -1813,7 +1871,10 @@ class Playlist(Tracklist, Media): item.download(**kwargs) - item.tag(embed_cover=kwargs.get("embed_cover", True)) + item.tag( + embed_cover=kwargs.get("embed_cover", True), + exclude_tags=kwargs.get("exclude_tags"), + ) # if playlist_to_album and self.client.source == "deezer": # # Because Deezer tracks come pre-tagged, the `set_playlist_to_album` @@ -1995,7 +2056,9 @@ class Artist(Tracklist, Media): final = self if isinstance(filters, tuple) and self.client.source == "qobuz": - filter_funcs = (getattr(self, f"_{filter_}") for filter_ in filters) + filter_funcs = ( + getattr(self, f"_{filter_}") for filter_ in filters + ) for func in filter_funcs: final = filter(func, final) @@ -2108,7 +2171,10 @@ class Artist(Tracklist, Media): best_bd = bit_depth(a["bit_depth"] for a in group) best_sr = sampling_rate(a["sampling_rate"] for a in group) for album in group: - if album["bit_depth"] == best_bd and album["sampling_rate"] == best_sr: + if ( + album["bit_depth"] == best_bd + and album["sampling_rate"] == best_sr + ): yield album break diff --git a/streamrip/metadata.py b/streamrip/metadata.py index 6af025a..f82f61a 100644 --- a/streamrip/metadata.py +++ b/streamrip/metadata.py @@ -131,7 +131,9 @@ class TrackMetadata: self.album = resp.get("title", "Unknown Album") self.tracktotal = resp.get("tracks_count", 1) self.genre = resp.get("genres_list") or resp.get("genre") or [] - self.date = resp.get("release_date_original") or resp.get("release_date") + self.date = resp.get("release_date_original") or resp.get( + "release_date" + ) self.copyright = resp.get("copyright") self.albumartist = safe_get(resp, "artist", "name") self.albumcomposer = safe_get(resp, "composer", "name") @@ -140,7 +142,9 @@ class TrackMetadata: self.disctotal = ( max( track.get("media_number", 1) - for track in safe_get(resp, "tracks", "items", default=[{}]) + for track in safe_get( + resp, "tracks", "items", default=[{}] + ) ) or 1 ) @@ -179,14 +183,22 @@ class TrackMetadata: self.cover_urls = get_cover_urls(resp, self.__source) self.streamable = resp.get("allowStreaming", False) - if q := resp.get("audioQuality"): # for album entries in single tracks + if q := resp.get( + "audioQuality" + ): # for album entries in single tracks self._get_tidal_quality(q) elif self.__source == "deezer": self.album = resp.get("title", "Unknown Album") - self.tracktotal = resp.get("track_total", 0) or resp.get("nb_tracks", 0) + self.tracktotal = resp.get("track_total", 0) or resp.get( + "nb_tracks", 0 + ) self.disctotal = ( - max(track.get("disk_number") for track in resp.get("tracks", [{}])) or 1 + max( + track.get("disk_number") + for track in resp.get("tracks", [{}]) + ) + or 1 ) self.genre = safe_get(resp, "genres", "data") self.date = resp.get("release_date") @@ -343,7 +355,9 @@ class TrackMetadata: if isinstance(self._genres, list): if self.__source == "qobuz": - genres: Iterable = re.findall(r"([^\u2192\/]+)", "/".join(self._genres)) + genres: Iterable = re.findall( + r"([^\u2192\/]+)", "/".join(self._genres) + ) genres = set(genres) elif self.__source == "deezer": genres = (g["name"] for g in self._genres) @@ -377,7 +391,9 @@ class TrackMetadata: if hasattr(self, "_copyright"): if self._copyright is None: return None - copyright: str = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, self._copyright) + copyright: str = re.sub( + r"(?i)\(P\)", PHON_COPYRIGHT, self._copyright + ) copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, copyright) return copyright @@ -437,7 +453,9 @@ class TrackMetadata: formatter["sampling_rate"] /= 1000 return formatter - def tags(self, container: str = "flac") -> Generator: + def tags( + self, container: str = "flac", exclude: Optional[set] = None + ) -> Generator: """Create a generator of key, value pairs for use with mutagen. The *_KEY dicts are organized in the format: @@ -459,41 +477,52 @@ class TrackMetadata: :type container: str :rtype: Generator """ + if exclude is None: + exclude = set() + logger.debug("Excluded tags: %s", exclude) + container = container.lower() if container in ("flac", "vorbis"): - return self.__gen_flac_tags() + return self.__gen_flac_tags(exclude) if container in ("mp3", "id3"): - return self.__gen_mp3_tags() + return self.__gen_mp3_tags(exclude) if container in ("alac", "m4a", "mp4", "aac"): - return self.__gen_mp4_tags() + return self.__gen_mp4_tags(exclude) raise InvalidContainerError(f"Invalid container {container}") - def __gen_flac_tags(self) -> Generator: + def __gen_flac_tags(self, exclude: set) -> Generator: """Generate key, value pairs to tag FLAC files. :rtype: Tuple[str, str] """ for k, v in FLAC_KEY.items(): + logger.debug("attr: %s", k) + if k in exclude: + continue + tag = getattr(self, k) if tag: - if k in ( + if k in { "tracknumber", "discnumber", "tracktotal", "disctotal", - ): + }: tag = f"{int(tag):02}" logger.debug("Adding tag %s: %s", v, tag) yield (v, str(tag)) - def __gen_mp3_tags(self) -> Generator: + def __gen_mp3_tags(self, exclude: set) -> Generator: """Generate key, value pairs to tag MP3 files. :rtype: Tuple[str, str] """ for k, v in MP3_KEY.items(): + if k in exclude: + continue + if k == "tracknumber": text = f"{self.tracknumber}/{self.tracktotal}" elif k == "discnumber": @@ -504,12 +533,15 @@ class TrackMetadata: if text is not None and v is not None: yield (v.__name__, v(encoding=3, text=text)) - def __gen_mp4_tags(self) -> Generator: + def __gen_mp4_tags(self, exclude: set) -> Generator: """Generate key, value pairs to tag ALAC or AAC files. :rtype: Tuple[str, str] """ for k, v in MP4_KEY.items(): + if k in exclude: + continue + if k == "tracknumber": text = [(self.tracknumber, self.tracktotal)] elif k == "discnumber": @@ -581,7 +613,9 @@ class TrackMetadata: :rtype: int """ - return sum(hash(v) for v in self.asdict().values() if isinstance(v, Hashable)) + return sum( + hash(v) for v in self.asdict().values() if isinstance(v, Hashable) + ) def __repr__(self) -> str: """Return the string representation of the metadata object. diff --git a/streamrip/utils.py b/streamrip/utils.py index 8b446a7..0b71b23 100644 --- a/streamrip/utils.py +++ b/streamrip/utils.py @@ -82,7 +82,9 @@ class DownloadStream: info = self.request.json() try: # Usually happens with deezloader downloads - raise NonStreamable(f"{info['error']} -- {info['message']}") + raise NonStreamable( + f"{info['error']} -- {info['message']}" + ) except KeyError: raise NonStreamable(info) @@ -94,7 +96,10 @@ class DownloadStream: :rtype: Iterator """ - if self.source == "deezer" and self.is_encrypted.search(self.url) is not None: + if ( + self.source == "deezer" + and self.is_encrypted.search(self.url) is not None + ): assert isinstance(self.id, str), self.id blowfish_key = self._generate_blowfish_key(self.id) @@ -121,7 +126,9 @@ class DownloadStream: return self.file_size def _create_deezer_decryptor(self, key) -> Blowfish: - return Blowfish.new(key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07") + return Blowfish.new( + key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07" + ) @staticmethod def _generate_blowfish_key(track_id: str): @@ -204,7 +211,9 @@ __QUALITY_MAP: Dict[str, Dict[int, Union[int, str, Tuple[int, str]]]] = { } -def get_quality(quality_id: int, source: str) -> Union[str, int, Tuple[int, str]]: +def get_quality( + quality_id: int, source: str +) -> Union[str, int, Tuple[int, str]]: """Get the source-specific quality id. :param quality_id: the universal quality id (0, 1, 2, 4) @@ -291,7 +300,9 @@ def tidal_cover_url(uuid, size): possibles = (80, 160, 320, 640, 1280) assert size in possibles, f"size must be in {possibles}" - return TIDAL_COVER_URL.format(uuid=uuid.replace("-", "/"), height=size, width=size) + return TIDAL_COVER_URL.format( + uuid=uuid.replace("-", "/"), height=size, width=size + ) def init_log(path: Optional[str] = None, level: str = "DEBUG"): @@ -393,7 +404,9 @@ def gen_threadsafe_session( headers = {} session = requests.Session() - adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) + adapter = requests.adapters.HTTPAdapter( + pool_connections=100, pool_maxsize=100 + ) session.mount("https://", adapter) session.headers.update(headers) return session