Implement paid Deezer downloads

This commit is contained in:
nathom 2021-07-28 14:25:51 -07:00
parent fb87896f3f
commit 64bb0ace79
4 changed files with 341 additions and 77 deletions

View file

@ -4,19 +4,23 @@ import base64
import binascii
import hashlib
import json
from Cryptodome.Cipher import Blowfish, AES # type: ignore
import logging
import re
import time
import deezer
from pprint import pformat
import requests
import deezer # type: ignore
from abc import ABC, abstractmethod
from typing import Generator, Sequence, Tuple, Union
from typing import Generator, Sequence, Tuple, Union, Dict, Any, Optional
import click
import click # type: ignore
from .constants import (
AGENT,
AVAILABLE_QUALITY_IDS,
DEEZER_BASE,
DEEZER_FORMATS,
DEEZER_DL,
DEEZER_MAX_Q,
QOBUZ_BASE,
@ -39,7 +43,7 @@ from .exceptions import (
MissingCredentials,
)
from .spoofbuz import Spoofer
from .utils import gen_threadsafe_session, get_quality
from .utils import gen_threadsafe_session, get_quality, safe_get
logger = logging.getLogger("streamrip")
@ -342,6 +346,7 @@ class QobuzClient(Client):
if status_code == 401:
raise AuthenticationError(f"Invalid credentials from params {params}")
elif status_code == 400:
logger.debug(resp)
raise InvalidAppIdError(f"Invalid app id from params {params}")
else:
logger.info("Logged in to Qobuz")
@ -412,6 +417,7 @@ class QobuzClient(Client):
logging.debug(f"Calling API with endpoint {epoint} params {params}")
r = self.session.get(f"{QOBUZ_BASE}/{epoint}", params=params)
try:
logger.debug(r.text)
return r.json(), r.status_code
except Exception:
logger.error("Problem getting JSON. Status code: %s", r.status_code)
@ -444,7 +450,7 @@ class DeezerClient(Client):
# self.session = gen_threadsafe_session()
# no login required
# self.logged_in = True
self.logged_in = False
def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict:
"""Search API for query.
@ -457,20 +463,25 @@ class DeezerClient(Client):
:type limit: int
"""
# TODO: use limit parameter
response = self.session.get(
f"{DEEZER_BASE}/search/{media_type}", params={"q": query}
)
response.raise_for_status()
return response.json()
try:
search_function = getattr(self.client.api, f"search_{media_type}")
except AttributeError:
raise Exception
response = search_function(query, limit=limit)
return response
def login(self, **kwargs):
"""Return None.
try:
arl = kwargs["arl"]
except KeyError:
raise MissingCredentials
Dummy method.
success = self.client.login_via_arl(arl)
if not success:
raise AuthenticationError
:param kwargs:
"""
assert self.client.login_via_arl(kwargs["arl"])
self.logged_in = True
def get(self, meta_id: Union[str, int], media_type: str = "album"):
"""Get metadata.
@ -485,26 +496,25 @@ class DeezerClient(Client):
"track": self.client.api.get_track,
"album": self.client.api.get_album,
"playlist": self.client.api.get_playlist,
"artist": self.client.api.get_artist_discography,
"artist": self.client.api.get_artist,
}
get_item = GET_FUNCTIONS[media_type]
return get_item(meta_id)
item = get_item(meta_id)
if media_type in ("album", "playlist"):
tracks = getattr(self.client.api, f"get_{media_type}_tracks")(
meta_id, limit=-1
)
item["tracks"] = tracks["data"]
item["track_total"] = len(tracks["data"])
elif media_type == "artist":
albums = self.client.api.get_artist_albums(meta_id)
item["albums"] = albums["data"]
# url = f"{DEEZER_BASE}/{media_type}/{meta_id}"
# item = self.session.get(url).json()
# if media_type in ("album", "playlist"):
# tracks = self.session.get(f"{url}/tracks", params={"limit": 1000}).json()
# item["tracks"] = tracks["data"]
# item["track_total"] = len(tracks["data"])
# elif media_type == "artist":
# albums = self.session.get(f"{url}/albums").json()
# item["albums"] = albums["data"]
logger.debug(item)
return item
# logger.debug(item)
# return item
def get_file_url(self, meta_id: Union[str, int], quality: int = 2):
def get_file_url(self, meta_id: str, quality: int = 2) -> dict:
"""Get downloadable url for a track.
:param meta_id: The track ID.
@ -512,35 +522,127 @@ class DeezerClient(Client):
:param quality:
:type quality: int
"""
track_info = self.client.gw.get_track(
meta_id,
)
# TODO: optimize such that all of the ids are requested at once
dl_info: Dict[str, Any] = {"quality": quality}
track_info = self.client.gw.get_track(meta_id)
logger.debug("Track info: %s", pformat(track_info))
dl_info["fallback_id"] = safe_get(track_info, "FALLBACK", "SNG_ID")
format_info = get_quality(quality, "deezer")
assert isinstance(format_info, tuple)
format_no, format_str = format_info
dl_info["size_to_quality"] = {
int(track_info.get(f"FILESIZE_{format}")): self._quality_id_from_filetype(
format
)
for format in DEEZER_FORMATS
}
token = track_info["TRACK_TOKEN"]
url = self.client.get_track_url(token, "FLAC")
url = self.client.get_track_url(token, format_str)
if url is None:
md5 = track_info["MD5_ORIGIN"]
media_version = track_info["MEDIA_VERSION"]
format_number = 1
url_bytes = b"\xa4".join(
[
md5.encode(),
str(format_number).encode(),
str(meta_id).encode(),
str(media_version).encode(),
]
url = self._get_encrypted_file_url(
meta_id, track_info["MD5_ORIGIN"], track_info["MEDIA_VERSION"]
)
md5val = hashlib.md5(url_bytes).hexdigest()
step2 = (
md5val.encode()
+ b"\xa4"
+ url_bytes
+ b"\xa4"
+ (b"." * (16 - (len(step2) % 16)))
dl_info["url"] = url
logger.debug(pformat(dl_info))
return dl_info
def _get_encrypted_file_url(
self, meta_id: str, track_hash: str, media_version: str
):
format_number = 1
url_bytes = b"\xa4".join(
(
track_hash.encode(),
str(format_number).encode(),
str(meta_id).encode(),
str(media_version).encode(),
)
urlPart = _ecbCrypt("jo6aey6haid2Teih", step2)
return urlPart.decode("utf-8")
)
url_hash = hashlib.md5(url_bytes).hexdigest()
info_bytes = bytearray(url_hash.encode())
info_bytes.extend(b"\xa4")
info_bytes.extend(url_bytes)
info_bytes.extend(b"\xa4")
# Pad the bytes so that len(info_bytes) % 16 == 0
padding_len = 16 - (len(info_bytes) % 16)
info_bytes.extend(b"." * padding_len)
logger.debug("Info bytes: %s", info_bytes)
path = self._gen_url_path(info_bytes)
logger.debug(path)
return f"https://e-cdns-proxy-{track_hash[0]}.dzcdn.net/mobile/1/{path}"
def _gen_url_path(self, data):
return binascii.hexlify(
AES.new("jo6aey6haid2Teih".encode(), AES.MODE_ECB).encrypt(data)
).decode("utf-8")
def _decrypt_stream(self, url: str, meta_id: str, output_stream):
headers = {"User-Agent": AGENT}
chunk_len = 0
# isCryptedStream = (
# "/mobile/" in track.downloadURL or "/media/" in track.downloadURL
# )
# itemData = {
# "id": track.id,
# "title": track.title,
# "artist": track.mainArtist.name,
# }
with requests.get(url, headers=headers, stream=True, timeout=10) as request:
request.raise_for_status()
blowfish_key = generate_blowfish_key(meta_id)
file_size = int(request.headers["Content-Length"])
if file_size == 0:
raise Exception
for chunk in request.iter_content(2048 * 3):
if len(chunk) >= 2048:
chunk = decrypt_chunk(blowfish_key, chunk[0:2048]) + chunk[2048:]
output_stream.write(chunk)
chunk_len += len(chunk)
# except (SSLError, u3SSLError):
# streamTrack(outputStream, track, chunkLength, downloadObject, listener)
# except (RequestsConnectionError, ReadTimeout, ChunkedEncodingError):
# sleep(2)
# streamTrack(outputStream, track, start, downloadObject, listener)
@staticmethod
def _quality_id_from_filetype(filetype: str) -> Optional[int]:
return {
"MP3_128": 0,
"MP3_256": 0,
"MP3_320": 1,
"FLAC": 2,
}.get(filetype)
def generate_blowfish_key(trackId: str):
SECRET = "g4el58wc0zvf9na1"
md5_hash = hashlib.md5(trackId.encode()).hexdigest()
key = "".join(
chr(ord(md5_hash[i]) ^ ord(md5_hash[i + 16]) ^ ord(SECRET[i]))
for i in range(16)
)
return key.encode()
def decrypt_chunk(key, data):
return Blowfish.new(
key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07"
).decrypt(data)
class TidalClient(Client):