Start paid deezer implementation

This commit is contained in:
nathom 2021-07-26 15:45:34 -07:00
parent 37e2a7e8c1
commit 0dbbba8f67
5 changed files with 99 additions and 48 deletions

View file

@ -1,11 +1,13 @@
"""The clients that interact with the service APIs."""
import base64
import binascii
import hashlib
import json
import logging
import re
import time
import deezer
from abc import ABC, abstractmethod
from typing import Generator, Sequence, Tuple, Union
@ -438,10 +440,11 @@ class DeezerClient(Client):
def __init__(self):
"""Create a DeezerClient."""
self.session = gen_threadsafe_session()
self.client = deezer.Deezer(accept_language="en-US,en;q=0.5")
# self.session = gen_threadsafe_session()
# no login required
self.logged_in = True
# self.logged_in = True
def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict:
"""Search API for query.
@ -467,7 +470,7 @@ class DeezerClient(Client):
:param kwargs:
"""
logger.debug("Deezer does not require login call, returning")
assert self.client.login_via_arl(kwargs["arl"])
def get(self, meta_id: Union[str, int], media_type: str = "album"):
"""Get metadata.
@ -477,21 +480,31 @@ class DeezerClient(Client):
:param type_:
:type type_: str
"""
url = f"{DEEZER_BASE}/{media_type}/{meta_id}"
item = self.session.get(url).json()
if media_type in ("album", "playlist"):
tracks = self.session.get(f"{url}/tracks", params={"limit": 1000}).json()
item["tracks"] = tracks["data"]
item["track_total"] = len(tracks["data"])
elif media_type == "artist":
albums = self.session.get(f"{url}/albums").json()
item["albums"] = albums["data"]
logger.debug(item)
return item
GET_FUNCTIONS = {
"track": self.client.api.get_track,
"album": self.client.api.get_album,
"playlist": self.client.api.get_playlist,
"artist": self.client.api.get_artist_discography,
}
@staticmethod
def get_file_url(meta_id: Union[str, int], quality: int = 6):
get_item = GET_FUNCTIONS[media_type]
return get_item(meta_id)
# url = f"{DEEZER_BASE}/{media_type}/{meta_id}"
# item = self.session.get(url).json()
# if media_type in ("album", "playlist"):
# tracks = self.session.get(f"{url}/tracks", params={"limit": 1000}).json()
# item["tracks"] = tracks["data"]
# item["track_total"] = len(tracks["data"])
# elif media_type == "artist":
# albums = self.session.get(f"{url}/albums").json()
# item["albums"] = albums["data"]
# logger.debug(item)
# return item
def get_file_url(self, meta_id: Union[str, int], quality: int = 2):
"""Get downloadable url for a track.
:param meta_id: The track ID.
@ -499,10 +512,35 @@ class DeezerClient(Client):
:param quality:
:type quality: int
"""
quality = min(DEEZER_MAX_Q, quality)
url = f"{DEEZER_DL}/{get_quality(quality, 'deezer')}/{DEEZER_BASE}/track/{meta_id}"
logger.debug(f"Download url {url}")
return {"url": url}
track_info = self.client.gw.get_track(
meta_id,
)
token = track_info["TRACK_TOKEN"]
url = self.client.get_track_url(token, "FLAC")
if url is None:
md5 = track_info["MD5_ORIGIN"]
media_version = track_info["MEDIA_VERSION"]
format_number = 1
url_bytes = b"\xa4".join(
[
md5.encode(),
str(format_number).encode(),
str(meta_id).encode(),
str(media_version).encode(),
]
)
md5val = hashlib.md5(url_bytes).hexdigest()
step2 = (
md5val.encode()
+ b"\xa4"
+ url_bytes
+ b"\xa4"
+ (b"." * (16 - (len(step2) % 16)))
)
urlPart = _ecbCrypt("jo6aey6haid2Teih", step2)
return urlPart.decode("utf-8")
class TidalClient(Client):