mirror of
https://github.com/nathom/streamrip.git
synced 2025-05-17 00:24:50 -04:00
Support for Tidal qualities <= 1
This commit is contained in:
parent
0d2ca55be5
commit
f8dc9d206f
3 changed files with 88 additions and 41 deletions
|
@ -11,13 +11,13 @@ from typing import Any, Callable, Optional, Tuple, Union
|
||||||
import click
|
import click
|
||||||
from mutagen.flac import FLAC, Picture
|
from mutagen.flac import FLAC, Picture
|
||||||
from mutagen.id3 import APIC, ID3, ID3NoHeaderError
|
from mutagen.id3 import APIC, ID3, ID3NoHeaderError
|
||||||
|
from mutagen.mp4 import MP4, MP4Cover
|
||||||
from pathvalidate import sanitize_filename, sanitize_filepath
|
from pathvalidate import sanitize_filename, sanitize_filepath
|
||||||
|
|
||||||
from . import converter
|
from . import converter
|
||||||
from .clients import ClientInterface
|
from .clients import ClientInterface
|
||||||
from .constants import (
|
from .constants import (
|
||||||
ALBUM_KEYS,
|
ALBUM_KEYS,
|
||||||
EXT,
|
|
||||||
FLAC_MAX_BLOCKSIZE,
|
FLAC_MAX_BLOCKSIZE,
|
||||||
FOLDER_FORMAT,
|
FOLDER_FORMAT,
|
||||||
TRACK_FORMAT,
|
TRACK_FORMAT,
|
||||||
|
@ -37,6 +37,7 @@ from .utils import (
|
||||||
safe_get,
|
safe_get,
|
||||||
tidal_cover_url,
|
tidal_cover_url,
|
||||||
tqdm_download,
|
tqdm_download,
|
||||||
|
ext,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -315,7 +316,7 @@ class Track:
|
||||||
filename = clean_format(self.file_format, formatter)
|
filename = clean_format(self.file_format, formatter)
|
||||||
self.final_path = (
|
self.final_path = (
|
||||||
os.path.join(self.folder, filename)[:250].strip()
|
os.path.join(self.folder, filename)[:250].strip()
|
||||||
+ EXT[self.quality] # file extension dict
|
+ ext(self.quality, self.client.source)
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("Formatted path: %s", self.final_path)
|
logger.debug("Formatted path: %s", self.final_path)
|
||||||
|
@ -366,7 +367,7 @@ class Track:
|
||||||
def tag(
|
def tag(
|
||||||
self,
|
self,
|
||||||
album_meta: dict = None,
|
album_meta: dict = None,
|
||||||
cover: Union[Picture, APIC] = None,
|
cover: Union[Picture, APIC, MP4Cover] = None,
|
||||||
embed_cover: bool = True,
|
embed_cover: bool = True,
|
||||||
):
|
):
|
||||||
"""Tag the track using the stored metadata.
|
"""Tag the track using the stored metadata.
|
||||||
|
@ -403,22 +404,28 @@ class Track:
|
||||||
logger.debug("Tagging file with %s container", self.container)
|
logger.debug("Tagging file with %s container", self.container)
|
||||||
audio = FLAC(self.final_path)
|
audio = FLAC(self.final_path)
|
||||||
elif self.quality <= 1:
|
elif self.quality <= 1:
|
||||||
self.container = "MP3"
|
if self.client.source == 'tidal':
|
||||||
logger.debug("Tagging file with %s container", self.container)
|
self.container = 'AAC'
|
||||||
|
audio = MP4(self.final_path)
|
||||||
|
else:
|
||||||
|
self.container = 'MP3'
|
||||||
try:
|
try:
|
||||||
audio = ID3(self.final_path)
|
audio = ID3(self.final_path)
|
||||||
except ID3NoHeaderError:
|
except ID3NoHeaderError:
|
||||||
audio = ID3()
|
audio = ID3()
|
||||||
|
|
||||||
|
logger.debug("Tagging file with %s container", self.container)
|
||||||
else:
|
else:
|
||||||
raise InvalidQuality(f'Invalid quality: "{self.quality}"')
|
raise InvalidQuality(f'Invalid quality: "{self.quality}"')
|
||||||
|
|
||||||
# automatically generate key, value pairs based on container
|
# automatically generate key, value pairs based on container
|
||||||
for k, v in self.meta.tags(self.container):
|
tags = self.meta.tags(self.container)
|
||||||
|
for k, v in tags:
|
||||||
audio[k] = v
|
audio[k] = v
|
||||||
|
|
||||||
if embed_cover and cover is None:
|
if embed_cover and cover is None:
|
||||||
assert hasattr(self, "cover_path")
|
assert hasattr(self, "cover_path")
|
||||||
cover = Tracklist.get_cover_obj(self.cover_path, self.quality)
|
cover = Tracklist.get_cover_obj(self.cover_path, self.quality, self.client.source)
|
||||||
|
|
||||||
if isinstance(audio, FLAC):
|
if isinstance(audio, FLAC):
|
||||||
if embed_cover:
|
if embed_cover:
|
||||||
|
@ -428,6 +435,9 @@ class Track:
|
||||||
if embed_cover:
|
if embed_cover:
|
||||||
audio.add(cover)
|
audio.add(cover)
|
||||||
audio.save(self.final_path, "v2_version=3")
|
audio.save(self.final_path, "v2_version=3")
|
||||||
|
elif isinstance(audio, MP4):
|
||||||
|
audio['covr'] = [cover]
|
||||||
|
audio.save()
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown container type: {audio}")
|
raise ValueError(f"Unknown container type: {audio}")
|
||||||
|
|
||||||
|
@ -606,7 +616,7 @@ class Tracklist(list):
|
||||||
return cls(client=client, **info)
|
return cls(client=client, **info)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_cover_obj(cover_path: str, quality: int) -> Union[Picture, APIC]:
|
def get_cover_obj(cover_path: str, quality: int, source: str) -> Union[Picture, APIC]:
|
||||||
"""Given the path to an image and a quality id, return an initialized
|
"""Given the path to an image and a quality id, return an initialized
|
||||||
cover object that can be used for every track in the album.
|
cover object that can be used for every track in the album.
|
||||||
|
|
||||||
|
@ -616,18 +626,7 @@ class Tracklist(list):
|
||||||
:type quality: int
|
:type quality: int
|
||||||
:rtype: Union[Picture, APIC]
|
:rtype: Union[Picture, APIC]
|
||||||
"""
|
"""
|
||||||
cover_type = {0: APIC, 1: APIC, 2: Picture, 3: Picture, 4: Picture}
|
def flac_mp3_cover_obj(cover):
|
||||||
|
|
||||||
cover = cover_type.get(quality)
|
|
||||||
if cover is Picture:
|
|
||||||
size_ = os.path.getsize(cover_path)
|
|
||||||
if size_ > FLAC_MAX_BLOCKSIZE:
|
|
||||||
raise TooLargeCoverArt(
|
|
||||||
f"Not suitable for Picture embed: {size_ / 10 ** 6} MB"
|
|
||||||
)
|
|
||||||
elif cover is None:
|
|
||||||
raise InvalidQuality(f"Quality {quality} not allowed")
|
|
||||||
|
|
||||||
cover_obj = cover()
|
cover_obj = cover()
|
||||||
cover_obj.type = 3
|
cover_obj.type = 3
|
||||||
cover_obj.mime = "image/jpeg"
|
cover_obj.mime = "image/jpeg"
|
||||||
|
@ -636,6 +635,30 @@ class Tracklist(list):
|
||||||
|
|
||||||
return cover_obj
|
return cover_obj
|
||||||
|
|
||||||
|
if quality > 1:
|
||||||
|
cover = Picture
|
||||||
|
elif source == 'tidal':
|
||||||
|
cover = MP4Cover
|
||||||
|
else:
|
||||||
|
cover = APIC
|
||||||
|
|
||||||
|
if cover is Picture:
|
||||||
|
size_ = os.path.getsize(cover_path)
|
||||||
|
if size_ > FLAC_MAX_BLOCKSIZE:
|
||||||
|
raise TooLargeCoverArt(
|
||||||
|
f"Not suitable for Picture embed: {size_ / 10 ** 6} MB"
|
||||||
|
)
|
||||||
|
return flac_mp3_cover_obj(cover)
|
||||||
|
|
||||||
|
elif cover is APIC:
|
||||||
|
return flac_mp3_cover_obj(cover)
|
||||||
|
|
||||||
|
elif cover is MP4Cover:
|
||||||
|
with open(cover_path, 'rb') as img:
|
||||||
|
return cover(img.read(), imageformat=MP4Cover.FORMAT_JPEG)
|
||||||
|
|
||||||
|
raise InvalidQuality(f"Quality {quality} not allowed")
|
||||||
|
|
||||||
def download_message(self):
|
def download_message(self):
|
||||||
click.secho(
|
click.secho(
|
||||||
f"\nDownloading {self.title} ({self.__class__.__name__})\n",
|
f"\nDownloading {self.title} ({self.__class__.__name__})\n",
|
||||||
|
@ -913,7 +936,7 @@ class Album(Tracklist):
|
||||||
if (
|
if (
|
||||||
self.cover_urls.get(download_cover_size, embed_cover_size)
|
self.cover_urls.get(download_cover_size, embed_cover_size)
|
||||||
!= embed_cover_size
|
!= embed_cover_size
|
||||||
or os.path.size(cover_path) > FLAC_MAX_BLOCKSIZE
|
or os.path.getsize(cover_path) > FLAC_MAX_BLOCKSIZE
|
||||||
):
|
):
|
||||||
# download cover at another resolution but don't use for embed
|
# download cover at another resolution but don't use for embed
|
||||||
embed_cover_path = cover_path.replace(".jpg", "_embed.jpg")
|
embed_cover_path = cover_path.replace(".jpg", "_embed.jpg")
|
||||||
|
@ -921,10 +944,12 @@ class Album(Tracklist):
|
||||||
tqdm_download(self.cover_urls[download_cover_size], cover_path)
|
tqdm_download(self.cover_urls[download_cover_size], cover_path)
|
||||||
else:
|
else:
|
||||||
embed_cover_path = cover_path
|
embed_cover_path = cover_path
|
||||||
|
else:
|
||||||
|
embed_cover_path = cover_path
|
||||||
|
|
||||||
embed_cover = kwargs.get("embed_cover", True) # embed by default
|
embed_cover = kwargs.get("embed_cover", True) # embed by default
|
||||||
if self.client.source != "deezer" and embed_cover:
|
if self.client.source != "deezer" and embed_cover:
|
||||||
cover = self.get_cover_obj(embed_cover_path, quality)
|
cover = self.get_cover_obj(embed_cover_path, quality, self.client.source)
|
||||||
|
|
||||||
download_args = {
|
download_args = {
|
||||||
"quality": quality,
|
"quality": quality,
|
||||||
|
|
|
@ -93,9 +93,12 @@ class TrackMetadata:
|
||||||
"""
|
"""
|
||||||
if self.__source == "qobuz":
|
if self.__source == "qobuz":
|
||||||
self.album = resp.get("title")
|
self.album = resp.get("title")
|
||||||
self.tracktotal = str(resp.get("tracks_count", 1))
|
self.tracktotal = resp.get("tracks_count", 1)
|
||||||
self.genre = resp.get("genres_list", [])
|
self.genre = resp.get("genres_list", [])
|
||||||
self.date = resp.get("release_date_original") or resp.get("release_date")
|
self.date = resp.get("release_date_original") or resp.get("release_date")
|
||||||
|
if self.date:
|
||||||
|
self.year = self.date[:4]
|
||||||
|
|
||||||
self.copyright = resp.get("copyright")
|
self.copyright = resp.get("copyright")
|
||||||
self.albumartist = safe_get(resp, "artist", "name")
|
self.albumartist = safe_get(resp, "artist", "name")
|
||||||
self.label = resp.get("label")
|
self.label = resp.get("label")
|
||||||
|
@ -117,8 +120,11 @@ class TrackMetadata:
|
||||||
self.tracktotal = resp.get("numberOfTracks")
|
self.tracktotal = resp.get("numberOfTracks")
|
||||||
# genre not returned by API
|
# genre not returned by API
|
||||||
self.date = resp.get("releaseDate")
|
self.date = resp.get("releaseDate")
|
||||||
|
if self.date:
|
||||||
|
self.year = self.date[:4]
|
||||||
|
|
||||||
self.copyright = resp.get("copyright")
|
self.copyright = resp.get("copyright")
|
||||||
self.albumartist = resp.get("artist", {}).get("name")
|
self.albumartist = safe_get(resp, 'artist', 'name')
|
||||||
self.disctotal = resp.get("numberOfVolumes")
|
self.disctotal = resp.get("numberOfVolumes")
|
||||||
self.isrc = resp.get("isrc")
|
self.isrc = resp.get("isrc")
|
||||||
self.explicit = resp.get("explicit", False)
|
self.explicit = resp.get("explicit", False)
|
||||||
|
@ -127,9 +133,9 @@ class TrackMetadata:
|
||||||
elif self.__source == "deezer":
|
elif self.__source == "deezer":
|
||||||
self.album = resp.get("title")
|
self.album = resp.get("title")
|
||||||
self.tracktotal = resp.get("track_total")
|
self.tracktotal = resp.get("track_total")
|
||||||
self.genre = resp.get("genres", {}).get("data")
|
self.genre = safe_get(resp, 'genres', 'data')
|
||||||
self.date = resp.get("release_date")
|
self.date = resp.get("release_date")
|
||||||
self.albumartist = resp.get("artist", {}).get("name")
|
self.albumartist = safe_get(resp, 'artist', 'name')
|
||||||
self.label = resp.get("label")
|
self.label = resp.get("label")
|
||||||
# either 0 or 1
|
# either 0 or 1
|
||||||
self.explicit = bool(resp.get("parental_warning"))
|
self.explicit = bool(resp.get("parental_warning"))
|
||||||
|
@ -140,8 +146,8 @@ class TrackMetadata:
|
||||||
raise ValueError(self.__source)
|
raise ValueError(self.__source)
|
||||||
|
|
||||||
def add_track_meta(self, track: dict):
|
def add_track_meta(self, track: dict):
|
||||||
"""Parse the metadata from a track dict returned by the
|
"""Parse the metadata from a track dict returned by an
|
||||||
Qobuz API.
|
API.
|
||||||
|
|
||||||
:param track:
|
:param track:
|
||||||
"""
|
"""
|
||||||
|
@ -150,25 +156,23 @@ class TrackMetadata:
|
||||||
self._mod_title(track.get("version"), track.get("work"))
|
self._mod_title(track.get("version"), track.get("work"))
|
||||||
self.composer = track.get("composer", {}).get("name")
|
self.composer = track.get("composer", {}).get("name")
|
||||||
|
|
||||||
self.tracknumber = f"{int(track.get('track_number', 1)):02}"
|
self.tracknumber = track.get('track_number', 1)
|
||||||
self.discnumber = str(track.get("media_number", 1))
|
self.discnumber = track.get("media_number", 1)
|
||||||
try:
|
self.artist = safe_get(track, 'performer', 'name')
|
||||||
self.artist = track["performer"]["name"]
|
if self.artist is None:
|
||||||
except KeyError:
|
self.artist = self.get('albumartist')
|
||||||
if hasattr(self, "albumartist"):
|
|
||||||
self.artist = self.albumartist
|
|
||||||
|
|
||||||
elif self.__source == "tidal":
|
elif self.__source == "tidal":
|
||||||
self.title = track.get("title").strip()
|
self.title = track.get("title").strip()
|
||||||
self._mod_title(track.get("version"), None)
|
self._mod_title(track.get("version"), None)
|
||||||
self.tracknumber = f"{int(track.get('trackNumber', 1)):02}"
|
self.tracknumber = track.get('trackNumber', 1)
|
||||||
self.discnumber = str(track.get("volumeNumber"))
|
self.discnumber = track.get("volumeNumber")
|
||||||
self.artist = track.get("artist", {}).get("name")
|
self.artist = track.get("artist", {}).get("name")
|
||||||
|
|
||||||
elif self.__source == "deezer":
|
elif self.__source == "deezer":
|
||||||
self.title = track.get("title").strip()
|
self.title = track.get("title").strip()
|
||||||
self._mod_title(track.get("version"), None)
|
self._mod_title(track.get("version"), None)
|
||||||
self.tracknumber = f"{int(track.get('track_position', 1)):02}"
|
self.tracknumber = track.get('track_position', 1)
|
||||||
self.discnumber = track.get("disk_number")
|
self.discnumber = track.get("disk_number")
|
||||||
self.artist = track.get("artist", {}).get("name")
|
self.artist = track.get("artist", {}).get("name")
|
||||||
|
|
||||||
|
@ -364,14 +368,22 @@ class TrackMetadata:
|
||||||
if text is not None and v is not None:
|
if text is not None and v is not None:
|
||||||
yield (v.__name__, v(encoding=3, text=text))
|
yield (v.__name__, v(encoding=3, text=text))
|
||||||
|
|
||||||
def __mp4_tags(self) -> Tuple[str, str]:
|
def __gen_mp4_tags(self) -> Tuple[str, Union[str, int, tuple]]:
|
||||||
"""Generate key, value pairs to tag ALAC or AAC files in
|
"""Generate key, value pairs to tag ALAC or AAC files in
|
||||||
an MP4 container.
|
an MP4 container.
|
||||||
|
|
||||||
:rtype: Tuple[str, str]
|
:rtype: Tuple[str, str]
|
||||||
"""
|
"""
|
||||||
for k, v in MP4_KEY.items():
|
for k, v in MP4_KEY.items():
|
||||||
return (v, getattr(self, k))
|
if k == "tracknumber":
|
||||||
|
text = [(self.tracknumber, self.tracktotal)]
|
||||||
|
elif k == 'discnumber':
|
||||||
|
text = [(self.discnumber, self.get('disctotal', 1))]
|
||||||
|
else:
|
||||||
|
text = getattr(self, k)
|
||||||
|
|
||||||
|
if v is not None and text is not None:
|
||||||
|
yield (v, text)
|
||||||
|
|
||||||
def __setitem__(self, key, val):
|
def __setitem__(self, key, val):
|
||||||
"""Dict-like access for tags.
|
"""Dict-like access for tags.
|
||||||
|
|
|
@ -206,3 +206,13 @@ def decrypt_mqa_file(in_path, out_path, encryption_key):
|
||||||
dec_bytes = decryptor.decrypt(enc_file.read())
|
dec_bytes = decryptor.decrypt(enc_file.read())
|
||||||
with open(out_path, "wb") as dec_file:
|
with open(out_path, "wb") as dec_file:
|
||||||
dec_file.write(dec_bytes)
|
dec_file.write(dec_bytes)
|
||||||
|
|
||||||
|
|
||||||
|
def ext(quality: int, source: str):
|
||||||
|
if quality <= 1:
|
||||||
|
if source == 'tidal':
|
||||||
|
return '.m4a'
|
||||||
|
else:
|
||||||
|
return '.mp3'
|
||||||
|
else:
|
||||||
|
return '.flac'
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue