Add metadata.exclude list to config

This commit is contained in:
Nathan Thomas 2021-08-17 10:36:41 -07:00
parent c9dfc61d9f
commit e06e017ec9
3 changed files with 209 additions and 71 deletions

View file

@ -111,14 +111,18 @@ class RipCore(list):
else: else:
self.config = config self.config = config
if (theme := self.config.file["theme"]["progress_bar"]) != TQDM_DEFAULT_THEME: if (
theme := self.config.file["theme"]["progress_bar"]
) != TQDM_DEFAULT_THEME:
set_progress_bar_theme(theme.lower()) set_progress_bar_theme(theme.lower())
def get_db(db_type: str) -> db.Database: def get_db(db_type: str) -> db.Database:
db_settings = self.config.session["database"] db_settings = self.config.session["database"]
db_class = db.CLASS_MAP[db_type] db_class = db.CLASS_MAP[db_type]
if db_settings[db_type]["enabled"] and db_settings.get("enabled", True): if db_settings[db_type]["enabled"] and db_settings.get(
"enabled", True
):
default_db_path = DB_PATH_MAP[db_type] default_db_path = DB_PATH_MAP[db_type]
path = db_settings[db_type]["path"] path = db_settings[db_type]["path"]
@ -212,8 +216,9 @@ class RipCore(list):
session = self.config.session session = self.config.session
logger.debug(session) logger.debug(session)
# So that the dictionary isn't searched for the same keys multiple times # So that the dictionary isn't searched for the same keys multiple times
artwork, conversion, filepaths = tuple( artwork, conversion, filepaths, metadata = (
session[key] for key in ("artwork", "conversion", "filepaths") session[key]
for key in ("artwork", "conversion", "filepaths", "metadata")
) )
concurrency = session["downloads"]["concurrency"] concurrency = session["downloads"]["concurrency"]
return { return {
@ -223,12 +228,12 @@ class RipCore(list):
"embed_cover": artwork["embed"], "embed_cover": artwork["embed"],
"embed_cover_size": artwork["size"], "embed_cover_size": artwork["size"],
"keep_hires_cover": artwork["keep_hires_cover"], "keep_hires_cover": artwork["keep_hires_cover"],
"set_playlist_to_album": session["metadata"]["set_playlist_to_album"], "set_playlist_to_album": metadata["set_playlist_to_album"],
"stay_temp": conversion["enabled"], "stay_temp": conversion["enabled"],
"conversion": conversion, "conversion": conversion,
"concurrent_downloads": concurrency["enabled"], "concurrent_downloads": concurrency["enabled"],
"max_connections": concurrency["max_connections"], "max_connections": concurrency["max_connections"],
"new_tracknumbers": session["metadata"]["new_playlist_tracknumbers"], "new_tracknumbers": metadata["new_playlist_tracknumbers"],
"download_videos": session["tidal"]["download_videos"], "download_videos": session["tidal"]["download_videos"],
"download_booklets": session["qobuz"]["download_booklets"], "download_booklets": session["qobuz"]["download_booklets"],
"download_youtube_videos": session["youtube"]["download_videos"], "download_youtube_videos": session["youtube"]["download_videos"],
@ -238,6 +243,7 @@ class RipCore(list):
"add_singles_to_folder": filepaths["add_singles_to_folder"], "add_singles_to_folder": filepaths["add_singles_to_folder"],
"max_artwork_width": int(artwork["max_width"]), "max_artwork_width": int(artwork["max_width"]),
"max_artwork_height": int(artwork["max_height"]), "max_artwork_height": int(artwork["max_height"]),
"exclude_tags": metadata["exclude"],
} }
def repair(self, max_items=None): def repair(self, max_items=None):
@ -257,7 +263,9 @@ class RipCore(list):
) )
exit() exit()
for counter, (source, media_type, item_id) in enumerate(self.failed_db): for counter, (source, media_type, item_id) in enumerate(
self.failed_db
):
if counter >= max_items: if counter >= max_items:
break break
@ -280,7 +288,9 @@ class RipCore(list):
logger.debug("Arguments from config: %s", arguments) logger.debug("Arguments from config: %s", arguments)
source_subdirs = self.config.session["downloads"]["source_subdirectories"] source_subdirs = self.config.session["downloads"][
"source_subdirectories"
]
for item in self: for item in self:
# Item already checked in database in handle_urls # Item already checked in database in handle_urls
if source_subdirs: if source_subdirs:
@ -292,20 +302,26 @@ class RipCore(list):
item.download(**arguments) item.download(**arguments)
continue continue
arguments["quality"] = self.config.session[item.client.source]["quality"] arguments["quality"] = self.config.session[item.client.source][
"quality"
]
if isinstance(item, Artist): if isinstance(item, Artist):
filters_ = tuple( filters_ = tuple(
k for k, v in self.config.session["filters"].items() if v k for k, v in self.config.session["filters"].items() if v
) )
arguments["filters"] = filters_ arguments["filters"] = filters_
logger.debug("Added filter argument for artist/label: %s", filters_) logger.debug(
"Added filter argument for artist/label: %s", filters_
)
if not isinstance(item, Tracklist) or not item.loaded: if not isinstance(item, Tracklist) or not item.loaded:
logger.debug("Loading metadata") logger.debug("Loading metadata")
try: try:
item.load_meta(**arguments) item.load_meta(**arguments)
except NonStreamable: except NonStreamable:
self.failed_db.add((item.client.source, item.type, item.id)) self.failed_db.add(
(item.client.source, item.type, item.id)
)
secho(f"{item!s} is not available, skipping.", fg="red") secho(f"{item!s} is not available, skipping.", fg="red")
continue continue
@ -332,7 +348,7 @@ class RipCore(list):
self.db.add(str(item_id)) self.db.add(str(item_id))
if isinstance(item, Track): if isinstance(item, Track):
item.tag() item.tag(exclude_tags=arguments["exclude_tags"])
if arguments["conversion"]["enabled"]: if arguments["conversion"]["enabled"]:
item.convert(**arguments["conversion"]) item.convert(**arguments["conversion"])
@ -342,7 +358,9 @@ class RipCore(list):
:param featured_list: The name of the list. See `rip discover --help`. :param featured_list: The name of the list. See `rip discover --help`.
:type featured_list: str :type featured_list: str
""" """
self.extend(self.search("qobuz", featured_list, "featured", limit=max_items)) self.extend(
self.search("qobuz", featured_list, "featured", limit=max_items)
)
def get_client(self, source: str) -> Client: def get_client(self, source: str) -> Client:
"""Get a client given the source and log in. """Get a client given the source and log in.
@ -448,12 +466,15 @@ class RipCore(list):
fg="yellow", fg="yellow",
) )
parsed.extend( parsed.extend(
("deezer", *extract_deezer_dynamic_link(url)) for url in dynamic_urls ("deezer", *extract_deezer_dynamic_link(url))
for url in dynamic_urls
) )
parsed.extend(URL_REGEX.findall(url)) # Qobuz, Tidal, Dezer parsed.extend(URL_REGEX.findall(url)) # Qobuz, Tidal, Dezer
soundcloud_urls = SOUNDCLOUD_URL_REGEX.findall(url) soundcloud_urls = SOUNDCLOUD_URL_REGEX.findall(url)
soundcloud_items = [self.clients["soundcloud"].get(u) for u in soundcloud_urls] soundcloud_items = [
self.clients["soundcloud"].get(u) for u in soundcloud_urls
]
parsed.extend( parsed.extend(
("soundcloud", item["kind"], url) ("soundcloud", item["kind"], url)
@ -484,11 +505,15 @@ class RipCore(list):
# For testing: # For testing:
# https://www.last.fm/user/nathan3895/playlists/12058911 # https://www.last.fm/user/nathan3895/playlists/12058911
user_regex = re.compile(r"https://www\.last\.fm/user/([^/]+)/playlists/\d+") user_regex = re.compile(
r"https://www\.last\.fm/user/([^/]+)/playlists/\d+"
)
lastfm_urls = LASTFM_URL_REGEX.findall(urls) lastfm_urls = LASTFM_URL_REGEX.findall(urls)
try: try:
lastfm_source = self.config.session["lastfm"]["source"] lastfm_source = self.config.session["lastfm"]["source"]
lastfm_fallback_source = self.config.session["lastfm"]["fallback_source"] lastfm_fallback_source = self.config.session["lastfm"][
"fallback_source"
]
except KeyError: except KeyError:
self._config_updating_message() self._config_updating_message()
self.config.update() self.config.update()
@ -522,12 +547,16 @@ class RipCore(list):
) )
query_is_clean = banned_words_plain.search(query) is None query_is_clean = banned_words_plain.search(query) is None
search_results = self.search(source, query, media_type="track") search_results = self.search(
source, query, media_type="track"
)
track = next(search_results) track = next(search_results)
if query_is_clean: if query_is_clean:
while banned_words.search(track["title"]) is not None: while banned_words.search(track["title"]) is not None:
logger.debug("Track title banned for query=%s", query) logger.debug(
"Track title banned for query=%s", query
)
track = next(search_results) track = next(search_results)
# Because the track is searched as a single we need to set # Because the track is searched as a single we need to set
@ -537,7 +566,9 @@ class RipCore(list):
except (NoResultsFound, StopIteration): except (NoResultsFound, StopIteration):
return None return None
track = try_search(lastfm_source) or try_search(lastfm_fallback_source) track = try_search(lastfm_source) or try_search(
lastfm_fallback_source
)
if track is None: if track is None:
return False return False
@ -561,7 +592,9 @@ class RipCore(list):
pl.creator = creator_match.group(1) pl.creator = creator_match.group(1)
tracks_not_found = 0 tracks_not_found = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: with concurrent.futures.ThreadPoolExecutor(
max_workers=15
) as executor:
futures = [ futures = [
executor.submit(search_query, title, artist, pl) executor.submit(search_query, title, artist, pl)
for title, artist in queries for title, artist in queries
@ -639,7 +672,9 @@ class RipCore(list):
else: else:
logger.debug("Not generator") logger.debug("Not generator")
items = ( items = (
results.get("data") or results.get("items") or results.get("collection") results.get("data")
or results.get("items")
or results.get("collection")
) )
if items is None: if items is None:
raise NoResultsFound(query) raise NoResultsFound(query)
@ -679,7 +714,9 @@ class RipCore(list):
raise NotImplementedError raise NotImplementedError
fields = (fname for _, fname, _, _ in Formatter().parse(fmt) if fname) fields = (fname for _, fname, _, _ in Formatter().parse(fmt) if fname)
ret = fmt.format(**{k: media.get(k, default="Unknown") for k in fields}) ret = fmt.format(
**{k: media.get(k, default="Unknown") for k in fields}
)
return ret return ret
def interactive_search( def interactive_search(
@ -817,7 +854,9 @@ class RipCore(list):
playlist_title = html.unescape(playlist_title_match.group(1)) playlist_title = html.unescape(playlist_title_match.group(1))
if remaining_tracks > 0: if remaining_tracks > 0:
with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: with concurrent.futures.ThreadPoolExecutor(
max_workers=15
) as executor:
last_page = int(remaining_tracks // 50) + int( last_page = int(remaining_tracks // 50) + int(
remaining_tracks % 50 != 0 remaining_tracks % 50 != 0
) )
@ -872,7 +911,9 @@ class RipCore(list):
fg="blue", fg="blue",
) )
self.config.file["deezer"]["arl"] = input(style("ARL: ", fg="green")) self.config.file["deezer"]["arl"] = input(
style("ARL: ", fg="green")
)
self.config.save() self.config.save()
secho( secho(
f'Credentials saved to config file at "{self.config._path}"', f'Credentials saved to config file at "{self.config._path}"',

View file

@ -13,7 +13,17 @@ import re
import shutil import shutil
import subprocess import subprocess
from tempfile import gettempdir from tempfile import gettempdir
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Union from typing import (
Any,
Dict,
Generator,
Iterable,
List,
Optional,
Sequence,
Tuple,
Union,
)
from click import echo, secho, style from click import echo, secho, style
from mutagen.flac import FLAC, Picture from mutagen.flac import FLAC, Picture
@ -56,7 +66,9 @@ logger = logging.getLogger("streamrip")
TYPE_REGEXES = { TYPE_REGEXES = {
"remaster": re.compile(r"(?i)(re)?master(ed)?"), "remaster": re.compile(r"(?i)(re)?master(ed)?"),
"extra": re.compile(r"(?i)(anniversary|deluxe|live|collector|demo|expanded)"), "extra": re.compile(
r"(?i)(anniversary|deluxe|live|collector|demo|expanded)"
),
} }
@ -198,12 +210,15 @@ class Track(Media):
if source == "qobuz": if source == "qobuz":
self.cover_url = self.resp["album"]["image"]["large"] self.cover_url = self.resp["album"]["image"]["large"]
elif source == "tidal": elif source == "tidal":
self.cover_url = tidal_cover_url(self.resp["album"]["cover"], 320) self.cover_url = tidal_cover_url(
self.resp["album"]["cover"], 320
)
elif source == "deezer": elif source == "deezer":
self.cover_url = self.resp["album"]["cover_medium"] self.cover_url = self.resp["album"]["cover_medium"]
elif source == "soundcloud": elif source == "soundcloud":
self.cover_url = ( self.cover_url = (
self.resp["artwork_url"] or self.resp["user"].get("avatar_url") self.resp["artwork_url"]
or self.resp["user"].get("avatar_url")
).replace("large", "t500x500") ).replace("large", "t500x500")
else: else:
raise InvalidSourceError(source) raise InvalidSourceError(source)
@ -251,7 +266,9 @@ class Track(Media):
except ItemExists as e: except ItemExists as e:
logger.debug(e) logger.debug(e)
self.path = os.path.join(gettempdir(), f"{hash(self.id)}_{self.quality}.tmp") self.path = os.path.join(
gettempdir(), f"{hash(self.id)}_{self.quality}.tmp"
)
def download( # noqa def download( # noqa
self, self,
@ -306,9 +323,14 @@ class Track(Media):
except KeyError as e: except KeyError as e:
if restrictions := dl_info["restrictions"]: if restrictions := dl_info["restrictions"]:
# Turn CamelCase code into a readable sentence # Turn CamelCase code into a readable sentence
words = re.findall(r"([A-Z][a-z]+)", restrictions[0]["code"]) words = re.findall(
r"([A-Z][a-z]+)", restrictions[0]["code"]
)
raise NonStreamable( raise NonStreamable(
words[0] + " " + " ".join(map(str.lower, words[1:])) + "." words[0]
+ " "
+ " ".join(map(str.lower, words[1:]))
+ "."
) )
secho(f"Panic: {e} dl_info = {dl_info}", fg="red") secho(f"Panic: {e} dl_info = {dl_info}", fg="red")
@ -317,7 +339,9 @@ class Track(Media):
_quick_download(download_url, self.path, desc=self._progress_desc) _quick_download(download_url, self.path, desc=self._progress_desc)
elif isinstance(self.client, DeezloaderClient): elif isinstance(self.client, DeezloaderClient):
_quick_download(dl_info["url"], self.path, desc=self._progress_desc) _quick_download(
dl_info["url"], self.path, desc=self._progress_desc
)
elif self.client.source == "deezer": elif self.client.source == "deezer":
# We can only find out if the requested quality is available # We can only find out if the requested quality is available
@ -437,7 +461,9 @@ class Track(Media):
] ]
) )
elif dl_info["type"] == "original": elif dl_info["type"] == "original":
_quick_download(dl_info["url"], self.path, desc=self._progress_desc) _quick_download(
dl_info["url"], self.path, desc=self._progress_desc
)
# if a wav is returned, convert to flac # if a wav is returned, convert to flac
engine = converter.FLAC(self.path) engine = converter.FLAC(self.path)
@ -465,7 +491,9 @@ class Track(Media):
def download_cover(self, width=999999, height=999999): def download_cover(self, width=999999, height=999999):
"""Download the cover art, if cover_url is given.""" """Download the cover art, if cover_url is given."""
self.cover_path = os.path.join(gettempdir(), f"cover{hash(self.cover_url)}.jpg") self.cover_path = os.path.join(
gettempdir(), f"cover{hash(self.cover_url)}.jpg"
)
logger.debug("Downloading cover from %s", self.cover_url) logger.debug("Downloading cover from %s", self.cover_url)
if not os.path.exists(self.cover_path): if not os.path.exists(self.cover_path):
@ -485,9 +513,9 @@ class Track(Media):
formatter = self.meta.get_formatter(max_quality=self.quality) formatter = self.meta.get_formatter(max_quality=self.quality)
logger.debug("Track meta formatter %s", formatter) logger.debug("Track meta formatter %s", formatter)
filename = clean_format(self.file_format, formatter) filename = clean_format(self.file_format, formatter)
self.final_path = os.path.join(self.folder, filename)[:250].strip() + ext( self.final_path = os.path.join(self.folder, filename)[
self.quality, self.client.source :250
) ].strip() + ext(self.quality, self.client.source)
logger.debug("Formatted path: %s", self.final_path) logger.debug("Formatted path: %s", self.final_path)
@ -500,7 +528,9 @@ class Track(Media):
return self.final_path return self.final_path
@classmethod @classmethod
def from_album_meta(cls, album: TrackMetadata, track: dict, client: Client): def from_album_meta(
cls, album: TrackMetadata, track: dict, client: Client
):
"""Return a new Track object initialized with info. """Return a new Track object initialized with info.
:param album: album metadata returned by API :param album: album metadata returned by API
@ -510,7 +540,9 @@ class Track(Media):
:raises: IndexError :raises: IndexError
""" """
meta = TrackMetadata(album=album, track=track, source=client.source) meta = TrackMetadata(album=album, track=track, source=client.source)
return cls(client=client, meta=meta, id=track["id"], part_of_tracklist=True) return cls(
client=client, meta=meta, id=track["id"], part_of_tracklist=True
)
@classmethod @classmethod
def from_api(cls, item: dict, client: Client): def from_api(cls, item: dict, client: Client):
@ -554,6 +586,7 @@ class Track(Media):
album_meta: dict = None, album_meta: dict = None,
cover: Union[Picture, APIC, MP4Cover] = None, cover: Union[Picture, APIC, MP4Cover] = None,
embed_cover: bool = True, embed_cover: bool = True,
exclude_tags: Optional[Sequence] = None,
**kwargs, **kwargs,
): ):
"""Tag the track using the stored metadata. """Tag the track using the stored metadata.
@ -569,7 +602,9 @@ class Track(Media):
:param embed_cover: Embed cover art into file :param embed_cover: Embed cover art into file
:type embed_cover: bool :type embed_cover: bool
""" """
assert isinstance(self.meta, TrackMetadata), "meta must be TrackMetadata" assert isinstance(
self.meta, TrackMetadata
), "meta must be TrackMetadata"
if not self.downloaded: if not self.downloaded:
logger.info( logger.info(
"Track %s not tagged because it was not downloaded", "Track %s not tagged because it was not downloaded",
@ -620,7 +655,10 @@ class Track(Media):
raise InvalidQuality(f'Invalid quality: "{self.quality}"') raise InvalidQuality(f'Invalid quality: "{self.quality}"')
# automatically generate key, value pairs based on container # automatically generate key, value pairs based on container
tags = self.meta.tags(self.container) tags = self.meta.tags(
self.container,
set(exclude_tags) if exclude_tags is not None else None,
)
for k, v in tags: for k, v in tags:
logger.debug("Setting %s tag to %s", k, v) logger.debug("Setting %s tag to %s", k, v)
audio[k] = v audio[k] = v
@ -690,7 +728,9 @@ class Track(Media):
self.format_final_path() self.format_final_path()
if not os.path.isfile(self.path): if not os.path.isfile(self.path):
logger.info("File %s does not exist. Skipping conversion.", self.path) logger.info(
"File %s does not exist. Skipping conversion.", self.path
)
secho(f"{self!s} does not exist. Skipping conversion.", fg="red") secho(f"{self!s} does not exist. Skipping conversion.", fg="red")
return return
@ -1093,7 +1133,8 @@ class Tracklist(list):
kwargs.get("max_connections", 3) kwargs.get("max_connections", 3)
) as executor: ) as executor:
future_map = { future_map = {
executor.submit(target, item, **kwargs): item for item in self executor.submit(target, item, **kwargs): item
for item in self
} }
try: try:
concurrent.futures.wait(future_map.keys()) concurrent.futures.wait(future_map.keys())
@ -1124,7 +1165,9 @@ class Tracklist(list):
secho(f"{item!s} exists. Skipping.", fg="yellow") secho(f"{item!s} exists. Skipping.", fg="yellow")
except NonStreamable as e: except NonStreamable as e:
e.print(item) e.print(item)
failed_downloads.append((item.client.source, item.type, item.id)) failed_downloads.append(
(item.client.source, item.type, item.id)
)
self.downloaded = True self.downloaded = True
@ -1453,7 +1496,9 @@ class Album(Tracklist, Media):
_cover_download(embed_cover_url, cover_path) _cover_download(embed_cover_url, cover_path)
hires_cov_path = os.path.join(self.folder, "cover.jpg") hires_cov_path = os.path.join(self.folder, "cover.jpg")
if kwargs.get("keep_hires_cover", True) and not os.path.exists(hires_cov_path): if kwargs.get("keep_hires_cover", True) and not os.path.exists(
hires_cov_path
):
logger.debug("Downloading hires cover") logger.debug("Downloading hires cover")
_cover_download(self.cover_urls["original"], hires_cov_path) _cover_download(self.cover_urls["original"], hires_cov_path)
@ -1507,7 +1552,9 @@ class Album(Tracklist, Media):
and isinstance(item, Track) and isinstance(item, Track)
and kwargs.get("folder_format") and kwargs.get("folder_format")
): ):
disc_folder = os.path.join(self.folder, f"Disc {item.meta.discnumber}") disc_folder = os.path.join(
self.folder, f"Disc {item.meta.discnumber}"
)
kwargs["parent_folder"] = disc_folder kwargs["parent_folder"] = disc_folder
else: else:
kwargs["parent_folder"] = self.folder kwargs["parent_folder"] = self.folder
@ -1522,6 +1569,7 @@ class Album(Tracklist, Media):
item.tag( item.tag(
cover=self.cover_obj, cover=self.cover_obj,
embed_cover=kwargs.get("embed_cover", True), embed_cover=kwargs.get("embed_cover", True),
exclude_tags=kwargs.get("exclude_tags"),
) )
self.downloaded_ids.add(item.id) self.downloaded_ids.add(item.id)
@ -1601,7 +1649,9 @@ class Album(Tracklist, Media):
:rtype: str :rtype: str
""" """
formatted_folder = clean_format(self.folder_format, self._get_formatter()) formatted_folder = clean_format(
self.folder_format, self._get_formatter()
)
return os.path.join(parent_folder, formatted_folder) return os.path.join(parent_folder, formatted_folder)
@ -1719,7 +1769,9 @@ class Playlist(Tracklist, Media):
if self.client.source == "qobuz": if self.client.source == "qobuz":
self.name = self.meta["name"] self.name = self.meta["name"]
self.image = self.meta["images"] self.image = self.meta["images"]
self.creator = safe_get(self.meta, "owner", "name", default="Qobuz") self.creator = safe_get(
self.meta, "owner", "name", default="Qobuz"
)
tracklist = self.meta["tracks"]["items"] tracklist = self.meta["tracks"]["items"]
@ -1729,7 +1781,9 @@ class Playlist(Tracklist, Media):
elif self.client.source == "tidal": elif self.client.source == "tidal":
self.name = self.meta["title"] self.name = self.meta["title"]
self.image = tidal_cover_url(self.meta["image"], 640) self.image = tidal_cover_url(self.meta["image"], 640)
self.creator = safe_get(self.meta, "creator", "name", default="TIDAL") self.creator = safe_get(
self.meta, "creator", "name", default="TIDAL"
)
tracklist = self.meta["tracks"] tracklist = self.meta["tracks"]
@ -1742,7 +1796,9 @@ class Playlist(Tracklist, Media):
elif self.client.source == "deezer": elif self.client.source == "deezer":
self.name = self.meta["title"] self.name = self.meta["title"]
self.image = self.meta["picture_big"] self.image = self.meta["picture_big"]
self.creator = safe_get(self.meta, "creator", "name", default="Deezer") self.creator = safe_get(
self.meta, "creator", "name", default="Deezer"
)
tracklist = self.meta["tracks"] tracklist = self.meta["tracks"]
@ -1783,7 +1839,9 @@ class Playlist(Tracklist, Media):
logger.debug("Loaded %d tracks from playlist %s", len(self), self.name) logger.debug("Loaded %d tracks from playlist %s", len(self), self.name)
def _prepare_download(self, parent_folder: str = "StreamripDownloads", **kwargs): def _prepare_download(
self, parent_folder: str = "StreamripDownloads", **kwargs
):
if kwargs.get("folder_format"): if kwargs.get("folder_format"):
fname = sanitize_filename(self.name) fname = sanitize_filename(self.name)
self.folder = os.path.join(parent_folder, fname) self.folder = os.path.join(parent_folder, fname)
@ -1995,7 +2053,9 @@ class Artist(Tracklist, Media):
final = self final = self
if isinstance(filters, tuple) and self.client.source == "qobuz": if isinstance(filters, tuple) and self.client.source == "qobuz":
filter_funcs = (getattr(self, f"_{filter_}") for filter_ in filters) filter_funcs = (
getattr(self, f"_{filter_}") for filter_ in filters
)
for func in filter_funcs: for func in filter_funcs:
final = filter(func, final) final = filter(func, final)
@ -2108,7 +2168,10 @@ class Artist(Tracklist, Media):
best_bd = bit_depth(a["bit_depth"] for a in group) best_bd = bit_depth(a["bit_depth"] for a in group)
best_sr = sampling_rate(a["sampling_rate"] for a in group) best_sr = sampling_rate(a["sampling_rate"] for a in group)
for album in group: for album in group:
if album["bit_depth"] == best_bd and album["sampling_rate"] == best_sr: if (
album["bit_depth"] == best_bd
and album["sampling_rate"] == best_sr
):
yield album yield album
break break

View file

@ -131,7 +131,9 @@ class TrackMetadata:
self.album = resp.get("title", "Unknown Album") self.album = resp.get("title", "Unknown Album")
self.tracktotal = resp.get("tracks_count", 1) self.tracktotal = resp.get("tracks_count", 1)
self.genre = resp.get("genres_list") or resp.get("genre") or [] self.genre = resp.get("genres_list") or resp.get("genre") or []
self.date = resp.get("release_date_original") or resp.get("release_date") self.date = resp.get("release_date_original") or resp.get(
"release_date"
)
self.copyright = resp.get("copyright") self.copyright = resp.get("copyright")
self.albumartist = safe_get(resp, "artist", "name") self.albumartist = safe_get(resp, "artist", "name")
self.albumcomposer = safe_get(resp, "composer", "name") self.albumcomposer = safe_get(resp, "composer", "name")
@ -140,7 +142,9 @@ class TrackMetadata:
self.disctotal = ( self.disctotal = (
max( max(
track.get("media_number", 1) track.get("media_number", 1)
for track in safe_get(resp, "tracks", "items", default=[{}]) for track in safe_get(
resp, "tracks", "items", default=[{}]
)
) )
or 1 or 1
) )
@ -179,14 +183,22 @@ class TrackMetadata:
self.cover_urls = get_cover_urls(resp, self.__source) self.cover_urls = get_cover_urls(resp, self.__source)
self.streamable = resp.get("allowStreaming", False) self.streamable = resp.get("allowStreaming", False)
if q := resp.get("audioQuality"): # for album entries in single tracks if q := resp.get(
"audioQuality"
): # for album entries in single tracks
self._get_tidal_quality(q) self._get_tidal_quality(q)
elif self.__source == "deezer": elif self.__source == "deezer":
self.album = resp.get("title", "Unknown Album") self.album = resp.get("title", "Unknown Album")
self.tracktotal = resp.get("track_total", 0) or resp.get("nb_tracks", 0) self.tracktotal = resp.get("track_total", 0) or resp.get(
"nb_tracks", 0
)
self.disctotal = ( self.disctotal = (
max(track.get("disk_number") for track in resp.get("tracks", [{}])) or 1 max(
track.get("disk_number")
for track in resp.get("tracks", [{}])
)
or 1
) )
self.genre = safe_get(resp, "genres", "data") self.genre = safe_get(resp, "genres", "data")
self.date = resp.get("release_date") self.date = resp.get("release_date")
@ -343,7 +355,9 @@ class TrackMetadata:
if isinstance(self._genres, list): if isinstance(self._genres, list):
if self.__source == "qobuz": if self.__source == "qobuz":
genres: Iterable = re.findall(r"([^\u2192\/]+)", "/".join(self._genres)) genres: Iterable = re.findall(
r"([^\u2192\/]+)", "/".join(self._genres)
)
genres = set(genres) genres = set(genres)
elif self.__source == "deezer": elif self.__source == "deezer":
genres = (g["name"] for g in self._genres) genres = (g["name"] for g in self._genres)
@ -377,7 +391,9 @@ class TrackMetadata:
if hasattr(self, "_copyright"): if hasattr(self, "_copyright"):
if self._copyright is None: if self._copyright is None:
return None return None
copyright: str = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, self._copyright) copyright: str = re.sub(
r"(?i)\(P\)", PHON_COPYRIGHT, self._copyright
)
copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, copyright) copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, copyright)
return copyright return copyright
@ -437,7 +453,9 @@ class TrackMetadata:
formatter["sampling_rate"] /= 1000 formatter["sampling_rate"] /= 1000
return formatter return formatter
def tags(self, container: str = "flac") -> Generator: def tags(
self, container: str = "flac", exclude: Optional[set] = None
) -> Generator:
"""Create a generator of key, value pairs for use with mutagen. """Create a generator of key, value pairs for use with mutagen.
The *_KEY dicts are organized in the format: The *_KEY dicts are organized in the format:
@ -459,41 +477,52 @@ class TrackMetadata:
:type container: str :type container: str
:rtype: Generator :rtype: Generator
""" """
if exclude is None:
exclude = set()
logger.debug("Excluded tags: %s", exclude)
container = container.lower() container = container.lower()
if container in ("flac", "vorbis"): if container in ("flac", "vorbis"):
return self.__gen_flac_tags() return self.__gen_flac_tags(exclude)
if container in ("mp3", "id3"): if container in ("mp3", "id3"):
return self.__gen_mp3_tags() return self.__gen_mp3_tags(exclude)
if container in ("alac", "m4a", "mp4", "aac"): if container in ("alac", "m4a", "mp4", "aac"):
return self.__gen_mp4_tags() return self.__gen_mp4_tags(exclude)
raise InvalidContainerError(f"Invalid container {container}") raise InvalidContainerError(f"Invalid container {container}")
def __gen_flac_tags(self) -> Generator: def __gen_flac_tags(self, exclude: set) -> Generator:
"""Generate key, value pairs to tag FLAC files. """Generate key, value pairs to tag FLAC files.
:rtype: Tuple[str, str] :rtype: Tuple[str, str]
""" """
for k, v in FLAC_KEY.items(): for k, v in FLAC_KEY.items():
logger.debug("attr: %s", k)
if k in exclude:
continue
tag = getattr(self, k) tag = getattr(self, k)
if tag: if tag:
if k in ( if k in {
"tracknumber", "tracknumber",
"discnumber", "discnumber",
"tracktotal", "tracktotal",
"disctotal", "disctotal",
): }:
tag = f"{int(tag):02}" tag = f"{int(tag):02}"
logger.debug("Adding tag %s: %s", v, tag) logger.debug("Adding tag %s: %s", v, tag)
yield (v, str(tag)) yield (v, str(tag))
def __gen_mp3_tags(self) -> Generator: def __gen_mp3_tags(self, exclude: set) -> Generator:
"""Generate key, value pairs to tag MP3 files. """Generate key, value pairs to tag MP3 files.
:rtype: Tuple[str, str] :rtype: Tuple[str, str]
""" """
for k, v in MP3_KEY.items(): for k, v in MP3_KEY.items():
if k in exclude:
continue
if k == "tracknumber": if k == "tracknumber":
text = f"{self.tracknumber}/{self.tracktotal}" text = f"{self.tracknumber}/{self.tracktotal}"
elif k == "discnumber": elif k == "discnumber":
@ -504,12 +533,15 @@ class TrackMetadata:
if text is not None and v is not None: if text is not None and v is not None:
yield (v.__name__, v(encoding=3, text=text)) yield (v.__name__, v(encoding=3, text=text))
def __gen_mp4_tags(self) -> Generator: def __gen_mp4_tags(self, exclude: set) -> Generator:
"""Generate key, value pairs to tag ALAC or AAC files. """Generate key, value pairs to tag ALAC or AAC files.
:rtype: Tuple[str, str] :rtype: Tuple[str, str]
""" """
for k, v in MP4_KEY.items(): for k, v in MP4_KEY.items():
if k in exclude:
continue
if k == "tracknumber": if k == "tracknumber":
text = [(self.tracknumber, self.tracktotal)] text = [(self.tracknumber, self.tracktotal)]
elif k == "discnumber": elif k == "discnumber":
@ -581,7 +613,9 @@ class TrackMetadata:
:rtype: int :rtype: int
""" """
return sum(hash(v) for v in self.asdict().values() if isinstance(v, Hashable)) return sum(
hash(v) for v in self.asdict().values() if isinstance(v, Hashable)
)
def __repr__(self) -> str: def __repr__(self) -> str:
"""Return the string representation of the metadata object. """Return the string representation of the metadata object.