Formatting

Signed-off-by: nathom <nathanthomas707@gmail.com>
This commit is contained in:
nathom 2021-06-19 18:57:50 -07:00
parent 76ba2d413b
commit 7698ad7a2e
5 changed files with 39 additions and 112 deletions

View file

@ -216,9 +216,7 @@ class QobuzClient(Client):
:rtype: dict
"""
page, status_code = self._api_request(epoint, params)
logger.debug(
"Keys returned from _gen_pages: %s", ", ".join(page.keys())
)
logger.debug("Keys returned from _gen_pages: %s", ", ".join(page.keys()))
key = epoint.split("/")[0] + "s"
total = page.get(key, {})
total = total.get("total") or total.get("items")
@ -242,9 +240,7 @@ class QobuzClient(Client):
for secret in self.secrets:
if self._test_secret(secret):
self.sec = secret
logger.debug(
"Working secret and app_id: %s - %s", secret, self.app_id
)
logger.debug("Working secret and app_id: %s - %s", secret, self.app_id)
break
if not hasattr(self, "sec"):
raise InvalidAppSecretError(f"Invalid secrets: {self.secrets}")
@ -278,15 +274,11 @@ class QobuzClient(Client):
response, status_code = self._api_request(epoint, params)
if status_code != 200:
raise Exception(
f'Error fetching metadata. "{response["message"]}"'
)
raise Exception(f'Error fetching metadata. "{response["message"]}"')
return response
def _api_search(
self, query: str, media_type: str, limit: int = 500
) -> Generator:
def _api_search(self, query: str, media_type: str, limit: int = 500) -> Generator:
"""Send a search request to the API.
:param query:
@ -338,18 +330,14 @@ class QobuzClient(Client):
resp, status_code = self._api_request(epoint, params)
if status_code == 401:
raise AuthenticationError(
f"Invalid credentials from params {params}"
)
raise AuthenticationError(f"Invalid credentials from params {params}")
elif status_code == 400:
raise InvalidAppIdError(f"Invalid app id from params {params}")
else:
logger.info("Logged in to Qobuz")
if not resp["user"]["credential"]["parameters"]:
raise IneligibleError(
"Free accounts are not eligible to download tracks."
)
raise IneligibleError("Free accounts are not eligible to download tracks.")
self.uat = resp["user_auth_token"]
self.session.headers.update({"X-User-Auth-Token": self.uat})
@ -398,9 +386,7 @@ class QobuzClient(Client):
}
response, status_code = self._api_request("track/getFileUrl", params)
if status_code == 400:
raise InvalidAppSecretError(
"Invalid app secret from params %s" % params
)
raise InvalidAppSecretError("Invalid app secret from params %s" % params)
return response
@ -418,9 +404,7 @@ class QobuzClient(Client):
try:
return r.json(), r.status_code
except Exception:
logger.error(
"Problem getting JSON. Status code: %s", r.status_code
)
logger.error("Problem getting JSON. Status code: %s", r.status_code)
raise
def _test_secret(self, secret: str) -> bool:
@ -451,9 +435,7 @@ class DeezerClient(Client):
# no login required
self.logged_in = True
def search(
self, query: str, media_type: str = "album", limit: int = 200
) -> dict:
def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict:
"""Search API for query.
:param query:
@ -490,9 +472,7 @@ class DeezerClient(Client):
url = f"{DEEZER_BASE}/{media_type}/{meta_id}"
item = self.session.get(url).json()
if media_type in ("album", "playlist"):
tracks = self.session.get(
f"{url}/tracks", params={"limit": 1000}
).json()
tracks = self.session.get(f"{url}/tracks", params={"limit": 1000}).json()
item["tracks"] = tracks["data"]
item["track_total"] = len(tracks["data"])
elif media_type == "artist":
@ -588,9 +568,7 @@ class TidalClient(Client):
logger.debug(resp)
return resp
def search(
self, query: str, media_type: str = "album", limit: int = 100
) -> dict:
def search(self, query: str, media_type: str = "album", limit: int = 100) -> dict:
"""Search for a query.
:param query:
@ -619,19 +597,13 @@ class TidalClient(Client):
return self._get_video_stream_url(track_id)
params = {
"audioquality": get_quality(
min(quality, TIDAL_MAX_Q), self.source
),
"audioquality": get_quality(min(quality, TIDAL_MAX_Q), self.source),
"playbackmode": "STREAM",
"assetpresentation": "FULL",
}
resp = self._api_request(
f"tracks/{track_id}/playbackinfopostpaywall", params
)
resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params)
try:
manifest = json.loads(
base64.b64decode(resp["manifest"]).decode("utf-8")
)
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
except KeyError:
raise Exception(resp["userMessage"])
@ -837,9 +809,7 @@ class TidalClient(Client):
offset += 100
tracks_left -= 100
resp["items"].extend(
self._api_request(f"{url}/items", {"offset": offset})[
"items"
]
self._api_request(f"{url}/items", {"offset": offset})["items"]
)
item["tracks"] = [item["item"] for item in resp["items"]]
@ -884,9 +854,7 @@ class TidalClient(Client):
r'#EXT-X-STREAM-INF:BANDWIDTH=\d+,AVERAGE-BANDWIDTH=\d+,CODECS="[^"]+"'
r",RESOLUTION=\d+x\d+\n(.+)"
)
manifest = json.loads(
base64.b64decode(resp["manifest"]).decode("utf-8")
)
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
available_urls = self.session.get(manifest["urls"][0])
url_info = re.findall(stream_url_regex, available_urls.text)
@ -965,10 +933,7 @@ class SoundCloudClient(Client):
url = None
for tc in track["media"]["transcodings"]:
fmt = tc["format"]
if (
fmt["protocol"] == "hls"
and fmt["mime_type"] == "audio/mpeg"
):
if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg":
url = tc["url"]
break