Fix Tidal downloads, #51

This commit is contained in:
nathom 2021-04-14 16:13:32 -07:00
parent 649964b4e4
commit 0304fae688
5 changed files with 129 additions and 42 deletions

View file

@ -1,9 +1,9 @@
"""A config class that manages arguments between the config file and CLI.""" """A config class that manages arguments between the config file and CLI."""
import copy import copy
import logging import logging
import os import os
from pprint import pformat import re
from pprint import pformat, pprint
from ruamel.yaml import YAML from ruamel.yaml import YAML
@ -22,6 +22,15 @@ yaml = YAML()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ---------- Utilities -------------
def _set_to_none(d: dict):
for k, v in d.items():
if isinstance(v, dict):
_set_to_none(v)
else:
d[k] = None
class Config: class Config:
"""Config class that handles command line args and config files. """Config class that handles command line args and config files.
@ -183,3 +192,73 @@ class Config:
def __repr__(self): def __repr__(self):
return f"Config({pformat(self.session)})" return f"Config({pformat(self.session)})"
class ConfigDocumentationHelper:
"""A helper class that writes documentation for the config file.
qobuz:
quality: 1: 320kbps MP3, 2: 16/44.1, 3: 24/<=96, 4: 24/>=96
app_id: Do not change
secrets: Do not change
tidal:
quality: 0, 1, 2, or 3
user_id: Do not change
country_code: Do not change
access_token: Do not change
refresh_token: Do not change
token_expiry: Do not change
deezer: Does not require login
quality: 0, 1, or 2
soundcloud:
quality: Only 0 is available
database: This stores a list of item IDs so that repeats are not downloaded.
filters: Filter a Qobuz artist's discography. Values set here will be applied every use, unless overrided by command line arguments.
extras: Collectors Editions, Live Recordings, etc.
repeats: Picks the highest quality out of albums with identical titles.
non_albums: Remove EPs and Singles
features: Remove albums whose artist is not the one requested
non_remaster: Only download remastered albums
downloads:
folder: Folder where tracks are downloaded to
source_subdirectories: Put Qobuz albums in a 'Qobuz' folder, Tidal albums in 'Tidal' etc.
artwork:
embed: Write the image to the audio file
size: The size of the artwork to embed. Options: thumbnail, small, large, original. 'original' images can be up to 30MB, and may fail embedding. Using 'large' is recommended.
keep_hires_cover: Save the cover image at the highest quality as a seperate jpg file
metadata: Only applicable for playlist downloads.
set_playlist_to_album: Sets the value of the 'ALBUM' field in the metadata to the playlist's name. This is useful if your music library software organizes tracks based on album name.
new_playlist_tracknumbers: Replaces the original track's tracknumber with it's position in the playlist
path_format: Changes the folder and file names generated by streamrip.
folder: Available keys: "albumartist", "title", "year", "bit_depth", "sampling_rate", and "container"
track: Available keys: "tracknumber", "artist", "albumartist", "composer", and "title"
lastfm: Last.fm playlists are downloaded by searching for the titles of the tracks
source: The source on which to search for the tracks.
concurrent_downoads: Download (and convert) tracks all at once, instead of sequentially. If you are converting the tracks, and/or have fast internet, this will substantially improve processing speed.
"""
comments = _set_to_none(copy.deepcopy(Config.defaults))
def __init__(self):
self.docs = []
doctext = self.__doc__
keyval = re.compile(r"( *)([\w_]+):\s*(.*)")
lines = (line[4:] for line in doctext.split("\n")[1:-1])
for line in lines:
info = list(keyval.match(line).groups())
if len(info) == 3:
info[0] = len(info[0]) // 4
else:
info.insert(0, 0)
self.docs.append(info)
pprint(self.docs)
def write(self):
pass
def __setitem__(self, key, val):
self.comments[key] = val
def __getitem__(self, key):
return self.comments[key]

View file

@ -518,4 +518,3 @@ class MusicDL(list):
or self.config.file[source]["password"] is None or self.config.file[source]["password"] is None
): ):
self.prompt_creds(source) self.prompt_creds(source)

View file

@ -3,6 +3,7 @@ downloadable form.
""" """
import concurrent.futures import concurrent.futures
import functools
import logging import logging
import os import os
import re import re
@ -132,15 +133,6 @@ class Track:
logger.debug("No cover found") logger.debug("No cover found")
self.cover_url = None self.cover_url = None
@staticmethod
def _get_tracklist(resp, source) -> list:
if source == "qobuz":
return resp["tracks"]["items"]
if source in ("tidal", "deezer"):
return resp["tracks"]
raise NotImplementedError(source)
def _prepare_download(self, **kwargs): def _prepare_download(self, **kwargs):
# args override attributes # args override attributes
self.quality = min(kwargs["quality"], self.client.max_quality) self.quality = min(kwargs["quality"], self.client.max_quality)
@ -343,7 +335,7 @@ class Track:
return self.final_path return self.final_path
@classmethod @classmethod
def from_album_meta(cls, album: dict, pos: int, client: Client): def from_album_meta(cls, album: TrackMetadata, track: dict, client: Client):
"""Return a new Track object initialized with info from the album dicts """Return a new Track object initialized with info from the album dicts
returned by client.get calls. returned by client.get calls.
@ -354,9 +346,6 @@ class Track:
:raises IndexError :raises IndexError
""" """
tracklist = cls._get_tracklist(album, client.source)
logger.debug(len(tracklist))
track = tracklist[pos]
meta = TrackMetadata(album=album, track=track, source=client.source) meta = TrackMetadata(album=album, track=track, source=client.source)
return cls(client=client, meta=meta, id=track["id"]) return cls(client=client, meta=meta, id=track["id"])
@ -805,16 +794,16 @@ class Album(Tracklist):
"""Load detailed metadata from API using the id.""" """Load detailed metadata from API using the id."""
assert hasattr(self, "id"), "id must be set to load metadata" assert hasattr(self, "id"), "id must be set to load metadata"
self.meta = self.client.get(self.id, media_type="album") resp = self.client.get(self.id, media_type="album")
# update attributes based on response # update attributes based on response
info = self._parse_get_resp(self.meta, self.client).items() self.meta = self._parse_get_resp(resp, self.client)
self.__dict__.update(info) self.__dict__.update(self.meta.asdict()) # used for identification
if not self.get("streamable", False): if not self.get("streamable", False):
raise NonStreamable(f"This album is not streamable ({self.id} ID)") raise NonStreamable(f"This album is not streamable ({self.id} ID)")
self._load_tracks() self._load_tracks(resp)
self.loaded = True self.loaded = True
@classmethod @classmethod
@ -848,7 +837,10 @@ class Album(Tracklist):
if embed_cover_url is not None: if embed_cover_url is not None:
tqdm_download(embed_cover_url, cover_path) tqdm_download(embed_cover_url, cover_path)
else: # sometimes happens with Deezer else: # sometimes happens with Deezer
tqdm_download(self.cover_urls["small"], cover_path) cover_url = functools.reduce(
lambda c1, c2: c1 or c2, self.cover_urls.values()
)
tqdm_download(cover_url, cover_path)
if kwargs.get("keep_hires_cover", True): if kwargs.get("keep_hires_cover", True):
tqdm_download( tqdm_download(
@ -904,11 +896,11 @@ class Album(Tracklist):
:type resp: dict :type resp: dict
:rtype: dict :rtype: dict
""" """
meta = TrackMetadata(album=resp, source=client.source).asdict() meta = TrackMetadata(album=resp, source=client.source)
meta["id"] = resp["id"] meta.id = resp["id"]
return meta return meta
def _load_tracks(self): def _load_tracks(self, resp):
"""Given an album metadata dict returned by the API, append all of its """Given an album metadata dict returned by the API, append all of its
tracks to `self`. tracks to `self`.
@ -916,10 +908,14 @@ class Album(Tracklist):
stores the metadata inside a TrackMetadata object. stores the metadata inside a TrackMetadata object.
""" """
logging.debug(f"Loading {self.tracktotal} tracks to album") logging.debug(f"Loading {self.tracktotal} tracks to album")
for i in range(self.tracktotal): for track in _get_tracklist(resp, self.client.source):
# append method inherited from superclass list # append method inherited from superclass list
self.append( self.append(
Track.from_album_meta(album=self.meta, pos=i, client=self.client) Track.from_album_meta(
album=self.meta,
track=track,
client=self.client,
)
) )
def _get_formatter(self) -> dict: def _get_formatter(self) -> dict:
@ -1489,3 +1485,12 @@ class Label(Artist):
:rtype: str :rtype: str
""" """
return self.name return self.name
def _get_tracklist(resp, source) -> list:
if source == "qobuz":
return resp["tracks"]["items"]
if source in ("tidal", "deezer"):
return resp["tracks"]
raise NotImplementedError(source)

View file

@ -15,7 +15,7 @@ from .constants import (
TRACK_KEYS, TRACK_KEYS,
) )
from .exceptions import InvalidContainerError, InvalidSourceError from .exceptions import InvalidContainerError, InvalidSourceError
from .utils import get_quality_id, safe_get from .utils import get_quality_id, safe_get, tidal_cover_url
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -90,15 +90,21 @@ class TrackMetadata:
if isinstance(track, TrackMetadata): if isinstance(track, TrackMetadata):
self.update(track) self.update(track)
if isinstance(album, TrackMetadata): elif track is not None:
self.update(album)
if track is not None:
self.add_track_meta(track) self.add_track_meta(track)
if album is not None: if isinstance(album, TrackMetadata):
self.update(album)
elif album is not None:
self.add_album_meta(album) self.add_album_meta(album)
def update(self, meta):
assert isinstance(meta, TrackMetadata)
for k, v in meta.asdict().items():
if v is not None:
setattr(self, k, v)
def add_album_meta(self, resp: dict): def add_album_meta(self, resp: dict):
"""Parse the metadata from an resp dict returned by the """Parse the metadata from an resp dict returned by the
API. API.
@ -154,12 +160,11 @@ class TrackMetadata:
# non-embedded # non-embedded
self.explicit = resp.get("explicit", False) self.explicit = resp.get("explicit", False)
# 80, 160, 320, 640, 1280
uuid = resp.get("cover")
self.cover_urls = { self.cover_urls = {
sk: resp.get(rk) # size key, resp key sk: tidal_cover_url(uuid, size)
for sk, rk in zip( for sk, size in zip(COVER_SIZES, (160, 320, 640, 1280))
COVER_SIZES,
("cover", "cover_medium", "cover_large", "cover_xl"),
)
} }
self.streamable = resp.get("allowStreaming", False) self.streamable = resp.get("allowStreaming", False)
self.quality = TIDAL_Q_MAP[resp["audioQuality"]] self.quality = TIDAL_Q_MAP[resp["audioQuality"]]
@ -225,6 +230,8 @@ class TrackMetadata:
self.tracknumber = track.get("track_position", 1) self.tracknumber = track.get("track_position", 1)
self.discnumber = track.get("disk_number") self.discnumber = track.get("disk_number")
self.artist = track.get("artist", {}).get("name") self.artist = track.get("artist", {}).get("name")
if track.get("album"):
self.add_album_meta(track["album"])
elif self.__source == "soundcloud": elif self.__source == "soundcloud":
self.title = track["title"].strip() self.title = track["title"].strip()
@ -240,9 +247,6 @@ class TrackMetadata:
else: else:
raise ValueError(self.__source) raise ValueError(self.__source)
if track.get("album"):
self.add_album_meta(track["album"])
def _mod_title(self, version, work): def _mod_title(self, version, work):
if version is not None: if version is not None:
self.title = f"{self.title} ({version})" self.title = f"{self.title} ({version})"

View file

@ -16,7 +16,7 @@ from requests.packages import urllib3
from tqdm import tqdm from tqdm import tqdm
from tqdm.contrib import DummyTqdmFile from tqdm.contrib import DummyTqdmFile
from .constants import LOG_DIR, TIDAL_COVER_URL, AGENT from .constants import AGENT, LOG_DIR, TIDAL_COVER_URL
from .exceptions import InvalidSourceError, NonStreamable from .exceptions import InvalidSourceError, NonStreamable
urllib3.disable_warnings() urllib3.disable_warnings()
@ -277,7 +277,7 @@ def extract_interpreter_url(url: str) -> str:
:type url: str :type url: str
:rtype: str :rtype: str
""" """
session = gen_threadsafe_session({'User-Agent': AGENT}) session = gen_threadsafe_session({"User-Agent": AGENT})
r = session.get(url) r = session.get(url)
artist_id = re.search(r"getSimilarArtist\(\s*'(\w+)'", r.text).group(1) artist_id = re.search(r"getSimilarArtist\(\s*'(\w+)'", r.text).group(1)
return artist_id return artist_id