This commit is contained in:
nathom 2021-03-22 13:03:08 -07:00
parent adb4b2abb5
commit 41223c1237
25 changed files with 15 additions and 1895 deletions

0
music_dl/__init__.py Normal file
View file

107
music_dl/cli.py Normal file
View file

@ -0,0 +1,107 @@
# For tests
import logging
import os
import click
from qobuz_dl_rewrite.config import Config
from qobuz_dl_rewrite.constants import CACHE_DIR, CONFIG_DIR
from qobuz_dl_rewrite.core import MusicDL
from qobuz_dl_rewrite.utils import init_log
logger = logging.getLogger(__name__)
def _get_config(ctx):
if not os.path.isdir(CONFIG_DIR):
os.makedirs(CONFIG_DIR)
if not os.path.isdir(CACHE_DIR):
os.makedirs(CONFIG_DIR)
config = Config(ctx.obj.get("config"))
config.update_from_cli(**ctx.obj)
return config
# fmt: off
@click.group()
@click.option("--disable", metavar="PROVIDER,...", help="Disable following providers (comma separated)")
@click.option("-q", "--quality", metavar="INT", help="Quality integer ID (5, 6, 7, 27)")
@click.option("--embed-cover", is_flag=True, help="Embed cover art into files")
@click.option("--no-extras", is_flag=True, help="Ignore extras")
@click.option("--no-features", is_flag=True, help="Ignore features")
@click.option("--studio-albums", is_flag=True, help="Ignore non-studio albums")
@click.option("--remaster-only", is_flag=True, help="Ignore non-remastered albums")
@click.option("--albums-only", is_flag=True, help="Ignore non-album downloads")
@click.option("--large-cover", is_flag=True, help="Download large covers (it might fail with embed)")
@click.option("--remove-extra-tags", default=False, is_flag=True, help="Remove extra metadata from tags and files")
@click.option("--debug", default=False, is_flag=True, help="Enable debug logging")
@click.option("-f", "--folder", metavar="PATH", help="Custom download folder")
@click.option("--default-comment", metavar="COMMENT", help="Custom comment tag for audio files")
@click.option("-c", "--config", metavar="PATH", help="Custom config file")
@click.option("--db-file", metavar="PATH", help="Custom database file")
@click.option("--log-file", metavar="PATH", help="Custom logfile")
@click.option("--flush-cache", metavar="PATH", help="Flush the cache before running (only for extreme cases)")
# TODO: add options for conversion
@click.pass_context
# fmt: on
def cli(ctx, **kwargs):
ctx.ensure_object(dict)
for key in kwargs.keys():
ctx.obj[key] = kwargs.get(key)
if ctx.obj["debug"]:
init_log(path=ctx.obj.get("log_file"))
else:
click.secho("Debug is not enabled", fg="yellow")
@click.command(name="dl")
@click.argument("items", nargs=-1)
@click.pass_context
def download(ctx, items):
"""
Download an URL, space separated URLs or a text file with URLs.
Mixed arguments are also supported.
Examples:
* `qobuz-dl dl https://some.url/some_type/some_id`
* `qobuz-dl dl file_with_urls.txt`
* `qobuz-dl dl URL URL URL`
Supported sources and their types:
* Deezer (album, artist, track, playlist)
* Qobuz (album, artist, label, track, playlist)
* Tidal (album, artist, track, playlist)
"""
config = _get_config(ctx)
core = MusicDL(config)
for item in items:
try:
if os.path.isfile(item):
core.from_txt(item)
click.secho(f"File input found: {item}", fg="yellow")
else:
core.handle_url(item)
except Exception as error:
logger.error(error, exc_info=True)
click.secho(
f"{type(error).__name__} raised processing {item}: {error}", fg="red"
)
def main():
cli.add_command(download)
cli(obj={})
if __name__ == "__main__":
main()

503
music_dl/clients.py Normal file
View file

@ -0,0 +1,503 @@
import datetime
import hashlib
import logging
import os
import time
from abc import ABC, abstractmethod
from typing import Generator, Sequence, Tuple, Union
import requests
import tidalapi
from dogpile.cache import make_region
from .constants import (
AGENT,
CACHE_DIR,
DEEZER_MAX_Q,
DEEZER_Q_IDS,
QOBUZ_FEATURED_KEYS,
TIDAL_MAX_Q,
TIDAL_Q_IDS,
)
from .exceptions import (
AuthenticationError,
IneligibleError,
InvalidAppIdError,
InvalidAppSecretError,
InvalidQuality,
)
from .spoofbuz import Spoofer
os.makedirs(CACHE_DIR, exist_ok=True)
region = make_region().configure(
"dogpile.cache.dbm",
arguments={"filename": os.path.join(CACHE_DIR, "clients.db")},
)
logger = logging.getLogger(__name__)
TRACK_CACHE_TIME = datetime.timedelta(weeks=2).total_seconds()
RELEASE_CACHE_TIME = datetime.timedelta(days=1).total_seconds()
# Qobuz
QOBUZ_BASE = "https://www.qobuz.com/api.json/0.2"
# Deezer
DEEZER_BASE = "https://api.deezer.com"
DEEZER_DL = "http://dz.loaderapp.info/deezer"
# ----------- Abstract Classes -----------------
class ClientInterface(ABC):
"""Common API for clients of all platforms.
This is an Abstract Base Class. It cannot be instantiated;
it is merely a template.
"""
@abstractmethod
def login(self, **kwargs):
"""Authenticate the client.
:param kwargs:
"""
pass
@abstractmethod
def search(self, query: str, media_type="album"):
"""Search API for query.
:param query:
:type query: str
:param type_:
"""
pass
@abstractmethod
def get(self, item_id, media_type="album"):
"""Get metadata.
:param meta_id:
:param type_:
"""
pass
@abstractmethod
def get_file_url(self, track_id, quality=6) -> Union[dict]:
"""Get the direct download url dict for a file.
:param track_id: id of the track
"""
pass
@property
@abstractmethod
def source(self):
pass
# ------------- Clients -----------------
class QobuzClient(ClientInterface):
# ------- Public Methods -------------
def __init__(self):
self.logged_in = False
def login(self, email: str, pwd: str, **kwargs):
"""Authenticate the QobuzClient. Must have a paid membership.
If `app_id` and `secrets` are not provided, this will run the
Spoofer script, which retrieves them. This will take some time,
so it is recommended to cache them somewhere for reuse.
:param email: email for the qobuz account
:type email: str
:param pwd: password for the qobuz account
:type pwd: str
:param kwargs: app_id: str, secrets: list, return_secrets: bool
"""
if self.logged_in:
logger.debug("Already logged in")
return
if (kwargs.get("app_id") or kwargs.get("secrets")) in (None, [], ""):
logger.info("Fetching tokens from Qobuz")
spoofer = Spoofer()
kwargs["app_id"] = spoofer.get_app_id()
kwargs["secrets"] = spoofer.get_secrets()
self.app_id = str(kwargs["app_id"]) # Ensure it is a string
self.secrets = kwargs["secrets"]
self.session = requests.Session()
self.session.headers.update(
{
"User-Agent": AGENT,
"X-App-Id": self.app_id,
}
)
self._api_login(email, pwd)
logger.debug("Logged into Qobuz")
self._validate_secrets()
logger.debug("Qobuz client is ready to use")
self.logged_in = True
def get_tokens(self) -> Tuple[str, Sequence[str]]:
return self.app_id, self.secrets
def search(
self, query: str, media_type: str = "album", limit: int = 500
) -> Generator:
"""Search the qobuz API.
If 'featured' is given as media type, this will retrieve results
from the featured albums in qobuz. The queries available with this type
are:
* most-streamed
* recent-releases
* best-sellers
* press-awards
* ideal-discography
* editor-picks
* most-featured
* qobuzissims
* new-releases
* new-releases-full
* harmonia-mundi
* universal-classic
* universal-jazz
* universal-jeunesse
* universal-chanson
:param query:
:type query: str
:param media_type:
:type media_type: str
:param limit:
:type limit: int
:rtype: Generator
"""
return self._api_search(query, media_type, limit)
@region.cache_on_arguments(expiration_time=RELEASE_CACHE_TIME)
def get(self, item_id: Union[str, int], media_type: str = "album") -> dict:
return self._api_get(media_type, item_id=item_id)
def get_file_url(self, item_id, quality=6) -> dict:
return self._api_get_file_url(item_id, quality=quality)
@property
def source(self):
return "qobuz"
# ---------- Private Methods ---------------
# Credit to Sorrow446 for the original methods
def _gen_pages(self, epoint: str, params: dict) -> dict:
page, status_code = self._api_request(epoint, params)
logger.debug("Keys returned from _gen_pages: %s", ", ".join(page.keys()))
key = epoint.split("/")[0] + "s"
total = page.get(key, {})
total = total.get("total") or total.get("items")
if not total:
logger.debug("Nothing found from %s epoint", epoint)
return
limit = page.get(key, {}).get("limit", 500)
offset = page.get(key, {}).get("offset", 0)
params.update({"limit": limit})
yield page
while (offset + limit) < total:
offset += limit
params.update({"offset": offset})
page, status_code = self._api_request(epoint, params)
yield page
def _validate_secrets(self):
for secret in self.secrets:
if self._test_secret(secret):
self.sec = secret
logger.debug("Working secret and app_id: %s - %s", secret, self.app_id)
break
if not hasattr(self, "sec"):
raise InvalidAppSecretError(f"Invalid secrets: {self.secrets}")
def _api_get(self, media_type: str, **kwargs) -> dict:
item_id = kwargs.get("item_id")
params = {
"app_id": self.app_id,
f"{media_type}_id": item_id,
"limit": kwargs.get("limit", 500),
"offset": kwargs.get("offset", 0),
}
extras = {
"artist": "albums",
"playlist": "tracks",
"label": "albums", # not tested
}
if media_type in extras:
params.update({"extra": extras[media_type]})
epoint = f"{media_type}/get"
response, status_code = self._api_request(epoint, params)
return response
def _api_search(self, query, media_type, limit=500) -> Generator:
params = {
"query": query,
"limit": limit,
}
# TODO: move featured, favorites, and playlists into _api_get later
if media_type == "featured":
assert query in QOBUZ_FEATURED_KEYS, f'query "{query}" is invalid.'
params.update({"type": query})
del params["query"]
epoint = "album/getFeatured"
elif query == "user-favorites":
assert query in ("track", "artist", "album")
params.update({"type": f"{media_type}s"})
epoint = "favorite/getUserFavorites"
elif query == "user-playlists":
epoint = "playlist/getUserPlaylists"
else:
epoint = f"{media_type}/search"
return self._gen_pages(epoint, params)
def _api_login(self, email: str, pwd: str):
# usr_info = self._api_call("user/login", email=email, pwd=pwd)
params = {
"email": email,
"password": pwd,
"app_id": self.app_id,
}
epoint = "user/login"
resp, status_code = self._api_request(epoint, params)
if status_code == 401:
raise AuthenticationError(f"Invalid credentials from params {params}")
elif status_code == 400:
raise InvalidAppIdError(f"Invalid app id from params {params}")
else:
logger.info("Logged in to Qobuz")
if not resp["user"]["credential"]["parameters"]:
raise IneligibleError("Free accounts are not eligible to download tracks.")
self.uat = resp["user_auth_token"]
self.session.headers.update({"X-User-Auth-Token": self.uat})
self.label = resp["user"]["credential"]["parameters"]["short_label"]
def _api_get_file_url(
self, track_id: Union[str, int], quality: int = 6, sec: str = None
) -> dict:
unix_ts = time.time()
if int(quality) not in (5, 6, 7, 27): # Needed?
raise InvalidQuality(f"Invalid quality id {quality}. Choose 5, 6, 7 or 27")
if sec is not None:
secret = sec
elif hasattr(self, "sec"):
secret = self.sec
else:
raise InvalidAppSecretError("Cannot find app secret")
r_sig = f"trackgetFileUrlformat_id{quality}intentstreamtrack_id{track_id}{unix_ts}{secret}"
logger.debug("Raw request signature: %s", r_sig)
r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest()
logger.debug("Hashed request signature: %s", r_sig_hashed)
params = {
"request_ts": unix_ts,
"request_sig": r_sig_hashed,
"track_id": track_id,
"format_id": quality,
"intent": "stream",
}
response, status_code = self._api_request("track/getFileUrl", params)
if status_code == 400:
raise InvalidAppSecretError("Invalid app secret from params %s" % params)
return response
def _api_request(self, epoint: str, params: dict) -> Tuple[dict, int]:
logging.debug(f"Calling API with endpoint {epoint} params {params}")
r = self.session.get(f"{QOBUZ_BASE}/{epoint}", params=params)
try:
return r.json(), r.status_code
except Exception:
logger.error("Problem getting JSON. Status code: %s", r.status_code)
raise
def _test_secret(self, secret: str) -> bool:
try:
self._api_get_file_url("19512574", sec=secret)
return True
except InvalidAppSecretError as error:
logger.debug("Test for %s secret didn't work: %s", secret, error)
return False
class DeezerClient(ClientInterface):
def __init__(self):
self.session = requests.Session()
self.logged_in = True
def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict:
"""Search API for query.
:param query:
:type query: str
:param media_type:
:type media_type: str
:param limit:
:type limit: int
"""
# TODO: more robust url sanitize
query = query.replace(" ", "+")
if media_type.endswith("s"):
media_type = media_type[:-1]
# TODO: use limit parameter
response = self.session.get(f"{DEEZER_BASE}/search/{media_type}?q={query}")
response.raise_for_status()
return response.json()
def login(self, **kwargs):
logger.debug("Deezer does not require login call, returning")
@region.cache_on_arguments(expiration_time=RELEASE_CACHE_TIME)
def get(self, meta_id: Union[str, int], media_type: str = "album"):
"""Get metadata.
:param meta_id:
:type meta_id: Union[str, int]
:param type_:
:type type_: str
"""
url = f"{DEEZER_BASE}/{media_type}/{meta_id}"
item = self.session.get(url).json()
if media_type in ("album", "playlist"):
tracks = self.session.get(f"{url}/tracks").json()
item["tracks"] = tracks["data"]
item["track_total"] = len(tracks["data"])
elif media_type == "artist":
albums = self.session.get(f"{url}/albums").json()
item["albums"] = albums["data"]
return item
@staticmethod
def get_file_url(meta_id: Union[str, int], quality: int = 6):
quality = min(DEEZER_MAX_Q, quality)
url = f"{DEEZER_DL}/{DEEZER_Q_IDS[quality]}/{DEEZER_BASE}/track/{meta_id}"
logger.debug(f"Download url {url}")
return url
@property
def source(self):
return "deezer"
class TidalClient(ClientInterface):
def __init__(self):
self.logged_in = False
def login(self, email: str, pwd: str):
if self.logged_in:
return
config = tidalapi.Config()
self.session = tidalapi.Session(config=config)
self.session.login(email, pwd)
logger.info("Logged into Tidal")
self.logged_in = True
@region.cache_on_arguments(expiration_time=RELEASE_CACHE_TIME)
def search(self, query: str, media_type: str = "album", limit: int = 50):
"""
:param query:
:type query: str
:param media_type: artist, album, playlist, or track
:type media_type: str
:param limit:
:type limit: int
:raises ValueError: if field value is invalid
"""
return self._search(query, media_type, limit=limit)
@region.cache_on_arguments(expiration_time=RELEASE_CACHE_TIME)
def get(self, meta_id: Union[str, int], media_type: str = "album"):
"""Get metadata.
:param meta_id:
:type meta_id: Union[str, int]
:param media_type:
:type media_type: str
"""
return self._get(meta_id, media_type)
def get_file_url(self, meta_id: Union[str, int], quality: int = 6):
"""
:param meta_id:
:type meta_id: Union[str, int]
:param quality:
:type quality: int
"""
logger.debug(f"Fetching file url with quality {quality}")
return self._get_file_url(meta_id, quality=min(TIDAL_MAX_Q, quality))
@property
def source(self):
return "tidal"
def _search(self, query, media_type="album", **kwargs):
params = {
"query": query,
"limit": kwargs.get("limit", 50),
}
return self.session.request("GET", f"search/{media_type}s", params).json()
def _get(self, media_id, media_type="album"):
if media_type == "album":
info = self.session.request("GET", f"albums/{media_id}")
tracklist = self.session.request("GET", f"albums/{media_id}/tracks")
album = info.json()
album["tracks"] = tracklist.json()
return album
elif media_type == "track":
return self.session.request("GET", f"tracks/{media_id}").json()
elif media_type == "playlist":
return self.session.request("GET", f"playlists/{media_id}/tracks").json()
elif media_type == "artist":
return self.session.request("GET", f"artists/{media_id}/albums").json()
else:
raise ValueError
def _get_file_url(self, track_id, quality=6):
params = {"soundQuality": TIDAL_Q_IDS[quality]}
resp = self.session.request("GET", f"tracks/{track_id}/streamUrl", params)
resp.raise_for_status()
return resp.json()

159
music_dl/config.py Normal file
View file

@ -0,0 +1,159 @@
import logging
import os
from pprint import pformat
from ruamel.yaml import YAML
from .constants import CONFIG_PATH, FOLDER_FORMAT, TRACK_FORMAT
from .exceptions import InvalidSourceError
yaml = YAML()
logger = logging.getLogger(__name__)
class Config:
"""Config class that handles command line args and config files.
Usage:
>>> config = Config('test_config.yaml')
If test_config was already initialized with values, this will load them
into `config`. Otherwise, a new config file is created with the default
values.
>>> config.update_from_cli(**args)
This will update the config values based on command line args.
"""
def __init__(self, path: str):
# DEFAULTS
folder = "Downloads"
quality = 6
folder_format = FOLDER_FORMAT
track_format = TRACK_FORMAT
self.qobuz = {
"enabled": True,
"email": None,
"password": None,
"app_id": "", # Avoid NoneType error
"secrets": [],
}
self.tidal = {"enabled": True, "email": None, "password": None}
self.deezer = {"enabled": True}
self.downloads_database = None
self.filters = {
"no_extras": False,
"albums_only": False,
"no_features": False,
"studio_albums": False,
"remaster_only": False,
}
self.downloads = {"folder": folder, "quality": quality}
self.metadata = {
"embed_cover": False,
"large_cover": False,
"default_comment": None,
"remove_extra_tags": False,
}
self.path_format = {"folder": folder_format, "track": track_format}
if path is None:
self._path = CONFIG_PATH
else:
self._path = path
if not os.path.exists(self._path):
logger.debug(f"Creating yaml config file at {self._path}")
self.dump(self.info)
else:
# sometimes the file gets erased, this will reset it
with open(self._path) as f:
if f.read().strip() == "":
logger.debug(f"Config file {self._path} corrupted, resetting.")
self.dump(self.info)
else:
self.load()
def save(self):
self.dump(self.info)
def reset(self):
os.remove(self._path)
# re initialize with default info
self.__init__(self._path)
def load(self):
with open(self._path) as cfg:
for k, v in yaml.load(cfg).items():
setattr(self, k, v)
logger.debug("Config loaded")
self.__loaded = True
def update_from_cli(self, **kwargs):
for category in (self.downloads, self.metadata, self.filters):
for key in category.keys():
if kwargs[key] is None:
continue
# For debugging's sake
og_value = category[key]
new_value = kwargs[key] or og_value
category[key] = new_value
if og_value != new_value:
logger.debug("Updated %s config key from args: %s", key, new_value)
def dump(self, info):
with open(self._path, "w") as cfg:
logger.debug("Config saved: %s", self._path)
yaml.dump(info, cfg)
@property
def tidal_creds(self):
return {
"email": self.tidal["email"],
"pwd": self.tidal["password"],
}
@property
def qobuz_creds(self):
return {
"email": self.qobuz["email"],
"pwd": self.qobuz["password"],
"app_id": self.qobuz["app_id"],
"secrets": self.qobuz["secrets"],
}
def creds(self, source: str):
if source == "qobuz":
return self.qobuz_creds
elif source == "tidal":
return self.tidal_creds
elif source == "deezer":
return dict()
else:
raise InvalidSourceError(source)
@property
def info(self):
return {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
@info.setter
def info(self, val):
for k, v in val.items():
setattr(self, k, v)
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, val):
setattr(self, key, val)
def __repr__(self):
return f"Config({pformat(self.info)})"

146
music_dl/constants.py Normal file
View file

@ -0,0 +1,146 @@
import os
import appdirs
import mutagen.id3 as id3
APPNAME = "qobuz-dl"
CACHE_DIR = appdirs.user_cache_dir(APPNAME)
CONFIG_DIR = appdirs.user_config_dir(APPNAME)
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.yaml")
LOG_DIR = appdirs.user_config_dir(APPNAME)
DB_PATH = os.path.join(LOG_DIR, "qobuz-dl.db")
AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0"
TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
EXT = {
5: ".mp3",
6: ".flac",
7: ".flac",
27: ".flac",
}
QUALITY_DESC = {
4: "128kbps",
5: "320kbps",
6: "16bit/44.1kHz",
7: "24bit/96kHz",
27: "24bit/192kHz",
}
QOBUZ_FEATURED_KEYS = (
"most-streamed",
"recent-releases",
"best-sellers",
"press-awards",
"ideal-discography",
"editor-picks",
"most-featured",
"qobuzissims",
"new-releases",
"new-releases-full",
"harmonia-mundi",
"universal-classic",
"universal-jazz",
"universal-jeunesse",
"universal-chanson",
)
__MP4_KEYS = (
"\xa9nam",
"\xa9ART",
"\xa9alb",
r"aART",
"\xa9day",
"\xa9day",
"\xa9cmt",
"desc",
"purd",
"\xa9grp",
"\xa9gen",
"\xa9lyr",
"\xa9too",
"cprt",
"cpil",
"covr",
"trkn",
"disk",
)
__MP3_KEYS = (
id3.TIT2,
id3.TPE1,
id3.TALB,
id3.TPE2,
id3.TCOM,
id3.TYER,
id3.COMM,
id3.TT1,
id3.TT1,
id3.GP1,
id3.TCON,
id3.USLT,
id3.TEN,
id3.TCOP,
id3.TCMP,
None,
id3.TRCK,
id3.TPOS,
)
__METADATA_TYPES = (
"title",
"artist",
"album",
"albumartist",
"composer",
"year",
"comment",
"description",
"purchase_date",
"grouping",
"genre",
"lyrics",
"encoder",
"copyright",
"compilation",
"cover",
"tracknumber",
"discnumber",
)
FLAC_KEY = {v: v.upper() for v in __METADATA_TYPES}
MP4_KEY = dict(zip(__METADATA_TYPES, __MP4_KEYS))
MP3_KEY = dict(zip(__METADATA_TYPES, __MP3_KEYS))
COPYRIGHT = "\u2117"
PHON_COPYRIGHT = "\u00a9"
FLAC_MAX_BLOCKSIZE = 16777215 # 16.7 MB
TRACK_KEYS = ("tracknumber", "artist", "albumartist", "composer", "title")
ALBUM_KEYS = ("albumartist", "title", "year", "bit_depth", "sampling_rate", "container")
FOLDER_FORMAT = (
"{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]"
)
TRACK_FORMAT = "{tracknumber}. {artist} - {title}"
URL_REGEX = (
r"https:\/\/(?:www|open|play)?\.?(\w+)\.com(?:(?:\/(track|playlist|album|"
r"artist|label))|(?:\/[-\w]+?))+\/(\w+)"
)
TIDAL_Q_IDS = {
4: "LOW", # AAC
5: "HIGH", # AAC
6: "LOSSLESS", # Lossless, but it also could be MQA
7: "HI_RES", # not available for download
}
TIDAL_MAX_Q = 7
DEEZER_Q_IDS = {4: 128, 5: 320, 6: 1411}
DEEZER_MAX_Q = 6

213
music_dl/converter.py Normal file
View file

@ -0,0 +1,213 @@
import logging
import os
import shutil
import subprocess
from tempfile import gettempdir
from typing import Optional
from .exceptions import ConversionError
logger = logging.getLogger(__name__)
class Converter:
"""Base class for audio codecs."""
codec_name = None
codec_lib = None
container = None
lossless = False
default_ffmpeg_arg = ""
def __init__(
self,
filename: str,
ffmpeg_arg: Optional[str] = None,
sampling_rate: Optional[int] = None,
bit_depth: Optional[int] = None,
copy_art: bool = True,
remove_source: bool = False,
show_progress: bool = False,
):
"""
:param filename:
:type filename: str
:param ffmpeg_arg: The codec ffmpeg argument (defaults to an "optimal value")
:type ffmpeg_arg: Optional[str]
:param sampling_rate: This value is ignored if a lossy codec is detected
:type sampling_rate: Optional[int]
:param bit_depth: This value is ignored if a lossy codec is detected
:type bit_depth: Optional[int]
:param copy_art: Embed the cover art (if found) into the encoded file
:type copy_art: bool
:param remove_source:
:type remove_source: bool
"""
logger.debug(locals())
self.filename = filename
self.final_fn = f"{os.path.splitext(filename)[0]}.{self.container}"
self.tempfile = os.path.join(gettempdir(), os.path.basename(self.final_fn))
self.remove_source = remove_source
self.sampling_rate = sampling_rate
self.bit_depth = bit_depth
self.copy_art = copy_art
self.show_progress = show_progress
if ffmpeg_arg is None:
logger.debug("No arguments provided. Codec defaults will be used")
self.ffmpeg_arg = self.default_ffmpeg_arg
else:
self.ffmpeg_arg = ffmpeg_arg
self._is_command_valid()
logger.debug("FFmpeg codec extra argument: %s", self.ffmpeg_arg)
def convert(self, custom_fn: Optional[str] = None):
"""Convert the file.
:param custom_fn: Custom output filename (defaults to the original
name with a replaced container)
:type custom_fn: Optional[str]
"""
if custom_fn:
self.final_fn = custom_fn
self.command = self._gen_command()
logger.debug("Generated conversion command: %s", self.command)
process = subprocess.Popen(self.command)
process.wait()
if os.path.isfile(self.tempfile):
if self.remove_source:
os.remove(self.filename)
logger.debug("Source removed: %s", self.filename)
shutil.move(self.tempfile, self.final_fn)
logger.debug("Moved: %s -> %s", self.tempfile, self.final_fn)
logger.debug("Converted: %s -> %s", self.filename, self.final_fn)
else:
raise ConversionError("No file was returned from conversion")
def _gen_command(self):
command = [
"ffmpeg",
"-i",
self.filename,
"-loglevel",
"warning",
"-c:a",
self.codec_lib,
]
if self.show_progress:
command.append("-stats")
if self.copy_art:
command.extend(["-c:v", "copy"])
if self.ffmpeg_arg:
command.extend(self.ffmpeg_arg.split())
if self.lossless:
if isinstance(self.sampling_rate, int):
command.extend(["-ar", str(self.sampling_rate)])
elif self.sampling_rate is not None:
raise TypeError(
f"Sampling rate must be int, not {type(self.sampling_rate)}"
)
if isinstance(self.bit_depth, int):
if int(self.bit_depth) == 16:
command.extend(["-sample_fmt", "s16"])
elif int(self.bit_depth) in (24, 32):
command.extend(["-sample_fmt", "s32"])
else:
raise ValueError("Bit depth must be 16, 24, or 32")
elif self.bit_depth is not None:
raise TypeError(f"Bit depth must be int, not {type(self.bit_depth)}")
command.extend(["-y", self.tempfile])
return command
def _is_command_valid(self):
# TODO: add error handling for lossy codecs
if self.ffmpeg_arg is not None and self.lossless:
logger.debug(
"Lossless codecs don't support extra arguments; "
"the extra argument will be ignored"
)
self.ffmpeg_arg = self.default_ffmpeg_arg
return
class FLAC(Converter):
" Class for FLAC converter. "
codec_name = "flac"
codec_lib = "flac"
container = "flac"
lossless = True
class LAME(Converter):
"""
Class for libmp3lame converter. Defaul ffmpeg_arg: `-q:a 0`.
See available options:
https://trac.ffmpeg.org/wiki/Encode/MP3
"""
codec_name = "lame"
codec_lib = "libmp3lame"
container = "mp3"
default_ffmpeg_arg = "-q:a 0" # V0
class ALAC(Converter):
" Class for ALAC converter. "
codec_name = "alac"
codec_lib = "alac"
container = "m4a"
lossless = True
class Vorbis(Converter):
"""
Class for libvorbis converter. Default ffmpeg_arg: `-q:a 6`.
See available options:
https://trac.ffmpeg.org/wiki/TheoraVorbisEncodingGuide
"""
codec_name = "vorbis"
codec_lib = "libvorbis"
container = "ogg"
default_ffmpeg_arg = "-q:a 6" # 160, aka the "high" quality profile from Spotify
class OPUS(Converter):
"""
Class for libopus. Default ffmpeg_arg: `-b:a 128 -vbr on`.
See more:
http://ffmpeg.org/ffmpeg-codecs.html#libopus-1
"""
codec_name = "opus"
codec_lib = "libopus"
container = "opus"
default_ffmpeg_arg = "-b:a 128k" # Transparent
class AAC(Converter):
"""
Class for libfdk_aac converter. Default ffmpeg_arg: `-b:a 256k`.
See available options:
https://trac.ffmpeg.org/wiki/Encode/AAC
"""
codec_name = "aac"
codec_lib = "libfdk_aac"
container = "m4a"
default_ffmpeg_arg = "-b:a 256k"

184
music_dl/core.py Normal file
View file

@ -0,0 +1,184 @@
import logging
import os
import re
from getpass import getpass
from typing import Generator, Optional, Tuple, Union
import click
from .clients import DeezerClient, QobuzClient, TidalClient
from .config import Config
from .constants import CONFIG_PATH, DB_PATH, URL_REGEX
from .db import QobuzDB
from .downloader import Album, Artist, Playlist, Track, Label
from .exceptions import AuthenticationError, ParsingError
from .utils import capitalize
logger = logging.getLogger(__name__)
MEDIA_CLASS = {"album": Album, "playlist": Playlist, "artist": Artist, "track": Track, "label": Label}
CLIENTS = {"qobuz": QobuzClient, "tidal": TidalClient, "deezer": DeezerClient}
Media = Union[Album, Playlist, Artist, Track] # type hint
# TODO: add support for database
class MusicDL:
def __init__(
self,
config: Optional[Config] = None,
database: Optional[str] = None,
):
logger.debug(locals())
self.url_parse = re.compile(URL_REGEX)
self.config = config
if self.config is None:
self.config = Config(CONFIG_PATH)
self.clients = {
"qobuz": QobuzClient(),
"tidal": TidalClient(),
"deezer": DeezerClient(),
}
if database is None:
self.db = QobuzDB(DB_PATH)
else:
assert isinstance(database, QobuzDB)
self.db = database
def prompt_creds(self, source: str):
"""Prompt the user for credentials.
:param source:
:type source: str
"""
click.secho(f"Enter {capitalize(source)} email:", fg="green")
self.config[source]["email"] = input()
click.secho(
f"Enter {capitalize(source)} password (will not show on screen):",
fg="green",
)
self.config[source]["password"] = getpass(
prompt=""
) # does hashing work for tidal?
self.config.save()
click.secho(f'Credentials saved to config file at "{self.config._path}"')
def assert_creds(self, source: str):
assert source in ("qobuz", "tidal", "deezer"), f"Invalid source {source}"
if source == "deezer":
# no login for deezer
return
if (
self.config[source]["email"] is None
or self.config[source]["password"] is None
):
self.prompt_creds(source)
def handle_url(self, url: str):
"""Download an url
:param url:
:type url: str
:raises InvalidSourceError
:raises ParsingError
"""
source, url_type, item_id = self.parse_url(url)
if item_id in self.db:
logger.info(f"{url} already downloaded, use --no-db to override.")
return
self.handle_item(source, url_type, item_id)
def handle_item(self, source: str, media_type: str, item_id: str):
self.assert_creds(source)
arguments = {
"database": self.db,
"parent_folder": self.config.downloads["folder"],
"quality": self.config.downloads["quality"],
"embed_cover": self.config.metadata["embed_cover"],
}
client = self.clients[source]
if not client.logged_in:
while True:
try:
client.login(**self.config.creds(source))
break
except AuthenticationError:
click.secho("Invalid credentials, try again.")
self.prompt_creds(source)
item = MEDIA_CLASS[media_type](client=client, id=item_id)
if isinstance(item, Artist):
keys = self.config.filters.keys()
# TODO: move this to config.py
filters_ = tuple(key for key in keys if self.config.filters[key])
arguments["filters"] = filters_
logger.debug("Added filter argument for artist/label: %s", filters_)
logger.debug("Arguments from config: %s", arguments)
item.load_meta()
item.download(**arguments)
def parse_url(self, url: str) -> Tuple[str, str]:
"""Returns the type of the url and the id.
Compatible with urls of the form:
https://www.qobuz.com/us-en/{type}/{name}/{id}
https://open.qobuz.com/{type}/{id}
https://play.qobuz.com/{type}/{id}
/us-en/{type}/-/{id}
https://www.deezer.com/us/{type}/{id}
https://tidal.com/browse/{type}/{id}
:raises exceptions.ParsingError
"""
parsed = self.url_parse.search(url)
if parsed is not None:
parsed = parsed.groups()
if len(parsed) == 3:
return tuple(parsed) # Convert from Seq for the sake of typing
raise ParsingError(f"Error parsing URL: `{url}`")
def from_txt(self, filepath: Union[str, os.PathLike]):
"""
Handle a text file containing URLs. Lines starting with `#` are ignored.
:param filepath:
:type filepath: Union[str, os.PathLike]
:raises OSError
:raises exceptions.ParsingError
"""
with open(filepath) as txt:
lines = (
line for line in txt.readlines() if not line.strip().startswith("#")
)
click.secho(f"URLs found in text file: {len(lines)}")
for line in lines:
self.handle_url(line)
def search(
self, query: str, media_type: str = "album", limit: int = 200
) -> Generator:
results = self.client.search(query, media_type, limit)
if isinstance(results, Generator): # QobuzClient
for page in results:
for item in page[f"{media_type}s"]["items"]:
yield MEDIA_CLASS[media_type].from_api(item, self.client)
else:
for item in results.get("data") or results.get("items"):
yield MEDIA_CLASS[media_type].from_api(item, self.client)

62
music_dl/db.py Normal file
View file

@ -0,0 +1,62 @@
import logging
import os
import sqlite3
from typing import Union
logger = logging.getLogger(__name__)
class MusicDB:
"""Simple interface for the downloaded track database."""
def __init__(self, db_path: Union[str, os.PathLike]):
"""Create a QobuzDB object
:param db_path: filepath of the database
:type db_path: Union[str, os.PathLike]
"""
self.path = db_path
if not os.path.exists(self.path):
self.create()
def create(self):
"""Create a database at `self.path`"""
with sqlite3.connect(self.path) as conn:
try:
conn.execute("CREATE TABLE downloads (id TEXT UNIQUE NOT NULL);")
logger.debug("Download-IDs database created: %s", self.path)
except sqlite3.OperationalError:
pass
return self.path
def __contains__(self, item_id: Union[str, int]) -> bool:
"""Checks whether the database contains an id.
:param item_id: the id to check
:type item_id: str
:rtype: bool
"""
with sqlite3.connect(self.path) as conn:
return (
conn.execute(
"SELECT id FROM downloads where id=?", (item_id,)
).fetchone()
is not None
)
def add(self, item_id: str):
"""Adds an id to the database.
:param item_id:
:type item_id: str
"""
with sqlite3.connect(self.path) as conn:
try:
conn.execute(
"INSERT INTO downloads (id) VALUES (?)",
(item_id,),
)
conn.commit()
except sqlite3.Error as error:
logger.error("Unexpected DB error: %s", error)

1270
music_dl/downloader.py Normal file

File diff suppressed because it is too large Load diff

46
music_dl/exceptions.py Normal file
View file

@ -0,0 +1,46 @@
class AuthenticationError(Exception):
pass
class IneligibleError(Exception):
pass
class InvalidAppIdError(Exception):
pass
class InvalidAppSecretError(Exception):
pass
class InvalidQuality(Exception):
pass
class NonStreamable(Exception):
pass
class InvalidContainerError(Exception):
pass
class InvalidSourceError(Exception):
pass
class ParsingError(Exception):
pass
class TooLargeCoverArt(Exception):
pass
class BadEncoderOption(Exception):
pass
class ConversionError(Exception):
pass

391
music_dl/metadata.py Normal file
View file

@ -0,0 +1,391 @@
import json
import logging
import re
import sys
from typing import Generator, Optional, Tuple, Union
from .constants import (
COPYRIGHT,
FLAC_KEY,
MP3_KEY,
MP4_KEY,
PHON_COPYRIGHT,
TRACK_KEYS,
)
from .exceptions import InvalidContainerError
logger = logging.getLogger(__name__)
class TrackMetadata:
"""Contains all of the metadata needed to tag the file.
Available attributes:
* title
* artist
* album
* albumartist
* composer
* year
* comment
* description
* purchase_date
* grouping
* genre
* lyrics
* encoder
* copyright
* compilation
* cover
* tracknumber
* discnumber
"""
def __init__(
self, track: Optional[dict] = None, album: Optional[dict] = None, source="qobuz"
):
"""Creates a TrackMetadata object optionally initialized with
dicts returned by the Qobuz API.
:param track: track dict from API
:type track: Optional[dict]
:param album: album dict from API
:type album: Optional[dict]
"""
self.album = None
self.albumartist = None
self.composer = None
self.comment = None
self.description = None
self.purchase_date = None
self.grouping = None
self.lyrics = None
self.encoder = None
self.compilation = None
self.cover = None
self.tracknumber = None
self.discnumber = None
self.__source = source # not included in tags
if track is None and album is None:
logger.debug("No params passed, returning")
return
if track is not None:
self.add_track_meta(track)
if album is not None:
self.add_album_meta(album)
def add_album_meta(self, resp: dict):
"""Parse the metadata from an resp dict returned by the
Qobuz API.
:param dict resp: from the Qobuz API
"""
if self.__source == "qobuz":
self.album = resp.get("title")
self.tracktotal = str(resp.get("tracks_count", 1))
self.genre = resp.get("genres_list", [])
self.date = resp.get("release_date_original") or resp.get("release_date")
self.copyright = resp.get("copyright")
self.albumartist = resp.get("artist", {}).get("name")
self.label = resp.get("label")
if isinstance(self.label, dict):
self.label = self.label.get("name")
elif self.__source == "tidal":
self.album = resp.get("title")
self.tracktotal = resp.get("numberOfTracks")
# genre not returned by API
self.date = resp.get("releaseDate")
self.copyright = resp.get("copyright")
self.albumartist = resp.get("artist", {}).get("name")
# label not returned by API
elif self.__source == "deezer":
self.album = resp.get("title")
self.tracktotal = resp.get("track_total")
self.genre = resp.get("genres", {}).get("data")
self.date = resp.get("release_date")
self.albumartist = resp.get("artist", {}).get("name")
self.label = resp.get("label")
else:
raise ValueError
def add_track_meta(self, track: dict):
"""Parse the metadata from a track dict returned by the
Qobuz API.
:param track:
"""
if self.__source == "qobuz":
self.title = track.get("title").strip()
self._mod_title(track.get("version"), track.get("work"))
self.composer = track.get("composer", {}).get("name")
self.tracknumber = f"{int(track.get('track_number', 1)):02}"
self.discnumber = str(track.get("media_number", 1))
try:
self.artist = track["performer"]["name"]
except KeyError:
if hasattr(self, "albumartist"):
self.artist = self.albumartist
elif self.__source == "tidal":
self.title = track.get("title").strip()
self._mod_title(track.get("version"), None)
self.tracknumber = f"{int(track.get('trackNumber', 1)):02}"
self.discnumber = str(track.get("volumeNumber"))
self.artist = track.get("artist", {}).get("name")
elif self.__source == "deezer":
self.title = track.get("title").strip()
self._mod_title(track.get("version"), None)
self.tracknumber = f"{int(track.get('track_position', 1)):02}"
self.discnumber = track.get("disk_number")
self.artist = track.get("artist", {}).get("name")
else:
raise ValueError
if track.get("album"):
self.add_album_meta(track["album"])
def _mod_title(self, version, work):
if version is not None:
logger.debug("Version found: %s", version)
self.title = f"{self.title} ({version})"
if work is not None:
logger.debug("Work found: %s", work)
self.title = f"{work}: {self.title}"
@property
def artist(self) -> Union[str, None]:
"""Returns the value to set for the artist tag. Defaults to
`self.albumartist` if there is no track artist.
:rtype: str
"""
if self._artist is None and self.albumartist is not None:
return self.albumartist
if self._artist is not None:
return self._artist
@artist.setter
def artist(self, val: str):
"""Sets the internal artist variable to val.
:param val:
:type val: str
"""
self._artist = val
@property
def genre(self) -> Union[str, None]:
"""Formats the genre list returned by the Qobuz API.
>>> g = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé']
>>> _format_genres(g)
'Pop, Rock, Alternatif et Indé'
:rtype: str
"""
if not self.get("_genres"):
return None
if isinstance(self._genres, list):
genres = re.findall(r"([^\u2192\/]+)", "/".join(self._genres))
no_repeats = []
[no_repeats.append(g) for g in genres if g not in no_repeats]
return ", ".join(no_repeats)
elif isinstance(self._genres, str):
return self._genres
raise TypeError(f"Genre must be list or str, not {type(self._genres)}")
@genre.setter
def genre(self, val: Union[str, list]):
"""Sets the internal `genre` field to the given list.
It is not formatted until it is requested with `meta.genre`.
:param val:
:type val: Union[str, list]
"""
self._genres = val
@property
def copyright(self) -> Union[str, None]:
"""Formats the copyright string to use nice-looking unicode
characters.
:rtype: str, None
"""
if hasattr(self, "_copyright"):
if self._copyright is None:
return None
cr = self._copyright.replace("(P)", PHON_COPYRIGHT)
cr = cr.replace("(C)", COPYRIGHT)
return cr
logger.debug("Accessed copyright tag before setting, return None")
return None
@copyright.setter
def copyright(self, val: str):
"""Sets the internal copyright variable to the given value.
Only formatted when requested.
:param val:
:type val: str
"""
self._copyright = val
@property
def year(self) -> Union[str, None]:
"""Returns the year published of the track.
:rtype: str
"""
if hasattr(self, "_year"):
return self._year
if hasattr(self, "date"):
if self.date is not None:
return self.date[:4]
@year.setter
def year(self, val):
"""Sets the internal year variable to val.
:param val:
"""
self._year = val
def get_formatter(self) -> dict:
"""Returns a dict that is used to apply values to file format strings.
:rtype: dict
"""
# the keys in the tuple are the possible keys for format strings
return {k: getattr(self, k) for k in TRACK_KEYS}
def tags(self, container: str = "flac") -> Generator:
"""Return a generator of (key, value) pairs to use for tagging
files with mutagen. The *_KEY dicts are organized in the format
>>> {attribute_name: key_to_use_for_metadata}
They are then converted to the format
>>> {key_to_use_for_metadata: value_of_attribute}
so that they can be used like this:
>>> audio = MP4(path)
>>> for k, v in meta.tags(container='MP4'):
... audio[k] = v
>>> audio.save()
:param container: the container format
:type container: str
:rtype: Generator
"""
container = container.lower()
if container in ("flac", "vorbis"):
return self.__gen_flac_tags()
elif container in ("mp3", "id3"):
return self.__gen_mp3_tags()
elif container in ("alac", "m4a", "mp4", "aac"):
return self.__gen_mp4_tags()
else:
raise InvalidContainerError(f"Invalid container {container}")
def __gen_flac_tags(self) -> Tuple[str, str]:
"""Generate key, value pairs to tag FLAC files.
:rtype: Tuple[str, str]
"""
for k, v in FLAC_KEY.items():
tag = getattr(self, k)
if tag:
logger.debug(f"Adding tag {v}: {repr(tag)}")
yield (v, tag)
def __gen_mp3_tags(self) -> Tuple[str, str]:
"""Generate key, value pairs to tag MP3 files.
:rtype: Tuple[str, str]
"""
for k, v in MP3_KEY.items():
if k == "tracknumber":
text = f"{self.tracknumber}/{self.tracktotal}"
elif k == "discnumber":
text = str(self.discnumber)
else:
text = getattr(self, k)
if text is not None:
yield (v.__name__, v(encoding=3, text=text))
def __mp4_tags(self) -> Tuple[str, str]:
"""Generate key, value pairs to tag ALAC or AAC files in
an MP4 container.
:rtype: Tuple[str, str]
"""
for k, v in MP4_KEY.items():
return (v, getattr(self, k))
def __setitem__(self, key, val):
"""Dict-like access for tags.
:param key:
:param val:
"""
setattr(self, key, val)
def __getitem__(self, key):
"""Dict-like access for tags.
:param key:
"""
return getattr(self, key)
def get(self, key, default=None) -> str:
"""Returns the requested attribute of the object, with
a default value.
:param key:
:param default:
"""
if hasattr(self, key):
res = self.__getitem__(key)
if res is not None:
return res
return default
return default
def set(self, key, val) -> str:
"""Equivalent to
>>> meta[key] = val
:param key:
:param val:
:rtype: str
"""
return self.__setitem__(key, val)
def __repr__(self) -> str:
"""Returns the string representation of the metadata object.
:rtype: str
"""
# TODO: make a more readable repr
return json.dumps(self.__dict__, indent=2)

56
music_dl/spoofbuz.py Normal file
View file

@ -0,0 +1,56 @@
# Credits to Dash for this tool.
import base64
import re
from collections import OrderedDict
import requests
class Spoofer:
def __init__(self):
self.seed_timezone_regex = r'[a-z]\.initialSeed\("(?P<seed>[\w=]+)",window\.utimezone\.(?P<timezone>[a-z]+)\)'
# note: {timezones} should be replaced with every capitalized timezone joined by a |
self.info_extras_regex = r'name:"\w+/(?P<timezone>{timezones})",info:"(?P<info>[\w=]+)",extras:"(?P<extras>[\w=]+)"'
self.app_id_regex = r'{app_id:"(?P<app_id>\d{9})",app_secret:"\w{32}",base_port:"80",base_url:"https://www\.qobuz\.com",base_method:"/api\.json/0\.2/"},n\.base_url="https://play\.qobuz\.com"'
login_page_request = requests.get("https://play.qobuz.com/login")
login_page = login_page_request.text
bundle_url_match = re.search(
r'<script src="(/resources/\d+\.\d+\.\d+-[a-z]\d{3}/bundle\.js)"></script>',
login_page,
)
bundle_url = bundle_url_match.group(1)
bundle_req = requests.get("https://play.qobuz.com" + bundle_url)
self.bundle = bundle_req.text
def get_app_id(self):
match = re.search(self.app_id_regex, self.bundle).group("app_id")
return str(match)
def get_secrets(self):
seed_matches = re.finditer(self.seed_timezone_regex, self.bundle)
secrets = OrderedDict()
for match in seed_matches:
seed, timezone = match.group("seed", "timezone")
secrets[timezone] = [seed]
"""The code that follows switches around the first and second timezone. Why? Read on:
Qobuz uses two ternary (a shortened if statement) conditions that should always return false.
The way Javascript's ternary syntax works, the second option listed is what runs if the condition returns false.
Because of this, we must prioritize the *second* seed/timezone pair captured, not the first.
"""
keypairs = list(secrets.items())
secrets.move_to_end(keypairs[1][0], last=False)
info_extras_regex = self.info_extras_regex.format(
timezones="|".join([timezone.capitalize() for timezone in secrets])
)
info_extras_matches = re.finditer(info_extras_regex, self.bundle)
for match in info_extras_matches:
timezone, info, extras = match.group("timezone", "info", "extras")
secrets[timezone.lower()] += [info, extras]
for secret_pair in secrets:
secrets[secret_pair] = base64.standard_b64decode(
"".join(secrets[secret_pair])[:-44]
).decode("utf-8")
vals = list(secrets.values())
vals.remove("")
return vals

154
music_dl/utils.py Normal file
View file

@ -0,0 +1,154 @@
import logging
import logging.handlers as handlers
import os
from string import Formatter
from typing import Optional
import requests
from pathvalidate import sanitize_filename
from tqdm import tqdm
from .constants import LOG_DIR, TIDAL_COVER_URL
logger = logging.getLogger(__name__)
def safe_get(d: dict, *keys, default=None):
"""A replacement for chained `get()` statements on dicts:
>>> d = {'foo': {'bar': 'baz'}}
>>> _safe_get(d, 'baz')
None
>>> _safe_get(d, 'foo', 'bar')
'baz'
"""
curr = d
res = default
for key in keys:
res = curr.get(key, default)
if res == default or not hasattr(res, "__getitem__"):
return res
else:
curr = res
return res
def quality_id(bit_depth: Optional[int], sampling_rate: Optional[int]):
"""Return a quality id in (5, 6, 7, 27) from bit depth and
sampling rate. If None is provided, mp3/lossy is assumed.
:param bit_depth:
:type bit_depth: Optional[int]
:param sampling_rate:
:type sampling_rate: Optional[int]
"""
if not (bit_depth or sampling_rate): # is lossy
return 5
if bit_depth == 16:
return 6
if bit_depth == 24:
if sampling_rate <= 96:
return 7
return 27
def tqdm_download(url: str, filepath: str):
"""Downloads a file with a progress bar.
:param url: url to direct download
:param filepath: file to write
:type url: str
:type filepath: str
"""
# FIXME: add the conditional to the progress_bar bool
logger.debug(f"Downloading {url} to {filepath}")
r = requests.get(url, allow_redirects=True, stream=True)
total = int(r.headers.get("content-length", 0))
logger.debug(f"File size = {total}")
try:
with open(filepath, "wb") as file, tqdm(
total=total, unit="iB", unit_scale=True, unit_divisor=1024
) as bar:
for data in r.iter_content(chunk_size=1024):
size = file.write(data)
bar.update(size)
except Exception:
try:
os.remove(filepath)
except OSError:
pass
raise
def clean_format(formatter: str, format_info):
"""Formats track or folder names sanitizing every formatter key.
:param formatter:
:type formatter: str
:param kwargs:
"""
fmt_keys = [i[1] for i in Formatter().parse(formatter) if i[1] is not None]
logger.debug("Formatter keys: %s", fmt_keys)
clean_dict = dict()
for key in fmt_keys:
if isinstance(format_info.get(key), (str, int, float)): # int for track numbers
clean_dict[key] = sanitize_filename(str(format_info[key]))
else:
clean_dict[key] = "Unknown"
return formatter.format(**clean_dict)
def tidal_cover_url(uuid, size):
possibles = (80, 160, 320, 640, 1280)
assert size in possibles, f"size must be in {possibles}"
return TIDAL_COVER_URL.format(uuid=uuid.replace("-", "/"), height=size, width=size)
def init_log(
path: Optional[str] = None, level: str = "DEBUG", rotate: str = "midnight"
):
"""
Initialize a log instance with a stream handler and a rotating file handler.
If a path is not set, fallback to the default app log directory.
:param path:
:type path: Optional[str]
:param level:
:type level: str
:param rotate:
:type rotate: str
"""
if not path:
os.makedirs(LOG_DIR, exist_ok=True)
path = os.path.join(LOG_DIR, "qobuz_dl.log")
logger = logging.getLogger()
level = logging.getLevelName(level)
logger.setLevel(level)
formatter = logging.Formatter(
fmt="%(asctime)s - %(module)s.%(funcName)s.%(levelname)s: %(message)s",
datefmt="%H:%M:%S",
)
rotable = handlers.TimedRotatingFileHandler(path, when=rotate)
printable = logging.StreamHandler()
rotable.setFormatter(formatter)
printable.setFormatter(formatter)
logger.addHandler(printable)
logger.addHandler(rotable)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("tidal_api").setLevel(logging.WARNING)
def capitalize(s: str) -> str:
return s[0].upper() + s[1:]