Remove album parsing in downloader.py

This commit is contained in:
nathom 2021-04-13 16:30:28 -07:00
parent d2b21ca937
commit 3967faaa44
3 changed files with 96 additions and 131 deletions

View file

@ -144,6 +144,17 @@ LASTFM_URL_REGEX = r"https://www.last.fm/user/\w+/playlists/\w+"
TIDAL_MAX_Q = 7 TIDAL_MAX_Q = 7
TIDAL_Q_MAP = {
"LOW": 0,
"HIGH": 1,
"LOSSLESS": 2,
"HI_RES": 3,
}
DEEZER_MAX_Q = 6 DEEZER_MAX_Q = 6
AVAILABLE_QUALITY_IDS = (0, 1, 2, 3, 4) AVAILABLE_QUALITY_IDS = (0, 1, 2, 3, 4)
MEDIA_TYPES = ("track", "album", "artist", "label", "playlist") MEDIA_TYPES = ("track", "album", "artist", "label", "playlist")
# used to homogenize cover size keys
COVER_SIZES = ("thumbnail", "small", "large", "original")

View file

@ -2,13 +2,13 @@
downloadable form. downloadable form.
""" """
import concurrent.futures
import logging import logging
import os import os
import re import re
import shutil import shutil
import subprocess import subprocess
import concurrent.futures from pprint import pformat, pprint
from pprint import pformat
from tempfile import gettempdir from tempfile import gettempdir
from typing import Any, Generator, Iterable, Union from typing import Any, Generator, Iterable, Union
@ -23,6 +23,7 @@ from . import converter
from .clients import ClientInterface from .clients import ClientInterface
from .constants import ( from .constants import (
ALBUM_KEYS, ALBUM_KEYS,
COVER_SIZES,
FLAC_MAX_BLOCKSIZE, FLAC_MAX_BLOCKSIZE,
FOLDER_FORMAT, FOLDER_FORMAT,
TRACK_FORMAT, TRACK_FORMAT,
@ -48,15 +49,6 @@ from .utils import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
urllib3.disable_warnings() urllib3.disable_warnings()
TIDAL_Q_MAP = {
"LOW": 0,
"HIGH": 1,
"LOSSLESS": 2,
"HI_RES": 3,
}
# used to homogenize cover size keys
COVER_SIZES = ("thumbnail", "small", "large", "original")
TYPE_REGEXES = { TYPE_REGEXES = {
"remaster": re.compile(r"(?i)(re)?master(ed)?"), "remaster": re.compile(r"(?i)(re)?master(ed)?"),
@ -606,6 +598,7 @@ class Tracklist(list):
the tracklist. the tracklist.
""" """
# anything not in parentheses or brackets
essence_regex = re.compile(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*") essence_regex = re.compile(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*")
def download(self, **kwargs): def download(self, **kwargs):
@ -623,7 +616,7 @@ class Tracklist(list):
if kwargs.get("concurrent_downloads", True): if kwargs.get("concurrent_downloads", True):
# Tidal errors out with unlimited concurrency # Tidal errors out with unlimited concurrency
max_workers = 15 if self.client.source == 'tidal' else None max_workers = 15 if self.client.source == "tidal" else None
with concurrent.futures.ThreadPoolExecutor(max_workers) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
futures = [executor.submit(target, item, **kwargs) for item in self] futures = [executor.submit(target, item, **kwargs) for item in self]
try: try:
@ -658,7 +651,7 @@ class Tracklist(list):
if isinstance(key, int): if isinstance(key, int):
if 0 <= key < len(self): if 0 <= key < len(self):
return super().__getitem__(key) return self[key]
return default return default
@ -915,97 +908,7 @@ class Album(Tracklist):
:type resp: dict :type resp: dict
:rtype: dict :rtype: dict
""" """
if client.source == "qobuz": return TrackMetadata(album=resp, source=client.source).asdict()
if resp.get("maximum_sampling_rate", False):
sampling_rate = resp["maximum_sampling_rate"] * 1000
else:
sampling_rate = None
resp["image"]["original"] = resp["image"]["large"].replace("600", "org")
# TODO: combine these with TrackMetadata objects
return {
"id": resp.get("id"),
"title": resp.get("title"),
"_artist": resp.get("artist") or resp.get("performer"),
"albumartist": safe_get(resp, "artist", "name"),
"year": str(resp.get("release_date_original"))[:4],
"version": resp.get("version"),
"composer": safe_get(resp, "composer", "name"),
"release_type": resp.get("release_type", "album"),
"cover_urls": resp.get("image"),
"streamable": resp.get("streamable"),
"genre": safe_get(resp, 'genre', 'name'),
"quality": get_quality_id(
resp.get("maximum_bit_depth"), resp.get("maximum_sampling_rate")
),
"bit_depth": resp.get("maximum_bit_depth"),
"sampling_rate": sampling_rate,
"tracktotal": resp.get("tracks_count"),
"description": resp.get("description"),
"disctotal": max(
track.get("media_number", 1)
for track in safe_get(resp, "tracks", "items", default=[{}])
)
or 1,
"explicit": resp.get("parental_warning", False),
}
elif client.source == "tidal":
return {
"id": resp.get("id"),
"title": resp.get("title"),
"_artist": safe_get(resp, "artist", "name"),
"albumartist": safe_get(resp, "artist", "name"),
"year": resp.get("releaseDate")[:4],
"version": resp.get("version"),
"cover_urls": {
size: tidal_cover_url(resp.get("cover"), x)
for size, x in zip(COVER_SIZES, (160, 320, 640, 1280))
},
"streamable": resp.get("allowStreaming"),
"quality": TIDAL_Q_MAP[resp.get("audioQuality")],
"bit_depth": 24 if resp.get("audioQuality") == "HI_RES" else 16,
"sampling_rate": 48000
if resp.get("audioQuality") == "HI_RES"
else 41000,
"tracktotal": resp.get("numberOfTracks"),
"disctotal": resp.get("numberOfVolumes"),
"explicit": resp.get("explicit", False),
}
elif client.source == "deezer":
if resp.get("release_date", False):
year = resp["release_date"][:4]
else:
year = None
return {
"id": resp.get("id"),
"title": resp.get("title"),
"_artist": safe_get(resp, "artist", "name"),
"albumartist": safe_get(resp, "artist", "name"),
"year": year,
# version not given by API
"cover_urls": {
sk: resp.get(rk) # size key, resp key
for sk, rk in zip(
COVER_SIZES,
("cover", "cover_medium", "cover_large", "cover_xl"),
)
},
"url": resp.get("link"),
"streamable": True, # api only returns streamables
"quality": 2, # all tracks are 16/44.1 streamable
"bit_depth": 16,
"sampling_rate": 44100,
"tracktotal": resp.get("track_total") or resp.get("nb_tracks"),
"disctotal": max(
track.get("disk_number") for track in resp.get("tracks", [{}])
)
or 1,
"explicit": bool(resp.get("explicit_content_lyrics")),
}
raise InvalidSourceError(client.source)
def _load_tracks(self): def _load_tracks(self):
"""Given an album metadata dict returned by the API, append all of its """Given an album metadata dict returned by the API, append all of its

View file

@ -1,20 +1,22 @@
"""Manages the information that will be embeded in the audio file. """ """Manages the information that will be embeded in the audio file. """
import json
import logging import logging
import re import re
from typing import Generator, Optional, Tuple, Union from functools import cache
from typing import Generator, Hashable, Optional, Tuple, Union
from .constants import ( from .constants import (
COPYRIGHT, COPYRIGHT,
COVER_SIZES,
FLAC_KEY, FLAC_KEY,
MP3_KEY, MP3_KEY,
MP4_KEY, MP4_KEY,
PHON_COPYRIGHT, PHON_COPYRIGHT,
TIDAL_Q_MAP,
TRACK_KEYS, TRACK_KEYS,
) )
from .exceptions import InvalidContainerError from .exceptions import InvalidContainerError, InvalidSourceError
from .utils import safe_get from .utils import get_quality_id, safe_get
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -57,6 +59,7 @@ class TrackMetadata:
:param album: album dict from API :param album: album dict from API
:type album: Optional[dict] :type album: Optional[dict]
""" """
# embedded information
self.title = None self.title = None
self.album = None self.album = None
self.albumartist = None self.albumartist = None
@ -75,12 +78,21 @@ class TrackMetadata:
# not included in tags # not included in tags
self.explicit = False self.explicit = False
self.quality = None
self.sampling_rate = None
self.bit_depth = None
# Internals
self._artist = None
self._copyright = None
self._genres = None
self.__source = source self.__source = source
if track is None and album is None: if isinstance(track, TrackMetadata):
logger.debug("No params passed, returning") self.update(track)
return if isinstance(album, TrackMetadata):
self.update(album)
if track is not None: if track is not None:
self.add_track_meta(track) self.add_track_meta(track)
@ -90,20 +102,19 @@ class TrackMetadata:
def add_album_meta(self, resp: dict): def add_album_meta(self, resp: dict):
"""Parse the metadata from an resp dict returned by the """Parse the metadata from an resp dict returned by the
Qobuz API. API.
:param dict resp: from API :param dict resp: from API
""" """
if self.__source == "qobuz": if self.__source == "qobuz":
# Tags
self.album = resp.get("title") self.album = resp.get("title")
self.tracktotal = resp.get("tracks_count", 1) self.tracktotal = resp.get("tracks_count", 1)
self.genre = resp.get("genres_list", []) self.genre = resp.get("genres_list", [])
self.date = resp.get("release_date_original") or resp.get("release_date") self.date = resp.get("release_date_original") or resp.get("release_date")
if self.date:
self.year = self.date[:4]
self.copyright = resp.get("copyright") self.copyright = resp.get("copyright")
self.albumartist = safe_get(resp, "artist", "name") self.albumartist = safe_get(resp, "artist", "name")
self.composer = safe_get(resp, "composer", "name")
self.label = resp.get("label") self.label = resp.get("label")
self.description = resp.get("description") self.description = resp.get("description")
self.disctotal = ( self.disctotal = (
@ -118,35 +129,72 @@ class TrackMetadata:
if isinstance(self.label, dict): if isinstance(self.label, dict):
self.label = self.label.get("name") self.label = self.label.get("name")
# Non-embedded information
self.version = resp.get("version")
self.cover_urls = resp.get("image")
self.cover_urls["original"] = self.cover_urls["large"].replace("600", "org")
self.streamable = resp.get("streamable", False)
self.bit_depth = resp.get("maximum_bit_depth")
self.sampling_rate = resp.get("maximum_sampling_rate")
self.quality = get_quality_id(self.bit_depth, self.sampling_rate)
if self.sampling_rate is not None:
self.sampling_rate *= 1000
elif self.__source == "tidal": elif self.__source == "tidal":
self.album = resp.get("title") self.album = resp.get("title")
self.tracktotal = resp.get("numberOfTracks") self.tracktotal = resp.get("numberOfTracks")
# genre not returned by API # genre not returned by API
self.date = resp.get("releaseDate") self.date = resp.get("releaseDate")
if self.date:
self.year = self.date[:4]
self.copyright = resp.get("copyright") self.copyright = resp.get("copyright")
self.albumartist = safe_get(resp, "artist", "name") self.albumartist = safe_get(resp, "artist", "name")
self.disctotal = resp.get("numberOfVolumes") self.disctotal = resp.get("numberOfVolumes")
self.isrc = resp.get("isrc") self.isrc = resp.get("isrc")
self.explicit = resp.get("explicit", False)
# label not returned by API # label not returned by API
# non-embedded
self.explicit = resp.get("explicit", False)
self.cover_urls = {
sk: resp.get(rk) # size key, resp key
for sk, rk in zip(
COVER_SIZES,
("cover", "cover_medium", "cover_large", "cover_xl"),
)
}
self.streamable = resp.get("allowStreaming", False)
self.quality = TIDAL_Q_MAP[resp["audioQuality"]]
elif self.__source == "deezer": elif self.__source == "deezer":
self.album = resp.get("title") self.album = resp.get("title")
self.tracktotal = resp.get("track_total") self.tracktotal = resp.get("track_total") or resp.get("nb_tracks")
self.disctotal = (
max(track.get("disk_number") for track in resp.get("tracks", [{}])) or 1
)
self.genre = safe_get(resp, "genres", "data") self.genre = safe_get(resp, "genres", "data")
self.date = resp.get("release_date") self.date = resp.get("release_date")
self.albumartist = safe_get(resp, "artist", "name") self.albumartist = safe_get(resp, "artist", "name")
self.label = resp.get("label") self.label = resp.get("label")
# either 0 or 1 self.url = resp.get("link")
# not embedded
self.explicit = bool(resp.get("parental_warning")) self.explicit = bool(resp.get("parental_warning"))
self.quality = 2
self.bit_depth = 16
self.cover_urls = {
sk: resp.get(rk) # size key, resp key
for sk, rk in zip(
COVER_SIZES,
("cover", "cover_medium", "cover_large", "cover_xl"),
)
}
self.sampling_rate = 44100
self.streamable = True
elif self.__source == "soundcloud": elif self.__source == "soundcloud":
raise NotImplementedError raise NotImplementedError
else: else:
raise ValueError(self.__source) raise InvalidSourceError(self.__source)
def add_track_meta(self, track: dict): def add_track_meta(self, track: dict):
"""Parse the metadata from a track dict returned by an """Parse the metadata from a track dict returned by an
@ -230,8 +278,8 @@ class TrackMetadata:
@property @property
def genre(self) -> Union[str, None]: def genre(self) -> Union[str, None]:
"""Formats the genre list returned by the Qobuz API. """Formats the genre list returned by the Qobuz API.
>>> g = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé'] >>> meta.genre = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé']
>>> _format_genres(g) >>> meta.genre
'Pop, Rock, Alternatif et Indé' 'Pop, Rock, Alternatif et Indé'
:rtype: str :rtype: str
@ -240,14 +288,11 @@ class TrackMetadata:
return None return None
if isinstance(self._genres, list): if isinstance(self._genres, list):
if self.__source == "qobuz":
genres = re.findall(r"([^\u2192\/]+)", "/".join(self._genres)) genres = re.findall(r"([^\u2192\/]+)", "/".join(self._genres))
no_repeats = [] genres = set(genres)
for genre in genres: return ", ".join(genres)
if genre not in no_repeats:
no_repeats.append(genre)
return ", ".join(no_repeats)
elif isinstance(self._genres, str): elif isinstance(self._genres, str):
return self._genres return self._genres
@ -400,6 +445,9 @@ class TrackMetadata:
if v is not None and text is not None: if v is not None and text is not None:
yield (v, text) yield (v, text)
def asdict(self) -> dict:
return {k: getattr(self, k) for k in dir(self) if not k.startswith("_")}
def __setitem__(self, key, val): def __setitem__(self, key, val):
"""Dict-like access for tags. """Dict-like access for tags.
@ -441,10 +489,13 @@ class TrackMetadata:
""" """
return self.__setitem__(key, val) return self.__setitem__(key, val)
def __hash__(self) -> int:
return sum(hash(v) for v in self.asdict().values() if isinstance(v, Hashable))
def __repr__(self) -> str: def __repr__(self) -> str:
"""Returns the string representation of the metadata object. """Returns the string representation of the metadata object.
:rtype: str :rtype: str
""" """
# TODO: make a more readable repr # TODO: make a more readable repr
return json.dumps(self.__dict__, indent=2) return f"<TrackMetadata object {hex(hash(self))}>"