Set max line length to 79

This commit is contained in:
Nathan Thomas 2021-08-17 10:46:37 -07:00
parent aac4e8c72d
commit b7ee7295ca
7 changed files with 225 additions and 64 deletions

17
q Normal file
View file

@ -0,0 +1,17 @@
- `title`
- `albumartist`
- `composer`
- `albumcomposer`
- `comment`
- `description`
- `purchase_date`
- `date`
- `grouping`
- `lyrics`
- `encoder`
- `compilation`
- `cover`
- `tracktotal`
- `tracknumber`
- `discnumber`
- `disctotal`

View file

@ -2,6 +2,7 @@ import concurrent.futures
import logging import logging
import os import os
import threading import threading
from typing import Optional
import requests import requests
from cleo.application import Application as BaseApplication from cleo.application import Application as BaseApplication
@ -85,7 +86,9 @@ class DownloadCommand(Command):
if len(core) > 0: if len(core) > 0:
core.download() core.download()
elif not urls and path is None: elif not urls and path is None:
self.line("<error>Must pass arguments. See </><cmd>rip url -h</cmd>.") self.line(
"<error>Must pass arguments. See </><cmd>rip url -h</cmd>."
)
update_check.join() update_check.join()
if outdated: if outdated:
@ -112,10 +115,16 @@ class DownloadCommand(Command):
"https://api.github.com/repos/nathom/streamrip/releases/latest" "https://api.github.com/repos/nathom/streamrip/releases/latest"
).json()["body"] ).json()["body"]
release_notes = md_header.sub(r"<header>\1</header>", release_notes) release_notes = md_header.sub(
release_notes = bullet_point.sub(r"<options=bold>•</> \1", release_notes) r"<header>\1</header>", release_notes
)
release_notes = bullet_point.sub(
r"<options=bold>•</> \1", release_notes
)
release_notes = code.sub(r"<cmd>\1</cmd>", release_notes) release_notes = code.sub(r"<cmd>\1</cmd>", release_notes)
release_notes = issue_reference.sub(r"<options=bold>\1</>", release_notes) release_notes = issue_reference.sub(
r"<options=bold>\1</>", release_notes
)
self.line(release_notes) self.line(release_notes)
@ -145,7 +154,9 @@ class SearchCommand(Command):
def handle(self): def handle(self):
query = self.argument("query") query = self.argument("query")
source, type = clean_options(self.option("source"), self.option("type")) source, type = clean_options(
self.option("source"), self.option("type")
)
config = Config() config = Config()
core = RipCore(config) core = RipCore(config)
@ -197,7 +208,9 @@ class DiscoverCommand(Command):
max_items = self.option("max-items") max_items = self.option("max-items")
if chosen_list not in QOBUZ_FEATURED_KEYS: if chosen_list not in QOBUZ_FEATURED_KEYS:
self.line(f'<error>Error: list "{chosen_list}" not available</error>') self.line(
f'<error>Error: list "{chosen_list}" not available</error>'
)
self.line(self.help) self.line(self.help)
return 1 return 1
@ -259,29 +272,34 @@ class ConfigCommand(Command):
{--qobuz : Set the credentials for Qobuz} {--qobuz : Set the credentials for Qobuz}
{--tidal : Log into Tidal} {--tidal : Log into Tidal}
{--deezer : Set the Deezer ARL} {--deezer : Set the Deezer ARL}
{--music-app : Configure the config file for usage with the macOS Music App}
{--reset : Reset the config file} {--reset : Reset the config file}
{--update : Reset the config file, keeping the credentials} {--update : Reset the config file, keeping the credentials}
""" """
_config: Optional[Config]
def handle(self): def handle(self):
import shutil import shutil
from .constants import CONFIG_DIR, CONFIG_PATH from .constants import CONFIG_DIR, CONFIG_PATH
config = Config() self._config = Config()
if self.option("path"): if self.option("path"):
self.line(f"<info>{CONFIG_PATH}</info>") self.line(f"<info>{CONFIG_PATH}</info>")
if self.option("open"): if self.option("open"):
self.line(f"Opening <url>{CONFIG_PATH}</url> in default application") self.line(
f"Opening <url>{CONFIG_PATH}</url> in default application"
)
launch(CONFIG_PATH) launch(CONFIG_PATH)
if self.option("reset"): if self.option("reset"):
config.reset() self._config.reset()
if self.option("update"): if self.option("update"):
config.update() self._config.update()
if self.option("open-vim"): if self.option("open-vim"):
if shutil.which("nvim") is not None: if shutil.which("nvim") is not None:
@ -298,8 +316,8 @@ class ConfigCommand(Command):
client = TidalClient() client = TidalClient()
client.login() client.login()
config.file["tidal"].update(client.get_tokens()) self._config.file["tidal"].update(client.get_tokens())
config.save() self._config.save()
self.line("<info>Credentials saved to config.</info>") self.line("<info>Credentials saved to config.</info>")
if self.option("deezer"): if self.option("deezer"):
@ -316,22 +334,71 @@ class ConfigCommand(Command):
try: try:
DeezerClient().login(arl=given_arl) DeezerClient().login(arl=given_arl)
config.file["deezer"]["arl"] = given_arl self._config.file["deezer"]["arl"] = given_arl
config.save() self._config.save()
self.line("<b>Sucessfully logged in!</b>") self.line("<b>Sucessfully logged in!</b>")
except AuthenticationError: except AuthenticationError:
self.line("<error>Could not log in. Double check your ARL</error>") self.line(
"<error>Could not log in. Double check your ARL</error>"
)
if self.option("qobuz"): if self.option("qobuz"):
import getpass import getpass
import hashlib import hashlib
config.file["qobuz"]["email"] = self.ask("Qobuz email:") self._config.file["qobuz"]["email"] = self.ask("Qobuz email:")
config.file["qobuz"]["password"] = hashlib.md5( self._config.file["qobuz"]["password"] = hashlib.md5(
getpass.getpass("Qobuz password (won't show on screen): ").encode() getpass.getpass(
"Qobuz password (won't show on screen): "
).encode()
).hexdigest() ).hexdigest()
config.save() self._config.save()
if self.option("music-app"):
self._conf_music_app()
def _conf_music_app(self):
import subprocess
import xml.etree.ElementTree as ET
from pathlib import Path
from tempfile import mktemp
# Find the Music library folder
temp_file = mktemp()
music_pref_plist = Path(Path.home()) / Path(
"Library/Preferences/com.apple.Music.plist"
)
# copy preferences to tempdir
subprocess.run(["cp", music_pref_plist, temp_file])
# convert binary to xml for parsing
subprocess.run(["plutil", "-convert", "xml1", temp_file])
items = iter(ET.parse(temp_file).getroot()[0])
for item in items:
if item.text == "NSNavLastRootDirectory":
break
library_folder = Path(next(items).text)
os.remove(temp_file)
# cp ~/library/preferences/com.apple.music.plist music.plist
# plutil -convert xml1 music.plist
# cat music.plist | pbcopy
self._config.file["downloads"]["folder"] = os.path.join(
library_folder, "Automatically Add to Music.localized"
)
conversion_config = self._config.file["conversion"]
conversion_config["enabled"] = True
conversion_config["codec"] = "ALAC"
conversion_config["sampling_rate"] = 48000
conversion_config["bit_depth"] = 24
self._config.file["filepaths"]["folder_format"] = ""
self._config.file["artwork"]["keep_hires_cover"] = False
self._config.save()
class ConvertCommand(Command): class ConvertCommand(Command):
@ -409,7 +476,8 @@ class ConvertCommand(Command):
futures.append( futures.append(
executor.submit( executor.submit(
ConverterCls( ConverterCls(
filename=os.path.join(dirname, file), **converter_args filename=os.path.join(dirname, file),
**converter_args,
).convert ).convert
) )
) )
@ -428,7 +496,8 @@ class ConvertCommand(Command):
ConverterCls(filename=path, **converter_args).convert() ConverterCls(filename=path, **converter_args).convert()
else: else:
self.line( self.line(
f'<error>Path <path>"{path}"</path> does not exist.</error>', fg="red" f'<error>Path <path>"{path}"</path> does not exist.</error>',
fg="red",
) )
@ -534,7 +603,9 @@ class Application(BaseApplication):
formatter.set_style("path", Style("green", options=["bold"])) formatter.set_style("path", Style("green", options=["bold"]))
formatter.set_style("cmd", Style("magenta")) formatter.set_style("cmd", Style("magenta"))
formatter.set_style("title", Style("yellow", options=["bold"])) formatter.set_style("title", Style("yellow", options=["bold"]))
formatter.set_style("header", Style("yellow", options=["bold", "underline"])) formatter.set_style(
"header", Style("yellow", options=["bold", "underline"])
)
io.output.set_formatter(formatter) io.output.set_formatter(formatter)
io.error_output.set_formatter(formatter) io.error_output.set_formatter(formatter)

View file

@ -71,11 +71,15 @@ class Database:
with sqlite3.connect(self.path) as conn: with sqlite3.connect(self.path) as conn:
conditions = " AND ".join(f"{key}=?" for key in items.keys()) conditions = " AND ".join(f"{key}=?" for key in items.keys())
command = f"SELECT EXISTS(SELECT 1 FROM {self.name} WHERE {conditions})" command = (
f"SELECT EXISTS(SELECT 1 FROM {self.name} WHERE {conditions})"
)
logger.debug("Executing %s", command) logger.debug("Executing %s", command)
return bool(conn.execute(command, tuple(items.values())).fetchone()[0]) return bool(
conn.execute(command, tuple(items.values())).fetchone()[0]
)
def __contains__(self, keys: Union[str, dict]) -> bool: def __contains__(self, keys: Union[str, dict]) -> bool:
"""Check whether a key-value pair exists in the database. """Check whether a key-value pair exists in the database.
@ -119,7 +123,9 @@ class Database:
params = ", ".join(self.structure.keys()) params = ", ".join(self.structure.keys())
question_marks = ", ".join("?" for _ in items) question_marks = ", ".join("?" for _ in items)
command = f"INSERT INTO {self.name} ({params}) VALUES ({question_marks})" command = (
f"INSERT INTO {self.name} ({params}) VALUES ({question_marks})"
)
logger.debug("Executing %s", command) logger.debug("Executing %s", command)
logger.debug("Items to add: %s", items) logger.debug("Items to add: %s", items)

View file

@ -132,7 +132,10 @@ class QobuzClient(Client):
if not kwargs.get("app_id") or not kwargs.get("secrets"): if not kwargs.get("app_id") or not kwargs.get("secrets"):
self._get_app_id_and_secrets() # can be async self._get_app_id_and_secrets() # can be async
else: else:
self.app_id, self.secrets = str(kwargs["app_id"]), kwargs["secrets"] self.app_id, self.secrets = (
str(kwargs["app_id"]),
kwargs["secrets"],
)
self.session = gen_threadsafe_session( self.session = gen_threadsafe_session(
headers={"User-Agent": AGENT, "X-App-Id": self.app_id} headers={"User-Agent": AGENT, "X-App-Id": self.app_id}
) )
@ -215,7 +218,10 @@ class QobuzClient(Client):
def _get_app_id_and_secrets(self): def _get_app_id_and_secrets(self):
if not hasattr(self, "app_id") or not hasattr(self, "secrets"): if not hasattr(self, "app_id") or not hasattr(self, "secrets"):
spoofer = Spoofer() spoofer = Spoofer()
self.app_id, self.secrets = str(spoofer.get_app_id()), spoofer.get_secrets() self.app_id, self.secrets = (
str(spoofer.get_app_id()),
spoofer.get_secrets(),
)
if not hasattr(self, "sec"): if not hasattr(self, "sec"):
if not hasattr(self, "session"): if not hasattr(self, "session"):
@ -234,7 +240,9 @@ class QobuzClient(Client):
:rtype: dict :rtype: dict
""" """
page, status_code = self._api_request(epoint, params) page, status_code = self._api_request(epoint, params)
logger.debug("Keys returned from _gen_pages: %s", ", ".join(page.keys())) logger.debug(
"Keys returned from _gen_pages: %s", ", ".join(page.keys())
)
key = epoint.split("/")[0] + "s" key = epoint.split("/")[0] + "s"
total = page.get(key, {}) total = page.get(key, {})
total = total.get("total") or total.get("items") total = total.get("total") or total.get("items")
@ -257,7 +265,8 @@ class QobuzClient(Client):
"""Check if the secrets are usable.""" """Check if the secrets are usable."""
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [ futures = [
executor.submit(self._test_secret, secret) for secret in self.secrets executor.submit(self._test_secret, secret)
for secret in self.secrets
] ]
for future in concurrent.futures.as_completed(futures): for future in concurrent.futures.as_completed(futures):
@ -300,11 +309,15 @@ class QobuzClient(Client):
response, status_code = self._api_request(epoint, params) response, status_code = self._api_request(epoint, params)
if status_code != 200: if status_code != 200:
raise Exception(f'Error fetching metadata. "{response["message"]}"') raise Exception(
f'Error fetching metadata. "{response["message"]}"'
)
return response return response
def _api_search(self, query: str, media_type: str, limit: int = 500) -> Generator: def _api_search(
self, query: str, media_type: str, limit: int = 500
) -> Generator:
"""Send a search request to the API. """Send a search request to the API.
:param query: :param query:
@ -356,7 +369,9 @@ class QobuzClient(Client):
resp, status_code = self._api_request(epoint, params) resp, status_code = self._api_request(epoint, params)
if status_code == 401: if status_code == 401:
raise AuthenticationError(f"Invalid credentials from params {params}") raise AuthenticationError(
f"Invalid credentials from params {params}"
)
elif status_code == 400: elif status_code == 400:
logger.debug(resp) logger.debug(resp)
raise InvalidAppIdError(f"Invalid app id from params {params}") raise InvalidAppIdError(f"Invalid app id from params {params}")
@ -364,7 +379,9 @@ class QobuzClient(Client):
logger.info("Logged in to Qobuz") logger.info("Logged in to Qobuz")
if not resp["user"]["credential"]["parameters"]: if not resp["user"]["credential"]["parameters"]:
raise IneligibleError("Free accounts are not eligible to download tracks.") raise IneligibleError(
"Free accounts are not eligible to download tracks."
)
self.uat = resp["user_auth_token"] self.uat = resp["user_auth_token"]
self.session.headers.update({"X-User-Auth-Token": self.uat}) self.session.headers.update({"X-User-Auth-Token": self.uat})
@ -413,7 +430,9 @@ class QobuzClient(Client):
} }
response, status_code = self._api_request("track/getFileUrl", params) response, status_code = self._api_request("track/getFileUrl", params)
if status_code == 400: if status_code == 400:
raise InvalidAppSecretError("Invalid app secret from params %s" % params) raise InvalidAppSecretError(
"Invalid app secret from params %s" % params
)
return response return response
@ -432,7 +451,9 @@ class QobuzClient(Client):
logger.debug(r.text) logger.debug(r.text)
return r.json(), r.status_code return r.json(), r.status_code
except Exception: except Exception:
logger.error("Problem getting JSON. Status code: %s", r.status_code) logger.error(
"Problem getting JSON. Status code: %s", r.status_code
)
raise raise
def _test_secret(self, secret: str) -> Optional[str]: def _test_secret(self, secret: str) -> Optional[str]:
@ -464,7 +485,9 @@ class DeezerClient(Client):
# no login required # no login required
self.logged_in = False self.logged_in = False
def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict: def search(
self, query: str, media_type: str = "album", limit: int = 200
) -> dict:
"""Search API for query. """Search API for query.
:param query: :param query:
@ -550,9 +573,9 @@ class DeezerClient(Client):
format_no, format_str = format_info format_no, format_str = format_info
dl_info["size_to_quality"] = { dl_info["size_to_quality"] = {
int(track_info.get(f"FILESIZE_{format}")): self._quality_id_from_filetype( int(
format track_info.get(f"FILESIZE_{format}")
) ): self._quality_id_from_filetype(format)
for format in DEEZER_FORMATS for format in DEEZER_FORMATS
} }
@ -593,7 +616,9 @@ class DeezerClient(Client):
logger.debug("Info bytes: %s", info_bytes) logger.debug("Info bytes: %s", info_bytes)
path = self._gen_url_path(info_bytes) path = self._gen_url_path(info_bytes)
logger.debug(path) logger.debug(path)
return f"https://e-cdns-proxy-{track_hash[0]}.dzcdn.net/mobile/1/{path}" return (
f"https://e-cdns-proxy-{track_hash[0]}.dzcdn.net/mobile/1/{path}"
)
def _gen_url_path(self, data): def _gen_url_path(self, data):
return binascii.hexlify( return binascii.hexlify(
@ -623,7 +648,9 @@ class DeezloaderClient(Client):
# no login required # no login required
self.logged_in = True self.logged_in = True
def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict: def search(
self, query: str, media_type: str = "album", limit: int = 200
) -> dict:
"""Search API for query. """Search API for query.
:param query: :param query:
@ -660,7 +687,9 @@ class DeezloaderClient(Client):
url = f"{DEEZER_BASE}/{media_type}/{meta_id}" url = f"{DEEZER_BASE}/{media_type}/{meta_id}"
item = self.session.get(url).json() item = self.session.get(url).json()
if media_type in ("album", "playlist"): if media_type in ("album", "playlist"):
tracks = self.session.get(f"{url}/tracks", params={"limit": 1000}).json() tracks = self.session.get(
f"{url}/tracks", params={"limit": 1000}
).json()
item["tracks"] = tracks["data"] item["tracks"] = tracks["data"]
item["track_total"] = len(tracks["data"]) item["track_total"] = len(tracks["data"])
elif media_type == "artist": elif media_type == "artist":
@ -756,7 +785,9 @@ class TidalClient(Client):
logger.debug(resp) logger.debug(resp)
return resp return resp
def search(self, query: str, media_type: str = "album", limit: int = 100) -> dict: def search(
self, query: str, media_type: str = "album", limit: int = 100
) -> dict:
"""Search for a query. """Search for a query.
:param query: :param query:
@ -785,13 +816,19 @@ class TidalClient(Client):
return self._get_video_stream_url(track_id) return self._get_video_stream_url(track_id)
params = { params = {
"audioquality": get_quality(min(quality, TIDAL_MAX_Q), self.source), "audioquality": get_quality(
min(quality, TIDAL_MAX_Q), self.source
),
"playbackmode": "STREAM", "playbackmode": "STREAM",
"assetpresentation": "FULL", "assetpresentation": "FULL",
} }
resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params) resp = self._api_request(
f"tracks/{track_id}/playbackinfopostpaywall", params
)
try: try:
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8")) manifest = json.loads(
base64.b64decode(resp["manifest"]).decode("utf-8")
)
except KeyError: except KeyError:
raise Exception(resp["userMessage"]) raise Exception(resp["userMessage"])
@ -996,7 +1033,9 @@ class TidalClient(Client):
offset += 100 offset += 100
tracks_left -= 100 tracks_left -= 100
resp["items"].extend( resp["items"].extend(
self._api_request(f"{url}/items", {"offset": offset})["items"] self._api_request(f"{url}/items", {"offset": offset})[
"items"
]
) )
item["tracks"] = [item["item"] for item in resp["items"]] item["tracks"] = [item["item"] for item in resp["items"]]
@ -1048,7 +1087,9 @@ class TidalClient(Client):
r'#EXT-X-STREAM-INF:BANDWIDTH=\d+,AVERAGE-BANDWIDTH=\d+,CODECS="[^"]+"' r'#EXT-X-STREAM-INF:BANDWIDTH=\d+,AVERAGE-BANDWIDTH=\d+,CODECS="[^"]+"'
r",RESOLUTION=\d+x\d+\n(.+)" r",RESOLUTION=\d+x\d+\n(.+)"
) )
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8")) manifest = json.loads(
base64.b64decode(resp["manifest"]).decode("utf-8")
)
available_urls = self.session.get(manifest["urls"][0]) available_urls = self.session.get(manifest["urls"][0])
url_info = re.findall(stream_url_regex, available_urls.text) url_info = re.findall(stream_url_regex, available_urls.text)
@ -1138,7 +1179,10 @@ class SoundCloudClient(Client):
url = None url = None
for tc in track["media"]["transcodings"]: for tc in track["media"]["transcodings"]:
fmt = tc["format"] fmt = tc["format"]
if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg": if (
fmt["protocol"] == "hls"
and fmt["mime_type"] == "audio/mpeg"
):
url = tc["url"] url = tc["url"]
break break

View file

@ -4,7 +4,9 @@ import mutagen.id3 as id3
AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0" AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0"
TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg" TIDAL_COVER_URL = (
"https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
)
SOUNDCLOUD_CLIENT_ID = "QFciLWLC1GS4P3EZvXIjA3jKhKO5pKB3" SOUNDCLOUD_CLIENT_ID = "QFciLWLC1GS4P3EZvXIjA3jKhKO5pKB3"
SOUNDCLOUD_USER_ID = "672320-86895-162383-801513" SOUNDCLOUD_USER_ID = "672320-86895-162383-801513"
SOUNDCLOUD_APP_VERSION = "1626941202" SOUNDCLOUD_APP_VERSION = "1626941202"
@ -136,9 +138,7 @@ ALBUM_KEYS = (
"albumcomposer", "albumcomposer",
) )
# TODO: rename these to DEFAULT_FOLDER_FORMAT etc # TODO: rename these to DEFAULT_FOLDER_FORMAT etc
FOLDER_FORMAT = ( FOLDER_FORMAT = "{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]"
"{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]"
)
TRACK_FORMAT = "{tracknumber}. {artist} - {title}" TRACK_FORMAT = "{tracknumber}. {artist} - {title}"

View file

@ -52,7 +52,9 @@ class Converter:
self.filename = filename self.filename = filename
self.final_fn = f"{os.path.splitext(filename)[0]}.{self.container}" self.final_fn = f"{os.path.splitext(filename)[0]}.{self.container}"
self.tempfile = os.path.join(gettempdir(), os.path.basename(self.final_fn)) self.tempfile = os.path.join(
gettempdir(), os.path.basename(self.final_fn)
)
self.remove_source = remove_source self.remove_source = remove_source
self.sampling_rate = sampling_rate self.sampling_rate = sampling_rate
self.bit_depth = bit_depth self.bit_depth = bit_depth
@ -117,9 +119,13 @@ class Converter:
if self.lossless: if self.lossless:
if isinstance(self.sampling_rate, int): if isinstance(self.sampling_rate, int):
sampling_rates = "|".join( sampling_rates = "|".join(
str(rate) for rate in SAMPLING_RATES if rate <= self.sampling_rate str(rate)
for rate in SAMPLING_RATES
if rate <= self.sampling_rate
)
command.extend(
["-af", f"aformat=sample_rates={sampling_rates}"]
) )
command.extend(["-af", f"aformat=sample_rates={sampling_rates}"])
elif self.sampling_rate is not None: elif self.sampling_rate is not None:
raise TypeError( raise TypeError(
@ -134,7 +140,9 @@ class Converter:
else: else:
raise ValueError("Bit depth must be 16, 24, or 32") raise ValueError("Bit depth must be 16, 24, or 32")
elif self.bit_depth is not None: elif self.bit_depth is not None:
raise TypeError(f"Bit depth must be int, not {type(self.bit_depth)}") raise TypeError(
f"Bit depth must be int, not {type(self.bit_depth)}"
)
# automatically overwrite # automatically overwrite
command.extend(["-y", self.tempfile]) command.extend(["-y", self.tempfile])
@ -199,7 +207,9 @@ class Vorbis(Converter):
codec_name = "vorbis" codec_name = "vorbis"
codec_lib = "libvorbis" codec_lib = "libvorbis"
container = "ogg" container = "ogg"
default_ffmpeg_arg = "-q:a 6" # 160, aka the "high" quality profile from Spotify default_ffmpeg_arg = (
"-q:a 6" # 160, aka the "high" quality profile from Spotify
)
class OPUS(Converter): class OPUS(Converter):

View file

@ -82,7 +82,9 @@ class DownloadStream:
info = self.request.json() info = self.request.json()
try: try:
# Usually happens with deezloader downloads # Usually happens with deezloader downloads
raise NonStreamable(f"{info['error']} -- {info['message']}") raise NonStreamable(
f"{info['error']} -- {info['message']}"
)
except KeyError: except KeyError:
raise NonStreamable(info) raise NonStreamable(info)
@ -94,7 +96,10 @@ class DownloadStream:
:rtype: Iterator :rtype: Iterator
""" """
if self.source == "deezer" and self.is_encrypted.search(self.url) is not None: if (
self.source == "deezer"
and self.is_encrypted.search(self.url) is not None
):
assert isinstance(self.id, str), self.id assert isinstance(self.id, str), self.id
blowfish_key = self._generate_blowfish_key(self.id) blowfish_key = self._generate_blowfish_key(self.id)
@ -121,7 +126,9 @@ class DownloadStream:
return self.file_size return self.file_size
def _create_deezer_decryptor(self, key) -> Blowfish: def _create_deezer_decryptor(self, key) -> Blowfish:
return Blowfish.new(key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07") return Blowfish.new(
key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07"
)
@staticmethod @staticmethod
def _generate_blowfish_key(track_id: str): def _generate_blowfish_key(track_id: str):
@ -204,7 +211,9 @@ __QUALITY_MAP: Dict[str, Dict[int, Union[int, str, Tuple[int, str]]]] = {
} }
def get_quality(quality_id: int, source: str) -> Union[str, int, Tuple[int, str]]: def get_quality(
quality_id: int, source: str
) -> Union[str, int, Tuple[int, str]]:
"""Get the source-specific quality id. """Get the source-specific quality id.
:param quality_id: the universal quality id (0, 1, 2, 4) :param quality_id: the universal quality id (0, 1, 2, 4)
@ -291,7 +300,9 @@ def tidal_cover_url(uuid, size):
possibles = (80, 160, 320, 640, 1280) possibles = (80, 160, 320, 640, 1280)
assert size in possibles, f"size must be in {possibles}" assert size in possibles, f"size must be in {possibles}"
return TIDAL_COVER_URL.format(uuid=uuid.replace("-", "/"), height=size, width=size) return TIDAL_COVER_URL.format(
uuid=uuid.replace("-", "/"), height=size, width=size
)
def init_log(path: Optional[str] = None, level: str = "DEBUG"): def init_log(path: Optional[str] = None, level: str = "DEBUG"):
@ -393,7 +404,9 @@ def gen_threadsafe_session(
headers = {} headers = {}
session = requests.Session() session = requests.Session()
adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) adapter = requests.adapters.HTTPAdapter(
pool_connections=100, pool_maxsize=100
)
session.mount("https://", adapter) session.mount("https://", adapter)
session.headers.update(headers) session.headers.update(headers)
return session return session