Change Black max line length to 79

This commit is contained in:
nathom 2021-05-13 18:46:41 -07:00
parent e941b89153
commit f8df594031
11 changed files with 324 additions and 107 deletions

View file

@ -216,7 +216,9 @@ class QobuzClient(Client):
:rtype: dict
"""
page, status_code = self._api_request(epoint, params)
logger.debug("Keys returned from _gen_pages: %s", ", ".join(page.keys()))
logger.debug(
"Keys returned from _gen_pages: %s", ", ".join(page.keys())
)
key = epoint.split("/")[0] + "s"
total = page.get(key, {})
total = total.get("total") or total.get("items")
@ -240,7 +242,9 @@ class QobuzClient(Client):
for secret in self.secrets:
if self._test_secret(secret):
self.sec = secret
logger.debug("Working secret and app_id: %s - %s", secret, self.app_id)
logger.debug(
"Working secret and app_id: %s - %s", secret, self.app_id
)
break
if not hasattr(self, "sec"):
raise InvalidAppSecretError(f"Invalid secrets: {self.secrets}")
@ -274,11 +278,15 @@ class QobuzClient(Client):
response, status_code = self._api_request(epoint, params)
if status_code != 200:
raise Exception(f'Error fetching metadata. "{response["message"]}"')
raise Exception(
f'Error fetching metadata. "{response["message"]}"'
)
return response
def _api_search(self, query: str, media_type: str, limit: int = 500) -> Generator:
def _api_search(
self, query: str, media_type: str, limit: int = 500
) -> Generator:
"""Send a search request to the API.
:param query:
@ -330,14 +338,18 @@ class QobuzClient(Client):
resp, status_code = self._api_request(epoint, params)
if status_code == 401:
raise AuthenticationError(f"Invalid credentials from params {params}")
raise AuthenticationError(
f"Invalid credentials from params {params}"
)
elif status_code == 400:
raise InvalidAppIdError(f"Invalid app id from params {params}")
else:
logger.info("Logged in to Qobuz")
if not resp["user"]["credential"]["parameters"]:
raise IneligibleError("Free accounts are not eligible to download tracks.")
raise IneligibleError(
"Free accounts are not eligible to download tracks."
)
self.uat = resp["user_auth_token"]
self.session.headers.update({"X-User-Auth-Token": self.uat})
@ -386,7 +398,9 @@ class QobuzClient(Client):
}
response, status_code = self._api_request("track/getFileUrl", params)
if status_code == 400:
raise InvalidAppSecretError("Invalid app secret from params %s" % params)
raise InvalidAppSecretError(
"Invalid app secret from params %s" % params
)
return response
@ -404,7 +418,9 @@ class QobuzClient(Client):
try:
return r.json(), r.status_code
except Exception:
logger.error("Problem getting JSON. Status code: %s", r.status_code)
logger.error(
"Problem getting JSON. Status code: %s", r.status_code
)
raise
def _test_secret(self, secret: str) -> bool:
@ -435,7 +451,9 @@ class DeezerClient(Client):
# no login required
self.logged_in = True
def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict:
def search(
self, query: str, media_type: str = "album", limit: int = 200
) -> dict:
"""Search API for query.
:param query:
@ -472,7 +490,9 @@ class DeezerClient(Client):
url = f"{DEEZER_BASE}/{media_type}/{meta_id}"
item = self.session.get(url).json()
if media_type in ("album", "playlist"):
tracks = self.session.get(f"{url}/tracks", params={"limit": 1000}).json()
tracks = self.session.get(
f"{url}/tracks", params={"limit": 1000}
).json()
item["tracks"] = tracks["data"]
item["track_total"] = len(tracks["data"])
elif media_type == "artist":
@ -568,7 +588,9 @@ class TidalClient(Client):
logger.debug(resp)
return resp
def search(self, query: str, media_type: str = "album", limit: int = 100) -> dict:
def search(
self, query: str, media_type: str = "album", limit: int = 100
) -> dict:
"""Search for a query.
:param query:
@ -597,13 +619,19 @@ class TidalClient(Client):
return self._get_video_stream_url(track_id)
params = {
"audioquality": get_quality(min(quality, TIDAL_MAX_Q), self.source),
"audioquality": get_quality(
min(quality, TIDAL_MAX_Q), self.source
),
"playbackmode": "STREAM",
"assetpresentation": "FULL",
}
resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params)
resp = self._api_request(
f"tracks/{track_id}/playbackinfopostpaywall", params
)
try:
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
manifest = json.loads(
base64.b64decode(resp["manifest"]).decode("utf-8")
)
except KeyError:
raise Exception(resp["userMessage"])
@ -641,7 +669,8 @@ class TidalClient(Client):
login_link = f"https://{self._get_device_code()}"
click.secho(
f"Go to {login_link} to log into Tidal within 5 minutes.", fg="blue"
f"Go to {login_link} to log into Tidal within 5 minutes.",
fg="blue",
)
if launch:
click.launch(login_link)
@ -808,7 +837,9 @@ class TidalClient(Client):
offset += 100
tracks_left -= 100
resp["items"].extend(
self._api_request(f"{url}/items", {"offset": offset})["items"]
self._api_request(f"{url}/items", {"offset": offset})[
"items"
]
)
item["tracks"] = [item["item"] for item in resp["items"]]
@ -853,7 +884,9 @@ class TidalClient(Client):
r'#EXT-X-STREAM-INF:BANDWIDTH=\d+,AVERAGE-BANDWIDTH=\d+,CODECS="[^"]+"'
r",RESOLUTION=\d+x\d+\n(.+)"
)
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
manifest = json.loads(
base64.b64decode(resp["manifest"]).decode("utf-8")
)
available_urls = self.session.get(manifest["urls"][0])
url_info = re.findall(stream_url_regex, available_urls.text)
@ -892,7 +925,10 @@ class SoundCloudClient(Client):
:param id:
:param media_type:
"""
assert media_type in ("track", "playlist"), f"{media_type} not supported"
assert media_type in (
"track",
"playlist",
), f"{media_type} not supported"
if "http" in str(id):
resp, _ = self._get(f"resolve?url={id}")
@ -929,7 +965,10 @@ class SoundCloudClient(Client):
url = None
for tc in track["media"]["transcodings"]:
fmt = tc["format"]
if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg":
if (
fmt["protocol"] == "hls"
and fmt["mime_type"] == "audio/mpeg"
):
url = tc["url"]
break