import base64
import hashlib
import json
import logging
import re
import time
from abc import ABC, abstractmethod
from typing import Generator, Optional, Sequence, Tuple, Union

import click

from .constants import (
    AGENT,
    AVAILABLE_QUALITY_IDS,
    DEEZER_BASE,
    DEEZER_DL,
    DEEZER_MAX_Q,
    QOBUZ_BASE,
    QOBUZ_FEATURED_KEYS,
    SOUNDCLOUD_BASE,
    SOUNDCLOUD_CLIENT_ID,
    TIDAL_AUTH_URL,
    TIDAL_BASE,
    TIDAL_CLIENT_INFO,
    TIDAL_MAX_Q,
)
from .exceptions import (
    AuthenticationError,
    IneligibleError,
    InvalidAppIdError,
    InvalidAppSecretError,
    InvalidQuality,
)
from .spoofbuz import Spoofer
from .utils import gen_threadsafe_session, get_quality

logger = logging.getLogger(__name__)


class Client(ABC):
    """Common API for clients of all platforms.

    This is an Abstract Base Class. It cannot be instantiated;
    it is merely a template.
    """

    @abstractmethod
    def login(self, **kwargs):
        """Authenticate the client.

        :param kwargs: source-specific credentials
        """
        pass

    @abstractmethod
    def search(self, query: str, media_type="album"):
        """Search API for query.

        :param query: the search term
        :type query: str
        :param media_type: kind of item to search for
        """
        pass

    @abstractmethod
    def get(self, item_id, media_type="album"):
        """Get metadata.

        :param item_id: id of the item
        :param media_type: kind of item the id refers to
        """
        pass

    @abstractmethod
    def get_file_url(self, track_id, quality=3) -> dict:
        """Get the direct download url dict for a file.

        :param track_id: id of the track
        :param quality: requested quality id
        :rtype: dict
        """
        pass

    @property
    @abstractmethod
    def source(self):
        """Source from which the Client retrieves data."""
        pass

    @property
    @abstractmethod
    def max_quality(self):
        """The maximum quality that the Client supports."""
        pass


class QobuzClient(Client):
    """Client for the Qobuz API."""

    source = "qobuz"
    max_quality = 4

    # ------- Public Methods -------------
    def __init__(self):
        self.logged_in = False

    def login(self, email: str, pwd: str, **kwargs):
        """Authenticate the QobuzClient. Must have a paid membership.

        If `app_id` and `secrets` are not both provided, this will run the
        Spoofer script, which retrieves them. This will take some time,
        so it is recommended to cache them somewhere for reuse.

        :param email: email for the qobuz account
        :type email: str
        :param pwd: password for the qobuz account
        :type pwd: str
        :param kwargs: app_id: str, secrets: list
        """
        click.secho(f"Logging into {self.source}", fg="green")
        if self.logged_in:
            logger.debug("Already logged in")
            return

        # Both values are required below; fetch fresh ones if either is
        # missing or empty (None, [], "").
        if not kwargs.get("app_id") or not kwargs.get("secrets"):
            click.secho("Fetching tokens, this may take a few seconds.")
            logger.info("Fetching tokens from Qobuz")
            spoofer = Spoofer()
            kwargs["app_id"] = spoofer.get_app_id()
            kwargs["secrets"] = spoofer.get_secrets()

        self.app_id = str(kwargs["app_id"])  # Ensure it is a string
        self.secrets = kwargs["secrets"]

        self.session = gen_threadsafe_session(
            headers={"User-Agent": AGENT, "X-App-Id": self.app_id}
        )

        self._api_login(email, pwd)
        logger.debug("Logged into Qobuz")
        self._validate_secrets()
        logger.debug("Qobuz client is ready to use")

        self.logged_in = True

    def get_tokens(self) -> Tuple[str, Sequence[str]]:
        """Return the app id and secrets so they can be cached for reuse.

        :rtype: Tuple[str, Sequence[str]]
        """
        return self.app_id, self.secrets

    def search(
        self, query: str, media_type: str = "album", limit: int = 500
    ) -> Generator:
        """Search the qobuz API.

        If 'featured' is given as media type, this will retrieve results
        from the featured albums in qobuz. The queries available with this
        type are:

            * most-streamed
            * recent-releases
            * best-sellers
            * press-awards
            * ideal-discography
            * editor-picks
            * most-featured
            * qobuzissims
            * new-releases
            * new-releases-full
            * harmonia-mundi
            * universal-classic
            * universal-jazz
            * universal-jeunesse
            * universal-chanson

        :param query: the search term, or one of the keys above
        :type query: str
        :param media_type: kind of item to search for, or 'featured'
        :type media_type: str
        :param limit: page size sent with the request
        :type limit: int
        :rtype: Generator
        """
        return self._api_search(query, media_type, limit)

    def get(self, item_id: Union[str, int], media_type: str = "album") -> dict:
        """Get the metadata of an item.

        :param item_id: id of the item
        :type item_id: Union[str, int]
        :param media_type: kind of item the id refers to
        :type media_type: str
        :rtype: dict
        """
        resp = self._api_get(media_type, item_id=item_id)
        logger.debug(resp)
        return resp

    def get_file_url(self, item_id, quality=3) -> dict:
        """Get the file url response for a track.

        :param item_id: id of the track
        :param quality: requested quality id
        :rtype: dict
        """
        return self._api_get_file_url(item_id, quality=quality)

    # ---------- Private Methods ---------------

    def _gen_pages(self, epoint: str, params: dict) -> Generator:
        """When there are multiple pages of results, this lazily
        yields them.

        Note that `params` is updated in place with the paging values.

        :param epoint: API endpoint, e.g. 'album/search'
        :type epoint: str
        :param params: query parameters for the request
        :type params: dict
        :rtype: Generator
        """
        page, status_code = self._api_request(epoint, params)
        logger.debug("Keys returned from _gen_pages: %s", ", ".join(page.keys()))
        # The results are looked up under the plural of the endpoint's
        # first path component, e.g. 'album/search' -> 'albums'.
        key = epoint.split("/")[0] + "s"
        results = page.get(key, {})
        total = results.get("total") or results.get("items")
        if isinstance(total, (list, tuple)):
            # NOTE(review): presumably 'items' can be the result list itself
            # rather than a count; use its length so the comparison below
            # does not fail. Confirm against the API responses.
            total = len(total)

        if not total:
            logger.debug("Nothing found from %s epoint", epoint)
            return

        limit = results.get("limit", 500)
        offset = results.get("offset", 0)
        params.update({"limit": limit})
        yield page
        while (offset + limit) < total:
            offset += limit
            params.update({"offset": offset})
            page, status_code = self._api_request(epoint, params)
            yield page

    def _validate_secrets(self):
        """Find a usable secret among `self.secrets` and store it as `self.sec`.

        :raises InvalidAppSecretError: if no secret is set after testing
        """
        for secret in self.secrets:
            if self._test_secret(secret):
                self.sec = secret
                logger.debug("Working secret and app_id: %s - %s", secret, self.app_id)
                break

        if not hasattr(self, "sec"):
            raise InvalidAppSecretError(f"Invalid secrets: {self.secrets}")

    def _api_get(self, media_type: str, **kwargs) -> dict:
        """Internal function that sends the request for metadata to the
        Qobuz API.

        :param media_type: kind of item; selects the '<media_type>/get' endpoint
        :type media_type: str
        :param kwargs: item_id, and optionally limit (default 500) and
            offset (default 0)
        :rtype: dict
        :raises Exception: if the status code of the response is not 200
        """
        item_id = kwargs.get("item_id")

        params = {
            "app_id": self.app_id,
            f"{media_type}_id": item_id,
            "limit": kwargs.get("limit", 500),
            "offset": kwargs.get("offset", 0),
        }
        extras = {
            "artist": "albums",
            "playlist": "tracks",
            "label": "albums",  # not tested
        }

        if media_type in extras:
            params.update({"extra": extras[media_type]})

        epoint = f"{media_type}/get"

        response, status_code = self._api_request(epoint, params)
        if status_code != 200:
            raise Exception(f'Error fetching metadata. "{response["message"]}"')

        return response

    def _api_search(self, query: str, media_type: str, limit: int = 500) -> Generator:
        """Internal function that sends a search request to the API.

        :param query: the search term, a featured key, 'user-favorites'
            or 'user-playlists'
        :type query: str
        :param media_type: kind of item to search for, or 'featured'
        :type media_type: str
        :param limit: page size sent with the request
        :type limit: int
        :rtype: Generator
        """
        params = {
            "query": query,
            "limit": limit,
        }
        # TODO: move featured, favorites, and playlists into _api_get later
        if media_type == "featured":
            assert query in QOBUZ_FEATURED_KEYS, f'query "{query}" is invalid.'
            params.update({"type": query})
            del params["query"]
            epoint = "album/getFeatured"

        elif query == "user-favorites":
            # The favorites endpoint is filtered by media type; the query
            # itself is the literal 'user-favorites' in this branch.
            assert media_type in ("track", "artist", "album")
            params.update({"type": f"{media_type}s"})
            epoint = "favorite/getUserFavorites"

        elif query == "user-playlists":
            epoint = "playlist/getUserPlaylists"

        else:
            epoint = f"{media_type}/search"

        return self._gen_pages(epoint, params)

    def _api_login(self, email: str, pwd: str):
        """Internal function that logs into the api to get the user
        authentication token.

        :param email: email for the qobuz account
        :type email: str
        :param pwd: password for the qobuz account
        :type pwd: str
        :raises AuthenticationError: on a 401 response
        :raises InvalidAppIdError: on a 400 response
        :raises IneligibleError: if the account has no credential parameters
        """
        params = {
            "email": email,
            "password": pwd,
            "app_id": self.app_id,
        }
        epoint = "user/login"
        resp, status_code = self._api_request(epoint, params)

        # Never put the plaintext password into an exception message.
        shown_params = {**params, "password": "***"}
        if status_code == 401:
            raise AuthenticationError(f"Invalid credentials from params {shown_params}")
        elif status_code == 400:
            raise InvalidAppIdError(f"Invalid app id from params {shown_params}")
        else:
            logger.info("Logged in to Qobuz")

        if not resp["user"]["credential"]["parameters"]:
            raise IneligibleError("Free accounts are not eligible to download tracks.")

        self.uat = resp["user_auth_token"]
        self.session.headers.update({"X-User-Auth-Token": self.uat})
        self.label = resp["user"]["credential"]["parameters"]["short_label"]

    def _api_get_file_url(
        self, track_id: Union[str, int], quality: int = 3, sec: Optional[str] = None
    ) -> dict:
        """Internal function that gets the file url given an id.

        :param track_id: id of the track
        :type track_id: Union[str, int]
        :param quality: quality id; must be in AVAILABLE_QUALITY_IDS
        :type quality: int
        :param sec: only used to check whether a specific secret is valid.
            If it is not provided, it is set to `self.sec`.
        :type sec: Optional[str]
        :rtype: dict
        :raises InvalidQuality: if the quality id is not available
        :raises InvalidAppSecretError: if no secret is available, or on a
            400 response
        """
        unix_ts = time.time()

        if int(quality) not in AVAILABLE_QUALITY_IDS:  # Needed?
            raise InvalidQuality(
                f"Invalid quality id {quality}. Choose from {AVAILABLE_QUALITY_IDS}"
            )

        if sec is not None:
            secret = sec
        elif hasattr(self, "sec"):
            secret = self.sec
        else:
            raise InvalidAppSecretError("Cannot find app secret")

        quality = get_quality(quality, self.source)
        # The request is signed with an md5 hash over the parameters, the
        # timestamp and the app secret.
        r_sig = f"trackgetFileUrlformat_id{quality}intentstreamtrack_id{track_id}{unix_ts}{secret}"
        logger.debug("Raw request signature: %s", r_sig)
        r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest()
        logger.debug("Hashed request signature: %s", r_sig_hashed)

        params = {
            "request_ts": unix_ts,
            "request_sig": r_sig_hashed,
            "track_id": track_id,
            "format_id": quality,
            "intent": "stream",
        }
        response, status_code = self._api_request("track/getFileUrl", params)
        if status_code == 400:
            raise InvalidAppSecretError(f"Invalid app secret from params {params}")

        return response

    def _api_request(self, epoint: str, params: dict) -> Tuple[dict, int]:
        """The function that handles all requests to the API.

        :param epoint: API endpoint, appended to QOBUZ_BASE
        :type epoint: str
        :param params: query parameters for the request
        :type params: dict
        :rtype: Tuple[dict, int]
        """
        # Keep the account password out of the logs.
        loggable = {k: "***" if k == "password" else v for k, v in params.items()}
        logger.debug("Calling API with endpoint %s params %s", epoint, loggable)
        r = self.session.get(f"{QOBUZ_BASE}/{epoint}", params=params)
        try:
            return r.json(), r.status_code
        except Exception:
            logger.error("Problem getting JSON. Status code: %s", r.status_code)
            raise

    def _test_secret(self, secret: str) -> bool:
        """Tests a secret by requesting the file url of a fixed track with it.

        :param secret: the secret to test
        :type secret: str
        :rtype: bool
        """
        try:
            self._api_get_file_url("19512574", sec=secret)
            return True
        except InvalidAppSecretError as error:
            logger.debug("Test for %s secret didn't work: %s", secret, error)
            return False


class DeezerClient(Client):
    """Client for the Deezer API. No login is required."""

    source = "deezer"
    max_quality = 2

    def __init__(self):
        self.session = gen_threadsafe_session()

        # no login required
        self.logged_in = True

    def search(self, query: str, media_type: str = "album", limit: int = 200) -> dict:
        """Search API for query.

        :param query: the search term
        :type query: str
        :param media_type: kind of item to search for
        :type media_type: str
        :param limit: maximum number of results requested
        :type limit: int
        :rtype: dict
        """
        # Passing the values as params lets the session URL-encode them.
        response = self.session.get(
            f"{DEEZER_BASE}/search/{media_type}",
            params={"q": query, "limit": limit},
        )
        response.raise_for_status()

        return response.json()

    def login(self, **kwargs):
        """Do nothing; kept for interface compatibility."""
        logger.debug("Deezer does not require login call, returning")

    def get(self, meta_id: Union[str, int], media_type: str = "album"):
        """Get metadata.

        For albums and playlists the track list is added under 'tracks'
        (with its length under 'track_total'); for artists the albums are
        added under 'albums'.

        :param meta_id: id of the item
        :type meta_id: Union[str, int]
        :param media_type: kind of item the id refers to
        :type media_type: str
        """
        url = f"{DEEZER_BASE}/{media_type}/{meta_id}"
        item = self.session.get(url).json()
        if media_type in ("album", "playlist"):
            tracks = self.session.get(f"{url}/tracks").json()
            item["tracks"] = tracks["data"]
            item["track_total"] = len(tracks["data"])
        elif media_type == "artist":
            albums = self.session.get(f"{url}/albums").json()
            item["albums"] = albums["data"]

        logger.debug(item)
        return item

    @staticmethod
    def get_file_url(meta_id: Union[str, int], quality: int = 6):
        """Build the download url for a track.

        :param meta_id: id of the track
        :type meta_id: Union[str, int]
        :param quality: requested quality; capped at DEEZER_MAX_Q
        :type quality: int
        :rtype: dict
        """
        quality = min(DEEZER_MAX_Q, quality)
        url = f"{DEEZER_DL}/{get_quality(quality, 'deezer')}/{DEEZER_BASE}/track/{meta_id}"
        logger.debug("Download url %s", url)
        return {"url": url}


class TidalClient(Client):
    """Client for the Tidal API, authenticated with the device-code flow."""

    source = "tidal"
    max_quality = 3

    # ----------- Public Methods --------------

    def __init__(self):
        self.logged_in = False

        self.device_code = None
        self.user_code = None
        self.verification_url = None
        self.auth_check_timeout = None
        self.auth_check_interval = None
        self.user_id = None
        self.country_code = None
        self.access_token = None
        self.refresh_token = None
        self.expiry = None
        # `login` and `get_tokens` use this name; initialise it so that
        # `get_tokens` works before a login has happened.
        self.token_expiry = None

        self.session = gen_threadsafe_session()

    def login(
        self,
        user_id=None,
        country_code=None,
        access_token=None,
        token_expiry=None,
        refresh_token=None,
    ):
        """Login to Tidal using the browser.

        Providing information from previous logins will allow a user
        to stay logged in.

        :param user_id: user id saved from a previous login
        :param country_code: unused by this method
        :param access_token: access token saved from a previous login
        :param token_expiry: unix timestamp at which the access token expires
        :param refresh_token: refresh token saved from a previous login
        """
        if access_token is not None:
            self.token_expiry = token_expiry
            self.refresh_token = refresh_token

            # An unknown expiry is treated like an expired token.
            if token_expiry is None or token_expiry - time.time() < 86400:  # 1 day
                logger.debug("Refreshing access token")
                self._refresh_access_token()
            else:
                logger.debug("Logging in with access token")
                self._login_by_access_token(access_token, user_id)
        else:
            logger.debug("Logging in as a new user")
            self._login_new_user()

        self.logged_in = True
        click.secho("Logged into Tidal", fg="green")

    def get(self, item_id, media_type="album"):
        """Public method that internally calls _api_get.

        :param item_id: id of the item
        :param media_type: kind of item the id refers to
        """
        resp = self._api_get(item_id, media_type)
        logger.debug(resp)
        return resp

    def search(self, query: str, media_type: str = "album", limit: int = 100) -> dict:
        """Search for a query.

        :param query: the search term
        :type query: str
        :param media_type: track, album, playlist, or video.
        :type media_type: str
        :param limit: max is 100
        :type limit: int
        :rtype: dict
        """
        params = {
            "query": query,
            "limit": min(limit, 100),
        }
        return self._api_request(f"search/{media_type}s", params=params)

    def get_file_url(self, track_id, quality: int = 3, video=False):
        """Get the file url for a track or video given an id.

        :param track_id: or video id
        :param quality: 0, 1, 2, or 3. It is irrelevant for videos.
        :type quality: int
        :param video: whether the id refers to a video
        :raises Exception: if the response contains no manifest
        """
        if video:
            return self._get_video_stream_url(track_id)

        params = {
            "audioquality": get_quality(min(quality, TIDAL_MAX_Q), self.source),
            "playbackmode": "STREAM",
            "assetpresentation": "FULL",
        }
        resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params)
        try:
            # The manifest is a base64-encoded JSON document.
            manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
        except KeyError:
            raise Exception(resp.get("userMessage", resp))

        logger.debug(manifest)
        return {
            "url": manifest["urls"][0],
            "enc_key": manifest.get("keyId"),
            "codec": manifest["codecs"],
        }

    def get_tokens(self) -> dict:
        """Used for saving them for later use.

        :rtype: dict
        """
        return {
            k: getattr(self, k)
            for k in (
                "user_id",
                "country_code",
                "access_token",
                "refresh_token",
                "token_expiry",
            )
        }

    # ------------ Utilities to login -------------

    def _login_new_user(self, launch=True):
        """This will launch the browser and ask the user to log into tidal.

        :param launch: whether to open the login link in the browser
        :raises Exception: if the authorization fails or times out
        """
        login_link = f"https://{self._get_device_code()}"

        click.secho(
            f"Go to {login_link} to log into Tidal within 5 minutes.", fg="blue"
        )
        if launch:
            click.launch(login_link)

        start = time.time()
        elapsed = 0
        # NOTE(review): the message above says 5 minutes but the loop waits
        # for up to 600 seconds; confirm which of the two is intended.
        while elapsed < 600:
            elapsed = time.time() - start
            status = self._check_auth_status()
            if status == 2:
                # pending
                time.sleep(4)
                continue
            elif status == 1:
                # error checking
                raise Exception("Error checking the Tidal authorization status")
            elif status == 0:
                # successful
                break
            else:
                raise Exception(f"Unknown authorization status {status}")
        else:
            # The loop ended without a successful status.
            raise Exception("Timed out waiting for the Tidal login")

        # Without this the session would keep sending unauthenticated requests.
        self._update_authorization()

    def _get_device_code(self):
        """Get the device code that will be used to log in on the browser.

        :returns: the 'verificationUriComplete' value of the response
        :raises Exception: if the response reports a status other than 200
        """
        data = {
            "client_id": TIDAL_CLIENT_INFO["id"],
            "scope": "r_usr+w_usr+w_sub",
        }
        resp = self._api_post(f"{TIDAL_AUTH_URL}/device_authorization", data)

        if "status" in resp and resp["status"] != 200:
            raise Exception(f"Device authorization failed {resp}")

        self.device_code = resp["deviceCode"]
        self.user_code = resp["userCode"]
        self.user_code_expiry = resp["expiresIn"]
        self.auth_interval = resp["interval"]
        return resp["verificationUriComplete"]

    def _check_auth_status(self):
        """Check if the user has logged in inside the browser.

        On success the user id, country code and tokens are stored on the
        client.

        :returns: 0 on success, 2 if the response has status 400 with
            sub_status 1002 (treated as pending by the caller), 1 on any
            other non-200 status
        :rtype: int
        """
        data = {
            "client_id": TIDAL_CLIENT_INFO["id"],
            "device_code": self.device_code,
            "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
            "scope": "r_usr+w_usr+w_sub",
        }
        logger.debug(data)
        resp = self._api_post(
            f"{TIDAL_AUTH_URL}/token",
            data,
            (TIDAL_CLIENT_INFO["id"], TIDAL_CLIENT_INFO["secret"]),
        )
        logger.debug(resp)

        if resp.get("status", 200) != 200:
            if resp["status"] == 400 and resp["sub_status"] == 1002:
                return 2
            else:
                return 1

        self.user_id = resp["user"]["userId"]
        self.country_code = resp["user"]["countryCode"]
        self.access_token = resp["access_token"]
        self.refresh_token = resp["refresh_token"]
        self.token_expiry = resp["expires_in"] + time.time()
        return 0

    def _verify_access_token(self, token: str):
        """Verify that the access token is valid.

        :param token: the access token to check
        :type token: str
        :returns: True if the sessions endpoint accepts the token
        :raises Exception: if the response reports a status other than 200
        """
        headers = {
            "authorization": f"Bearer {token}",
        }
        r = self.session.get(
            "https://api.tidal.com/v1/sessions", headers=headers
        ).json()
        # `r` is the decoded JSON body; a 'status' key is only checked when
        # present, as in `_login_by_access_token`.
        if r.get("status", 200) != 200:
            raise Exception("Login failed")

        return True

    def _refresh_access_token(self):
        """The access token expires in a week, so it must be refreshed.

        Requires a refresh token.

        :raises Exception: if the response reports a status other than 200
        """
        data = {
            "client_id": TIDAL_CLIENT_INFO["id"],
            "refresh_token": self.refresh_token,
            "grant_type": "refresh_token",
            "scope": "r_usr+w_usr+w_sub",
        }
        resp = self._api_post(
            f"{TIDAL_AUTH_URL}/token",
            data,
            (TIDAL_CLIENT_INFO["id"], TIDAL_CLIENT_INFO["secret"]),
        )

        if resp.get("status", 200) != 200:
            raise Exception("Refresh failed")

        self.user_id = resp["user"]["userId"]
        self.country_code = resp["user"]["countryCode"]
        self.access_token = resp["access_token"]
        self.token_expiry = resp["expires_in"] + time.time()
        self._update_authorization()

    def _login_by_access_token(self, token, user_id=None):
        """This is the method used to login after the access token has been saved.

        :param token: the saved access token
        :param user_id: Not necessary. When given, it must match the user
            id of the session.
        :raises Exception: if the login fails or the user ids do not match
        """
        headers = {"authorization": f"Bearer {token}"}  # temporary
        resp = self.session.get(
            "https://api.tidal.com/v1/sessions", headers=headers
        ).json()
        if resp.get("status", 200) != 200:
            raise Exception(f"Login failed {resp}")

        # Only compare the ids when the caller actually supplied one.
        if user_id is not None and str(resp.get("userId")) != str(user_id):
            raise Exception(f"User id mismatch {resp.get('userId')} v {user_id}")

        self.user_id = resp["userId"]
        self.country_code = resp["countryCode"]
        self.access_token = token
        self._update_authorization()

    def _update_authorization(self):
        """Update the requests session headers with the auth token."""
        self.session.headers.update(self.authorization)

    @property
    def authorization(self):
        """The auth header."""
        return {"authorization": f"Bearer {self.access_token}"}

    # ------------- Fetch data ------------------

    def _api_get(self, item_id: str, media_type: str) -> dict:
        """Send a request to the api for information.

        For playlists and albums the tracks are added under 'tracks'; for
        artists the albums are added under 'albums'.

        :param item_id: id of the item
        :type item_id: str
        :param media_type: track, album, playlist, or video.
        :type media_type: str
        :rtype: dict
        """
        page_size = 100  # matches the default limit set in _api_request
        url = f"{media_type}s/{item_id}"
        item = self._api_request(url)
        if media_type in ("playlist", "album"):
            resp = self._api_request(f"{url}/items")
            # The first request covers offset 0; fetch the remaining pages.
            for offset in range(page_size, item["numberOfTracks"], page_size):
                resp["items"].extend(
                    self._api_request(f"{url}/items", {"offset": offset})["items"]
                )

            item["tracks"] = [entry["item"] for entry in resp["items"]]
        elif media_type == "artist":
            resp = self._api_request(f"{url}/albums")
            item["albums"] = resp["items"]

        return item

    def _api_request(self, path: str, params=None) -> dict:
        """The function that handles all tidal API requests.

        The country code is always added to the parameters, and a limit of
        100 is used unless the caller provides one.

        :param path: API path, appended to TIDAL_BASE
        :type path: str
        :param params: query parameters for the request
        :rtype: dict
        """
        # Work on a copy so the caller's dict is left untouched.
        params = {} if params is None else dict(params)

        params["countryCode"] = self.country_code
        params.setdefault("limit", 100)
        r = self.session.get(f"{TIDAL_BASE}/{path}", params=params).json()
        return r

    def _get_video_stream_url(self, video_id: str) -> str:
        """Videos have to be ripped from an hls stream, so they require
        separate processing.

        :param video_id: id of the video
        :type video_id: str
        :rtype: str
        """
        params = {
            "videoquality": "HIGH",
            "playbackmode": "STREAM",
            "assetpresentation": "FULL",
        }
        resp = self._api_request(
            f"videos/{video_id}/playbackinfopostpaywall", params=params
        )
        # Captures the url on the line following each stream description.
        stream_url_regex = (
            r'#EXT-X-STREAM-INF:BANDWIDTH=\d+,AVERAGE-BANDWIDTH=\d+,CODECS="[^"]+"'
            r",RESOLUTION=\d+x\d+\n(.+)"
        )
        manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
        available_urls = self.session.get(manifest["urls"][0])
        url_info = re.findall(stream_url_regex, available_urls.text)

        # highest resolution is last
        return url_info[-1]

    def _api_post(self, url, data, auth=None):
        """Function used for posting to tidal API.

        :param url: full url to post to
        :param data: form data of the request
        :param auth: optional (user, password) pair for the request
        :returns: the decoded JSON response
        """
        # NOTE(review): verify=False disables TLS certificate verification
        # for requests that carry the client secret and tokens; confirm
        # whether this is still required.
        r = self.session.post(url, data=data, auth=auth, verify=False).json()
        return r


class SoundCloudClient(Client):
    """Client for the SoundCloud API. No login is required."""

    source = "soundcloud"
    max_quality = 0
    logged_in = True

    def __init__(self):
        self.session = gen_threadsafe_session(headers={"User-Agent": AGENT})

    def login(self, **kwargs):
        """Login is not necessary for SoundCloud.

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def get(self, id, media_type="track"):
        """Get metadata for a media type given an id.

        :param id: id of a track, or a url to resolve
        :param media_type: track or playlist
        :raises Exception: if a playlist is requested by id instead of url
        """
        assert media_type in ("track", "playlist"), f"{media_type} not supported"

        if "http" in str(id):
            # Passed as a parameter so that the url gets properly encoded.
            resp, _ = self._get("resolve", params={"url": id})
        elif media_type == "track":
            resp, _ = self._get(f"{media_type}s/{id}")
        else:
            raise Exception(id)

        logger.debug(resp)
        return resp

    def get_file_url(self, track: dict, quality) -> dict:
        """Get the streamable file url from soundcloud.

        It will most likely be an hls stream, which will have to be manually
        parsed, or downloaded with ffmpeg.

        :param track: track metadata, as returned by `get`
        :type track: dict
        :param quality: unused by this method
        :rtype: dict
        :raises Exception: if the track is not streamable or is blocked
        """
        if not track["streamable"] or track["policy"] == "BLOCK":
            raise Exception("Track is not streamable or is blocked")

        if track["downloadable"] and track["has_downloads_left"]:
            r = self._get(f"tracks/{track['id']}/download", resp_obj=True)
            return {"url": r.json()["redirectUri"], "type": "original"}

        else:
            url = None
            # Pick the first hls transcoding that is an mp3.
            for tc in track["media"]["transcodings"]:
                fmt = tc["format"]
                if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg":
                    url = tc["url"]
                    break

            assert url is not None

            resp, _ = self._get(url, no_base=True)
            return {"url": resp["url"], "type": "mp3"}

    def search(self, query: str, media_type="album"):
        """Search for a query.

        :param query: the search term
        :type query: str
        :param media_type: Can be album, though it will return a playlist
            response.
        """
        params = {"q": query}
        resp, _ = self._get(f"search/{media_type}s", params=params)
        return resp

    def _get(self, path, params=None, no_base=False, resp_obj=False):
        """The lower level of `SoundCloudClient.get` that handles request
        parameters and other options.

        :param path: API path, or a full url if `no_base` is set
        :param params: query parameters; the client id is always added
        :param no_base: Do not append `path` parameter to the SoundCloud API
            base.
        :param resp_obj: Return the object returned by `requests.get` instead
            of the json response dict.
        :returns: the response object if `resp_obj` is set, otherwise a
            (json, status code) tuple
        """
        # Work on a copy so the caller's dict is left untouched.
        params = {} if params is None else dict(params)
        params["client_id"] = SOUNDCLOUD_CLIENT_ID

        if no_base:
            url = path
        else:
            url = f"{SOUNDCLOUD_BASE}/{path}"

        logger.debug("Fetching url %s", url)
        r = self.session.get(url, params=params)
        if resp_obj:
            return r

        return r.json(), r.status_code