From 69e8a5d05bd4f36483dc19b3798e4e070a63288c Mon Sep 17 00:00:00 2001 From: nathom Date: Sat, 17 Apr 2021 14:18:42 -0700 Subject: [PATCH] Add documentation to clients.py and bases.py --- streamrip/bases.py | 123 +++++++++++++++++++++++++-- streamrip/clients.py | 195 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 303 insertions(+), 15 deletions(-) diff --git a/streamrip/bases.py b/streamrip/bases.py index ab9eb81..2aa710e 100644 --- a/streamrip/bases.py +++ b/streamrip/bases.py @@ -128,6 +128,12 @@ class Track: self.cover_url = None def _prepare_download(self, **kwargs): + """This function does preprocessing to prepare for downloading tracks. + It creates the directories, downloads cover art, and (optionally) + downloads booklets. + + :param kwargs: + """ # args override attributes self.quality = min(kwargs["quality"], self.client.max_quality) self.folder = kwargs["parent_folder"] or self.folder @@ -166,8 +172,7 @@ class Track: progress_bar: bool = True, **kwargs, ) -> bool: - """ - Download the track. + """Download the track. :param quality: (0, 1, 2, 3, 4) :type quality: int @@ -252,16 +257,38 @@ class Track: return True def __validate_qobuz_dl_info(self, info: dict) -> bool: + """Check if the download info dict returned by Qobuz is downloadable. + + :param info: + :type info: dict + :rtype: bool + """ return all( (info.get("sampling_rate"), info.get("bit_depth"), not info.get("sample")) ) def move(self, path: str): + """Moves the Track and sets self.path to the new path. + + :param path: + :type path: str + """ os.makedirs(os.path.dirname(path), exist_ok=True) shutil.move(self.path, path) self.path = path def _soundcloud_download(self, dl_info: dict) -> str: + """Downloads a soundcloud track. This requires a seperate function + because there are three methods that can be used to download a track: + * original file downloads + * direct mp3 downloads + * hls stream ripping + All three of these need special processing. + + :param dl_info: + :type dl_info: dict + :rtype: str + """ if dl_info["type"] == "mp3": self.path += ".mp3" # convert hls stream to mp3 @@ -290,7 +317,11 @@ class Track: self.quality = 2 @property - def _progress_desc(self): + def _progress_desc(self) -> str: + """The description that is used on the progress bar. + + :rtype: str + """ return click.style(f"Track {int(self.meta.tracknumber):02}", fg="blue") def download_cover(self): @@ -345,6 +376,14 @@ class Track: @classmethod def from_api(cls, item: dict, client: Client): + """Given a track dict from an API, return a new Track object + initialized with the proper values. + + :param item: + :type item: dict + :param client: + :type client: Client + """ meta = TrackMetadata(track=item, source=client.source) try: if client.source == "qobuz": @@ -510,6 +549,10 @@ class Track: @property def title(self) -> str: + """The title of the track. + + :rtype: str + """ if hasattr(self, "meta"): _title = self.meta.title if self.meta.explicit: @@ -577,12 +620,16 @@ class Video: self.tracknumber = kwargs.get("tracknumber", None) def load_meta(self): + """Given an id at contruction, get the metadata of the video.""" resp = self.client.get(self.id, "video") self.title = resp["title"] self.explicit = resp["explicit"] - print(resp) def download(self, **kwargs): + """Download the Video. + + :param kwargs: + """ click.secho( f"Downloading {self.title} (Video). This may take a while.", fg="blue" ) @@ -598,6 +645,14 @@ class Video: @classmethod def from_album_meta(cls, track: dict, client: Client): + """Given an video response dict from an album, return a new + Video object from the information. + + :param track: + :type track: dict + :param client: + :type client: Client + """ return cls( client, id=track["id"], @@ -608,6 +663,10 @@ class Video: @property def path(self) -> str: + """The path to download the mp4 file. + + :rtype: str + """ os.makedirs(self.parent_folder, exist_ok=True) fname = self.title if self.explicit: @@ -640,7 +699,13 @@ class Booklet: """ self.__dict__.update(resp) - def download(self, parent_folder, **kwargs): + def download(self, parent_folder: str, **kwargs): + """Download the Booklet. + + :param parent_folder: + :type parent_folder: str + :param kwargs: + """ filepath = os.path.join(parent_folder, f"{self.description}.pdf") tqdm_download(self.url, filepath) @@ -658,6 +723,11 @@ class Tracklist(list): essence_regex = re.compile(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*") def download(self, **kwargs): + """Uses the _prepare_download and _download_item methods to download + all of the tracks contained in the Tracklist. + + :param kwargs: + """ self._prepare_download(**kwargs) if kwargs.get("conversion", False): has_conversion = kwargs["conversion"]["enabled"] @@ -690,16 +760,37 @@ class Tracklist(list): self.downloaded = True def _download_and_convert_item(self, item, **kwargs): + """Downloads and converts an item. + + :param item: + :param kwargs: should contain a `conversion` dict. + """ if self._download_item(item, **kwargs): item.convert(**kwargs["conversion"]) def _download_item(item, **kwargs): + """Abstract method. + + :param item: + :param kwargs: + """ raise NotImplementedError def _prepare_download(**kwargs): + """Abstract method. + + :param kwargs: + """ raise NotImplementedError def get(self, key: Union[str, int], default=None): + """A safe `get` method similar to `dict.get`. + + :param key: If it is a str, get an attribute. If an int, get the item + at the index. + :type key: Union[str, int] + :param default: + """ if isinstance(key, str): if hasattr(self, key): return getattr(self, key) @@ -713,9 +804,20 @@ class Tracklist(list): return default def set(self, key, val): + """For consistency with `Tracklist.get`. + + :param key: + :param val: + """ self.__setitem__(key, val) def convert(self, codec="ALAC", **kwargs): + """Converts every item in `self`. + Deprecated. Use _download_and_convert_item instead. + + :param codec: + :param kwargs: + """ if (sr := kwargs.get("sampling_rate")) : if sr < 44100: logger.warning( @@ -791,7 +893,11 @@ class Tracklist(list): raise InvalidQuality(f"Quality {quality} not allowed") - def download_message(self): + def download_message(self) -> str: + """The message to display after calling `Tracklist.download`. + + :rtype: str + """ click.secho( f"\n\nDownloading {self.title} ({self.__class__.__name__})\n", fg="blue", @@ -799,6 +905,11 @@ class Tracklist(list): @staticmethod def _parse_get_resp(item, client): + """Abstract. + + :param item: + :param client: + """ raise NotImplementedError @staticmethod diff --git a/streamrip/clients.py b/streamrip/clients.py index dd2056a..7c275c9 100644 --- a/streamrip/clients.py +++ b/streamrip/clients.py @@ -184,9 +184,16 @@ class QobuzClient(Client): # ---------- Private Methods --------------- - # Credit to Sorrow446 for the original methods - def _gen_pages(self, epoint: str, params: dict) -> dict: + """When there are multiple pages of results, this lazily + yields them. + + :param epoint: + :type epoint: str + :param params: + :type params: dict + :rtype: dict + """ page, status_code = self._api_request(epoint, params) logger.debug("Keys returned from _gen_pages: %s", ", ".join(page.keys())) key = epoint.split("/")[0] + "s" @@ -208,6 +215,7 @@ class QobuzClient(Client): yield page def _validate_secrets(self): + """Checks if the secrets are usable.""" for secret in self.secrets: if self._test_secret(secret): self.sec = secret @@ -217,6 +225,14 @@ class QobuzClient(Client): raise InvalidAppSecretError(f"Invalid secrets: {self.secrets}") def _api_get(self, media_type: str, **kwargs) -> dict: + """Internal function that sends the request for metadata to the + Qobuz API. + + :param media_type: + :type media_type: str + :param kwargs: + :rtype: dict + """ item_id = kwargs.get("item_id") params = { @@ -242,7 +258,17 @@ class QobuzClient(Client): return response - def _api_search(self, query, media_type, limit=500) -> Generator: + def _api_search(self, query: str, media_type: str, limit: int = 500) -> Generator: + """Internal function that sends a search request to the API. + + :param query: + :type query: str + :param media_type: + :type media_type: str + :param limit: + :type limit: int + :rtype: Generator + """ params = { "query": query, "limit": limit, @@ -268,7 +294,14 @@ class QobuzClient(Client): return self._gen_pages(epoint, params) def _api_login(self, email: str, pwd: str): - # usr_info = self._api_call("user/login", email=email, pwd=pwd) + """Internal function that logs into the api to get the user + authentication token. + + :param email: + :type email: str + :param pwd: + :type pwd: str + """ params = { "email": email, "password": pwd, @@ -294,6 +327,17 @@ class QobuzClient(Client): def _api_get_file_url( self, track_id: Union[str, int], quality: int = 3, sec: str = None ) -> dict: + """Internal function that gets the file url given an id. + + :param track_id: + :type track_id: Union[str, int] + :param quality: + :type quality: int + :param sec: only used to check whether a specific secret is valid. + If it is not provided, it is set to `self.sec`. + :type sec: str + :rtype: dict + """ unix_ts = time.time() if int(quality) not in AVAILABLE_QUALITY_IDS: # Needed? @@ -328,6 +372,14 @@ class QobuzClient(Client): return response def _api_request(self, epoint: str, params: dict) -> Tuple[dict, int]: + """The function that handles all requests to the API. + + :param epoint: + :type epoint: str + :param params: + :type params: dict + :rtype: Tuple[dict, int] + """ logging.debug(f"Calling API with endpoint {epoint} params {params}") r = self.session.get(f"{QOBUZ_BASE}/{epoint}", params=params) try: @@ -337,6 +389,12 @@ class QobuzClient(Client): raise def _test_secret(self, secret: str) -> bool: + """Tests a secret. + + :param secret: + :type secret: str + :rtype: bool + """ try: self._api_get_file_url("19512574", sec=secret) return True @@ -435,6 +493,17 @@ class TidalClient(Client): token_expiry=None, refresh_token=None, ): + """Login to Tidal using the browser. + + Providing information from previous logins will allow a user + to stay logged in. + + :param user_id: + :param country_code: + :param access_token: + :param token_expiry: + :param refresh_token: + """ if access_token is not None: self.token_expiry = token_expiry self.refresh_token = refresh_token @@ -453,9 +522,24 @@ class TidalClient(Client): click.secho("Logged into Tidal", fg="green") def get(self, item_id, media_type): + """Public method that internally calls _api_get. + + :param item_id: + :param media_type: + """ return self._api_get(item_id, media_type) - def search(self, query, media_type="album", limit: int = 100): + def search(self, query: str, media_type: str = "album", limit: int = 100) -> dict: + """Search for a query. + + :param query: + :type query: str + :param media_type: track, album, playlist, or video. + :type media_type: str + :param limit: max is 100 + :type limit: int + :rtype: dict + """ params = { "query": query, "limit": limit, @@ -463,6 +547,13 @@ class TidalClient(Client): return self._api_request(f"search/{media_type}s", params=params) def get_file_url(self, track_id, quality: int = 3, video=False): + """Get the file url for a track or video given an id. + + :param track_id: or video id + :param quality: 0, 1, 2, or 3. It is irrelevant for videos. + :type quality: int + :param video: + """ if video: return self._get_video_stream_url(track_id) @@ -484,7 +575,11 @@ class TidalClient(Client): "codec": manifest["codecs"], } - def get_tokens(self): + def get_tokens(self) -> dict: + """Used for saving them for later use. + + :rtype: dict + """ return { k: getattr(self, k) for k in ( @@ -499,6 +594,10 @@ class TidalClient(Client): # ------------ Utilities to login ------------- def _login_new_user(self, launch=True): + """This will launch the browser and ask the user to log into tidal. + + :param launch: + """ login_link = f"https://{self._get_device_code()}" click.secho( @@ -526,6 +625,7 @@ class TidalClient(Client): raise Exception def _get_device_code(self): + """Get the device code that will be used to log in on the browser.""" data = { "client_id": TIDAL_CLIENT_INFO["id"], "scope": "r_usr+w_usr+w_sub", @@ -542,6 +642,7 @@ class TidalClient(Client): return resp["verificationUriComplete"] def _check_auth_status(self): + """Check if the user has logged in inside the browser.""" data = { "client_id": TIDAL_CLIENT_INFO["id"], "device_code": self.device_code, @@ -569,7 +670,12 @@ class TidalClient(Client): self.token_expiry = resp["expires_in"] + time.time() return 0 - def _verify_access_token(self, token): + def _verify_access_token(self, token: str): + """Verify that the access token is valid. + + :param token: + :type token: str + """ headers = { "authorization": f"Bearer {token}", } @@ -582,6 +688,9 @@ class TidalClient(Client): return True def _refresh_access_token(self): + """The access token expires in a week, so it must be refreshed. + Requires a refresh token. + """ data = { "client_id": TIDAL_CLIENT_INFO["id"], "refresh_token": self.refresh_token, @@ -604,6 +713,11 @@ class TidalClient(Client): self._update_authorization() def _login_by_access_token(self, token, user_id=None): + """This is the method used to login after the access token has been saved. + + :param token: + :param user_id: Not necessary. + """ headers = {"authorization": f"Bearer {token}"} # temporary resp = self.session.get( "https://api.tidal.com/v1/sessions", headers=headers @@ -620,15 +734,25 @@ class TidalClient(Client): self._update_authorization() def _update_authorization(self): + """Update the requests session headers with the auth token.""" self.session.headers.update(self.authorization) @property def authorization(self): + """The auth header.""" return {"authorization": f"Bearer {self.access_token}"} # ------------- Fetch data ------------------ def _api_get(self, item_id: str, media_type: str) -> dict: + """Send a request to the api for information. + + :param item_id: + :type item_id: str + :param media_type: track, album, playlist, or video. + :type media_type: str + :rtype: dict + """ url = f"{media_type}s/{item_id}" item = self._api_request(url) if media_type in ("playlist", "album"): @@ -650,7 +774,14 @@ class TidalClient(Client): return item - def _api_request(self, path, params=None) -> dict: + def _api_request(self, path: str, params=None) -> dict: + """The function that handles all tidal API requests. + + :param path: + :type path: str + :param params: + :rtype: dict + """ if params is None: params = {} @@ -659,7 +790,14 @@ class TidalClient(Client): r = self.session.get(f"{TIDAL_BASE}/{path}", params=params).json() return r - def _get_video_stream_url(self, video_id) -> str: + def _get_video_stream_url(self, video_id: str) -> str: + """Videos have to be ripped from an hls stream, so they require + seperate processing. + + :param video_id: + :type video_id: str + :rtype: str + """ params = { "videoquality": "HIGH", "playbackmode": "STREAM", @@ -680,6 +818,12 @@ class TidalClient(Client): return url_info[-1] def _api_post(self, url, data, auth=None): + """Function used for posting to tidal API. + + :param url: + :param data: + :param auth: + """ r = self.session.post(url, data=data, auth=auth, verify=False).json() return r @@ -693,9 +837,15 @@ class SoundCloudClient(Client): self.session = gen_threadsafe_session(headers={"User-Agent": AGENT}) def login(self): + """Login is not necessary for SoundCloud.""" raise NotImplementedError def get(self, id, media_type="track"): + """Get metadata for a media type given an id. + + :param id: + :param media_type: + """ assert media_type in ("track", "playlist"), f"{media_type} not supported" if "http" in str(id): @@ -708,6 +858,16 @@ class SoundCloudClient(Client): return resp def get_file_url(self, track: dict, quality) -> dict: + """Get the streamable file url from soundcloud. + + It will most likely be an hls stream, which will have to be manually + parsed, or downloaded with ffmpeg. + + :param track: + :type track: dict + :param quality: + :rtype: dict + """ if not track["streamable"] or track["policy"] == "BLOCK": raise Exception @@ -729,11 +889,28 @@ class SoundCloudClient(Client): return {"url": resp["url"], "type": "mp3"} def search(self, query: str, media_type="album"): + """Search for a query. + + :param query: + :type query: str + :param media_type: Can be album, though it will return a playlist + response. + """ params = {"q": query} resp, _ = self._get(f"search/{media_type}s", params=params) return resp def _get(self, path, params=None, no_base=False, resp_obj=False): + """The lower level of `SoundCloudClient.get` that handles request + parameters and other options. + + :param path: + :param params: + :param no_base: Do not append `path` parameter to the SoundCloud API + base. + :param resp_obj: Return the object returned by `requests.get` instead + of the json response dict. + """ if params is None: params = {} params["client_id"] = SOUNDCLOUD_CLIENT_ID