Changes to config artwork field

Also when converting, the track is kept in the temp_file and moved after conversion.

Formatting.
This commit is contained in:
nathom 2021-04-10 20:50:13 -07:00
parent de1db82da8
commit 6ed5f77464
8 changed files with 110 additions and 94 deletions

View file

@ -1,4 +1,4 @@
'''streamrip: the all in one music downloader. """streamrip: the all in one music downloader.
''' """
__version__ = "0.4" __version__ = "0.4"

View file

@ -489,7 +489,11 @@ class TidalClient(ClientInterface):
"assetpresentation": "FULL", "assetpresentation": "FULL",
} }
resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params) resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params)
try:
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8")) manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
except KeyError:
raise Exception("You must have a TIDAL Hi-Fi account to download tracks.")
logger.debug(manifest) logger.debug(manifest)
return { return {
"url": manifest["urls"][0], "url": manifest["urls"][0],

View file

@ -1,4 +1,4 @@
'''A config class that manages arguments between the config file and CLI.''' """A config class that manages arguments between the config file and CLI."""
import copy import copy
import logging import logging
@ -77,10 +77,8 @@ class Config:
"downloads": {"folder": DOWNLOADS_DIR, "source_subdirectories": False}, "downloads": {"folder": DOWNLOADS_DIR, "source_subdirectories": False},
"artwork": { "artwork": {
"embed": True, "embed": True,
"embed_size": "large", "size": "large",
"download_size": "original", "keep_hires_cover": True,
"keep_embedded_cover": False,
"keep_downloaded_cover": True,
}, },
"metadata": { "metadata": {
"set_playlist_to_album": False, "set_playlist_to_album": False,

View file

@ -87,7 +87,6 @@ class Converter:
shutil.move(self.tempfile, self.final_fn) shutil.move(self.tempfile, self.final_fn)
logger.debug("Moved: %s -> %s", self.tempfile, self.final_fn) logger.debug("Moved: %s -> %s", self.tempfile, self.final_fn)
logger.debug("Converted: %s -> %s", self.filename, self.final_fn)
else: else:
raise ConversionError("No file was returned from conversion") raise ConversionError("No file was returned from conversion")

View file

@ -153,18 +153,15 @@ class MusicDL(list):
"parent_folder": self.config.session["downloads"]["folder"], "parent_folder": self.config.session["downloads"]["folder"],
"folder_format": self.config.session["path_format"]["folder"], "folder_format": self.config.session["path_format"]["folder"],
"track_format": self.config.session["path_format"]["track"], "track_format": self.config.session["path_format"]["track"],
"keep_downloaded_cover": self.config.session["artwork"][
"keep_downloaded_cover"
],
"keep_embedded_cover": self.config.session["artwork"][
"keep_embedded_cover"
],
"embed_cover": self.config.session["artwork"]["embed"], "embed_cover": self.config.session["artwork"]["embed"],
"embed_cover_size": self.config.session["artwork"]["embed_size"], "embed_cover_size": self.config.session["artwork"]["size"],
"download_cover_size": self.config.session["artwork"]["download_size"], "keep_hires_cover": self.config.session['artwork']['keep_hires_cover'],
"set_playlist_to_album": self.config.session["metadata"][ "set_playlist_to_album": self.config.session["metadata"][
"set_playlist_to_album" "set_playlist_to_album"
], ],
"stay_temp": self.config.session["conversion"]["enabled"],
} }
logger.debug("Arguments from config: %s", arguments) logger.debug("Arguments from config: %s", arguments)
for item in self: for item in self:

View file

@ -1,6 +1,6 @@
'''A simple wrapper over an sqlite database that stores """A simple wrapper over an sqlite database that stores
the downloaded media IDs. the downloaded media IDs.
''' """
import logging import logging
import os import os

View file

@ -1,6 +1,6 @@
'''These classes parse information from Clients into a universal, """These classes parse information from Clients into a universal,
downloadable form. downloadable form.
''' """
import logging import logging
import os import os
@ -53,7 +53,7 @@ TIDAL_Q_MAP = {
} }
# used to homogenize cover size keys # used to homogenize cover size keys
COVER_SIZES = ("thumbnail", "small", "large") COVER_SIZES = ("thumbnail", "small", "large", "original")
TYPE_REGEXES = { TYPE_REGEXES = {
"remaster": re.compile(r"(?i)(re)?master(ed)?"), "remaster": re.compile(r"(?i)(re)?master(ed)?"),
@ -178,22 +178,26 @@ class Track:
self.file_format = kwargs.get("track_format", TRACK_FORMAT) self.file_format = kwargs.get("track_format", TRACK_FORMAT)
self.folder = sanitize_filepath(self.folder, platform="auto") self.folder = sanitize_filepath(self.folder, platform="auto")
self.format_final_path()
os.makedirs(self.folder, exist_ok=True) os.makedirs(self.folder, exist_ok=True)
if database is not None: if isinstance(database, MusicDB):
if self.id in database: if self.id in database:
self.downloaded = True self.downloaded = True
self.tagged = True self.tagged = True
self.path = self.final_path
click.secho( click.secho(
f"{self['title']} already logged in database, skipping.", f"{self['title']} already logged in database, skipping.",
fg="magenta", fg="magenta",
) )
return False # because the track was not downloaded return False # because the track was not downloaded
if os.path.isfile(self.format_final_path()): # track already exists if os.path.isfile(self.final_path): # track already exists
self.downloaded = True self.downloaded = True
self.tagged = True self.tagged = True
self.path = self.final_path
click.secho(f"Track already downloaded: {self.final_path}", fg="magenta") click.secho(f"Track already downloaded: {self.final_path}", fg="magenta")
return False return False
@ -202,14 +206,15 @@ class Track:
self.download_cover() self.download_cover()
if self.client.source == "soundcloud": if self.client.source == "soundcloud":
# soundcloud client needs whole dict to get file url
url_id = self.resp url_id = self.resp
else: else:
url_id = self.id url_id = self.id
dl_info = self.client.get_file_url(url_id, self.quality) dl_info = self.client.get_file_url(url_id, self.quality)
temp_file = os.path.join(gettempdir(), f"~{hash(self.id)}_{quality}.tmp") self.path = os.path.join(gettempdir(), f"{hash(self.id)}_{self.quality}.tmp")
logger.debug("Temporary file path: %s", temp_file) logger.debug("Temporary file path: %s", self.path)
if self.client.source == "qobuz": if self.client.source == "qobuz":
if not (dl_info.get("sampling_rate") and dl_info.get("url")) or dl_info.get( if not (dl_info.get("sampling_rate") and dl_info.get("url")) or dl_info.get(
@ -224,35 +229,43 @@ class Track:
click.secho(f"\nDownloading {self!s}", fg="blue") click.secho(f"\nDownloading {self!s}", fg="blue")
# --------- Download Track ----------
if self.client.source in ("qobuz", "tidal"): if self.client.source in ("qobuz", "tidal"):
logger.debug("Downloadable URL found: %s", dl_info.get("url")) logger.debug("Downloadable URL found: %s", dl_info.get("url"))
tqdm_download(dl_info["url"], temp_file) # downloads file tqdm_download(dl_info["url"], self.path) # downloads file
elif self.client.source == "deezer": # Deezer elif self.client.source == "deezer": # Deezer
logger.debug("Downloadable URL found: %s", dl_info) logger.debug("Downloadable URL found: %s", dl_info)
try: try:
tqdm_download(dl_info, temp_file) # downloads file tqdm_download(dl_info, self.path) # downloads file
except NonStreamable: except NonStreamable:
logger.debug("Track is not downloadable %s", dl_info) logger.debug("Track is not downloadable %s", dl_info)
click.secho("Track is not available for download", fg="red") click.secho("Track is not available for download", fg="red")
return False return False
elif self.client.source == "soundcloud": elif self.client.source == "soundcloud":
temp_file = self._soundcloud_download(dl_info, temp_file) self._soundcloud_download(dl_info, self.path)
else: else:
raise InvalidSourceError(self.client.source) raise InvalidSourceError(self.client.source)
if isinstance(dl_info, dict) and dl_info.get("enc_key"): if (
decrypt_mqa_file(temp_file, self.final_path, dl_info["enc_key"]) self.client.source == "tidal"
else: and isinstance(dl_info, dict)
shutil.move(temp_file, self.final_path) and dl_info.get("enc_key", False)
):
out_path = f"{self.path}_dec"
decrypt_mqa_file(self.path, out_path, dl_info["enc_key"])
self.path = out_path
if not kwargs.get("stay_temp", False):
self.move(self.final_path)
if isinstance(database, MusicDB): if isinstance(database, MusicDB):
database.add(self.id) database.add(self.id)
logger.debug(f"{self.id} added to database") logger.debug(f"{self.id} added to database")
logger.debug("Downloaded: %s -> %s", temp_file, self.final_path) logger.debug("Downloaded: %s -> %s", self.path, self.final_path)
self.downloaded = True self.downloaded = True
@ -264,9 +277,14 @@ class Track:
return True return True
def _soundcloud_download(self, dl_info: dict, temp_file: str) -> str: def move(self, path: str):
os.makedirs(os.path.dirname(path), exist_ok=True)
shutil.move(self.path, path)
self.path = path
def _soundcloud_download(self, dl_info: dict) -> str:
if dl_info["type"] == "mp3": if dl_info["type"] == "mp3":
temp_file += ".mp3" self.path += ".mp3"
# convert hls stream to mp3 # convert hls stream to mp3
subprocess.call( subprocess.call(
[ [
@ -276,24 +294,22 @@ class Track:
"-c", "-c",
"copy", "copy",
"-y", "-y",
temp_file, self.path,
"-loglevel", "-loglevel",
"fatal", "fatal",
] ]
) )
elif dl_info["type"] == "original": elif dl_info["type"] == "original":
tqdm_download(dl_info["url"], temp_file) tqdm_download(dl_info["url"], self.path)
# if a wav is returned, convert to flac # if a wav is returned, convert to flac
engine = converter.FLAC(temp_file) engine = converter.FLAC(self.path)
temp_file = f"{temp_file}.flac" self.path = f"{self.path}.flac"
engine.convert(custom_fn=temp_file) engine.convert(custom_fn=self.path)
self.final_path = self.final_path.replace(".mp3", ".flac") self.final_path = self.final_path.replace(".mp3", ".flac")
self.quality = 2 self.quality = 2
return temp_file
def download_cover(self): def download_cover(self):
"""Downloads the cover art, if cover_url is given.""" """Downloads the cover art, if cover_url is given."""
@ -405,15 +421,15 @@ class Track:
if self.quality in (2, 3, 4): if self.quality in (2, 3, 4):
self.container = "FLAC" self.container = "FLAC"
logger.debug("Tagging file with %s container", self.container) logger.debug("Tagging file with %s container", self.container)
audio = FLAC(self.final_path) audio = FLAC(self.path)
elif self.quality <= 1: elif self.quality <= 1:
if self.client.source == "tidal": if self.client.source == "tidal":
self.container = "AAC" self.container = "AAC"
audio = MP4(self.final_path) audio = MP4(self.path)
else: else:
self.container = "MP3" self.container = "MP3"
try: try:
audio = ID3(self.final_path) audio = ID3(self.path)
except ID3NoHeaderError: except ID3NoHeaderError:
audio = ID3() audio = ID3()
@ -439,7 +455,7 @@ class Track:
elif isinstance(audio, ID3): elif isinstance(audio, ID3):
if embed_cover: if embed_cover:
audio.add(cover) audio.add(cover)
audio.save(self.final_path, "v2_version=3") audio.save(self.path, "v2_version=3")
elif isinstance(audio, MP4): elif isinstance(audio, MP4):
audio["covr"] = [cover] audio["covr"] = [cover]
audio.save() audio.save()
@ -485,18 +501,27 @@ class Track:
if not hasattr(self, "final_path"): if not hasattr(self, "final_path"):
self.format_final_path() self.format_final_path()
if not os.path.isfile(self.final_path): if not os.path.isfile(self.path):
logger.debug(f"File {self.final_path} does not exist. Skipping conversion.") logger.info("File %s does not exist. Skipping conversion.", self.path)
click.secho(f"{self!s} does not exist. Skipping conversion.", fg="red") click.secho(f"{self!s} does not exist. Skipping conversion.", fg="red")
return return
engine = CONV_CLASS[codec.upper()]( assert (
filename=self.final_path, self.container in CONV_CLASS
), f"Invalid codec {codec}. Must be in {CONV_CLASS.keys()}"
engine = CONV_CLASS[self.container](
filename=self.path,
sampling_rate=kwargs.get("sampling_rate"), sampling_rate=kwargs.get("sampling_rate"),
remove_source=kwargs.get("remove_source", True), remove_source=kwargs.get("remove_source", True),
) )
click.secho(f"Converting {self!s}", fg="blue") click.secho(f"Converting {self!s}", fg="blue")
engine.convert() engine.convert()
self.path = engine.final_fn
self.final_path = self.final_path.replace(ext(self.quality, self.client.source), f".{engine.container}")
if not kwargs.get("stay_temp", False):
self.move(self.final_path)
@property @property
def title(self): def title(self):
@ -574,6 +599,7 @@ class Tracklist(list):
>>> tlist[2] >>> tlist[2]
IndexError IndexError
""" """
essence_regex = re.compile(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*") essence_regex = re.compile(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*")
def get(self, key: Union[str, int], default=None): def get(self, key: Union[str, int], default=None):
@ -816,7 +842,7 @@ class Album(Tracklist):
"version": resp.get("version"), "version": resp.get("version"),
"cover_urls": { "cover_urls": {
size: tidal_cover_url(resp.get("cover"), x) size: tidal_cover_url(resp.get("cover"), x)
for size, x in zip(COVER_SIZES, (160, 320, 1280)) for size, x in zip(COVER_SIZES, (160, 320, 640, 1280))
}, },
"streamable": resp.get("allowStreaming"), "streamable": resp.get("allowStreaming"),
"quality": TIDAL_Q_MAP[resp.get("audioQuality")], "quality": TIDAL_Q_MAP[resp.get("audioQuality")],
@ -844,7 +870,8 @@ class Album(Tracklist):
"cover_urls": { "cover_urls": {
sk: resp.get(rk) # size key, resp key sk: resp.get(rk) # size key, resp key
for sk, rk in zip( for sk, rk in zip(
COVER_SIZES, ("cover", "cover_medium", "cover_xl") COVER_SIZES,
("cover", "cover_medium", "cover_large", "cover_xl"),
) )
}, },
"url": resp.get("link"), "url": resp.get("link"),
@ -911,7 +938,7 @@ class Album(Tracklist):
): ):
"""Download all of the tracks in the album. """Download all of the tracks in the album.
:param quality: (5, 6, 7, 27) :param quality: (0, 1, 2, 3, 4)
:type quality: int :type quality: int
:param parent_folder: the folder to download the album to :param parent_folder: the folder to download the album to
:type parent_folder: Union[str, os.PathLike] :type parent_folder: Union[str, os.PathLike]
@ -927,40 +954,34 @@ class Album(Tracklist):
quality = min(quality, self.client.max_quality) quality = min(quality, self.client.max_quality)
folder = self._get_formatted_folder(parent_folder, quality) folder = self._get_formatted_folder(parent_folder, quality)
os.makedirs(folder, exist_ok=True)
logger.debug("Directory created: %s", folder)
# choose optimal cover size and download it # choose optimal cover size and download it
cover = None
cover_path = os.path.join(folder, "cover.jpg")
self.download_message() self.download_message()
click.secho("Downloading cover art", fg="magenta") click.secho("Downloading cover art", fg="magenta")
download_cover_size = kwargs.get("download_cover_size", "original") cover_path = os.path.join(gettempdir(), f"cover_{hash(self)}.jpg")
embed_cover_size = kwargs.get("embed_cover_size", "large") embed_cover_size = kwargs.get("embed_cover_size", "large")
if not os.path.isfile(cover_path):
if embed_cover_size not in self.cover_urls: assert (
embed_cover_size = "large" embed_cover_size in self.cover_urls
), f"Invalid cover size. Must be in {self.cover_urls.keys()}"
tqdm_download(self.cover_urls[embed_cover_size], cover_path) tqdm_download(self.cover_urls[embed_cover_size], cover_path)
if (
self.cover_urls.get(download_cover_size, embed_cover_size) if kwargs.get("keep_hires_cover", True):
!= embed_cover_size tqdm_download(self.cover_urls['original'], os.path.join(folder, 'cover.jpg'))
or os.path.getsize(cover_path) > FLAC_MAX_BLOCKSIZE
): cover_size = os.path.getsize(cover_path)
# download cover at another resolution but don't use for embed if cover_size > FLAC_MAX_BLOCKSIZE: # 16.77 MB
embed_cover_path = cover_path.replace(".jpg", "_embed.jpg") click.secho(
shutil.move(cover_path, embed_cover_path) "Downgrading embedded cover size, too large ({cover_size}).",
tqdm_download(self.cover_urls[download_cover_size], cover_path) fg="bright_yellow",
else: )
embed_cover_path = cover_path # large is about 600x600px which is guaranteed < 16.7 MB
else: tqdm_download(self.cover_urls["large"], cover_path)
embed_cover_path = cover_path
embed_cover = kwargs.get("embed_cover", True) # embed by default embed_cover = kwargs.get("embed_cover", True) # embed by default
if self.client.source != "deezer" and embed_cover: if self.client.source != "deezer" and embed_cover:
cover = self.get_cover_obj(embed_cover_path, quality, self.client.source) cover = self.get_cover_obj(cover_path, quality, self.client.source)
download_args = { download_args = {
"quality": quality, "quality": quality,
@ -968,6 +989,7 @@ class Album(Tracklist):
"progress_bar": kwargs.get("progress_bar", True), "progress_bar": kwargs.get("progress_bar", True),
"database": database, "database": database,
"track_format": kwargs.get("track_format", TRACK_FORMAT), "track_format": kwargs.get("track_format", TRACK_FORMAT),
"stay_temp": kwargs.get("stay_temp")
} }
for track in self: for track in self:
logger.debug("Downloading track to %s", folder) logger.debug("Downloading track to %s", folder)
@ -975,23 +997,13 @@ class Album(Tracklist):
disc_folder = os.path.join(folder, f"Disc {track.meta.discnumber}") disc_folder = os.path.join(folder, f"Disc {track.meta.discnumber}")
download_args["parent_folder"] = disc_folder download_args["parent_folder"] = disc_folder
track.download(**download_args) track.download(quality=quality, parent_folder=folder, database=database, **kwargs)
# deezer tracks come tagged
if kwargs.get("tag_tracks", True) and self.client.source != "deezer": if kwargs.get("tag_tracks", True) and self.client.source != "deezer":
track.tag(cover=cover, embed_cover=embed_cover) track.tag(cover=cover, embed_cover=embed_cover)
if not kwargs.get("keep_embedded_cover", False):
try:
os.remove(embed_cover_path)
except NameError:
pass
# TODO: fix this, bad solution
if not kwargs.get("keep_downloaded_cover", True):
try:
os.remove(cover_path) os.remove(cover_path)
except NameError:
pass
self.downloaded = True self.downloaded = True
@ -1046,6 +1058,9 @@ class Album(Tracklist):
def __len__(self) -> int: def __len__(self) -> int:
return self.tracktotal return self.tracktotal
def __hash__(self):
return hash(self.id)
class Playlist(Tracklist): class Playlist(Tracklist):
"""Represents a downloadable playlist. """Represents a downloadable playlist.
@ -1219,7 +1234,7 @@ class Playlist(Tracklist):
track.meta["tracknumber"] = str(i + 1) track.meta["tracknumber"] = str(i + 1)
if ( if (
track.download(parent_folder=folder, quality=quality, database=database) track.download(parent_folder=folder, quality=quality, database=database, **kwargs)
and self.client.source != "deezer" and self.client.source != "deezer"
): ):
@ -1540,6 +1555,9 @@ class Artist(Tracklist):
""" """
return self.name return self.name
def __hash__(self) -> int:
return hash(self.id)
class Label(Artist): class Label(Artist):
def load_meta(self): def load_meta(self):

View file

@ -1,4 +1,4 @@
'''Manages the information that will be embeded in the audio file. ''' """Manages the information that will be embeded in the audio file. """
import json import json
import logging import logging