diff --git a/poetry.lock b/poetry.lock index 1b91cc7..5e19b04 100644 --- a/poetry.lock +++ b/poetry.lock @@ -203,7 +203,7 @@ files = [ name = "atomicwrites" version = "1.4.1" description = "Atomic file writes." -category = "dev" +category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" files = [ @@ -718,7 +718,7 @@ testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs name = "iniconfig" version = "2.0.0" description = "brain-dead simple config-ini parsing" -category = "dev" +category = "main" optional = false python-versions = ">=3.7" files = [ @@ -983,7 +983,7 @@ files = [ name = "packaging" version = "23.1" description = "Core utilities for Python packages" -category = "dev" +category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1133,7 +1133,7 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4)", "pytest-co name = "pluggy" version = "1.3.0" description = "plugin and hook calling mechanisms for python" -category = "dev" +category = "main" optional = false python-versions = ">=3.8" files = [ @@ -1149,7 +1149,7 @@ testing = ["pytest", "pytest-benchmark"] name = "py" version = "1.11.0" description = "library with cross-python path, ini-parsing, io, code, log facilities" -category = "dev" +category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" files = [ @@ -1322,7 +1322,7 @@ plugins = ["importlib-metadata"] name = "pytest" version = "6.2.5" description = "pytest: simple powerful testing with Python" -category = "dev" +category = "main" optional = false python-versions = ">=3.6" files = [ @@ -1343,6 +1343,24 @@ toml = "*" [package.extras] testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"] +[[package]] +name = "pytest-mock" +version = "3.11.1" +description = "Thin-wrapper around the mock package for easier use with pytest" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "pytest-mock-3.11.1.tar.gz", hash = "sha256:7f6b125602ac6d743e523ae0bfa71e1a697a2f5534064528c6ff84c2f7c2fc7f"}, + {file = "pytest_mock-3.11.1-py3-none-any.whl", hash = "sha256:21c279fff83d70763b05f8874cc9cfb3fcacd6d354247a976f9529d19f9acf39"}, +] + +[package.dependencies] +pytest = ">=5.0" + +[package.extras] +dev = ["pre-commit", "pytest-asyncio", "tox"] + [[package]] name = "pytz" version = "2023.3.post1" @@ -1670,7 +1688,7 @@ test = ["pytest"] name = "toml" version = "0.10.2" description = "Python Library for Tom's Obvious, Minimal Language" -category = "dev" +category = "main" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" files = [ @@ -1959,4 +1977,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.8 <4.0" -content-hash = "1cf354944aafbff9fe2ac4bd14695367cb3eb289c8177c55612bc9187dbebbab" +content-hash = "e259b509756397e94af7de03b42c5f6e1e62ad24d133daba282fbe095e244824" diff --git a/pyproject.toml b/pyproject.toml index fc7ce19..4f4ed8d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,15 +7,18 @@ license = "GPL-3.0-only" readme = "README.md" homepage = "https://github.com/nathom/streamrip" repository = "https://github.com/nathom/streamrip" -include = ["streamrip/config.toml"] +include = ["src/config.toml"] keywords = ["hi-res", "free", "music", "download"] classifiers = [ "License :: OSI Approved :: GNU General Public License (GPL)", "Operating System :: OS Independent", ] +packages = [ + { include = "streamrip" } +] [tool.poetry.scripts] -rip = "src.cli:main" +rip = "streamrip.cli:main" [tool.poetry.dependencies] python = ">=3.8 <4.0" @@ -36,6 +39,7 @@ aiofiles = "^0.7" aiohttp = "^3.7" aiodns = "^3.0.0" aiolimiter = "^1.1.0" +pytest-mock = "^3.11.1" [tool.poetry.urls] "Bug Reports" = "https://github.com/nathom/streamrip/issues" diff --git a/rip/__init__.py b/rip/__init__.py deleted file mode 100644 index 6015393..0000000 --- a/rip/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Rip: an easy to use command line utility for downloading audio streams.""" diff --git a/rip/__main__.py b/rip/__main__.py deleted file mode 100644 index f6f770a..0000000 --- a/rip/__main__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Run the rip program.""" -from .cli import main - -main() diff --git a/rip/exceptions.py b/rip/exceptions.py deleted file mode 100644 index a392215..0000000 --- a/rip/exceptions.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Exceptions used by RipCore.""" - - -class DeezloaderFallback(Exception): - """Raise if Deezer account isn't logged in and rip is falling back to Deezloader.""" diff --git a/rip/utils.py b/rip/utils.py deleted file mode 100644 index dcf1a5f..0000000 --- a/rip/utils.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Utility functions for RipCore.""" - -import re -from typing import Tuple - -from streamrip.constants import AGENT -from streamrip.utils import gen_threadsafe_session - -interpreter_artist_regex = re.compile(r"getSimilarArtist\(\s*'(\w+)'") - - -def extract_interpreter_url(url: str) -> str: - """Extract artist ID from a Qobuz interpreter url. - - :param url: Urls of the form "https://www.qobuz.com/us-en/interpreter/{artist}/download-streaming-albums" - :type url: str - :rtype: str - """ - session = gen_threadsafe_session({"User-Agent": AGENT}) - r = session.get(url) - match = interpreter_artist_regex.search(r.text) - if match: - return match.group(1) - - raise Exception( - "Unable to extract artist id from interpreter url. Use a " - "url that contains an artist id." - ) - - -deezer_id_link_regex = re.compile( - r"https://www\.deezer\.com/[a-z]{2}/(album|artist|playlist|track)/(\d+)" -) - - -def extract_deezer_dynamic_link(url: str) -> Tuple[str, str]: - """Extract a deezer url that includes an ID from a deezer.page.link url. - - :param url: - :type url: str - :rtype: Tuple[str, str] - """ - session = gen_threadsafe_session({"User-Agent": AGENT}) - r = session.get(url) - match = deezer_id_link_regex.search(r.text) - if match: - return match.group(1), match.group(2) - - raise Exception("Unable to extract Deezer dynamic link.") diff --git a/src/media.py b/src/media.py deleted file mode 100644 index bf52fc7..0000000 --- a/src/media.py +++ /dev/null @@ -1,32 +0,0 @@ -from abc import ABC, abstractmethod - - -class Media(ABC): - async def rip(self): - await self.preprocess() - await self.download() - await self.postprocess() - - @abstractmethod - async def preprocess(self): - """Create directories, download cover art, etc.""" - raise NotImplemented - - @abstractmethod - async def download(self): - """Download and tag the actual audio files in the correct directories.""" - raise NotImplemented - - @abstractmethod - async def postprocess(self): - """Update database, run conversion, delete garbage files etc.""" - raise NotImplemented - - -class Pending(ABC): - """A request to download a `Media` whose metadata has not been fetched.""" - - @abstractmethod - async def resolve(self) -> Media: - """Fetch metadata and resolve into a downloadable `Media` object.""" - raise NotImplemented diff --git a/src/track.py b/src/track.py deleted file mode 100644 index 04f1a5f..0000000 --- a/src/track.py +++ /dev/null @@ -1,49 +0,0 @@ -import os -from dataclasses import dataclass - -from .client import Client -from .config import Config -from .downloadable import Downloadable -from .media import Media, Pending -from .metadata import AlbumMetadata, TrackMetadata - - -@dataclass(slots=True) -class Track(Media): - meta: TrackMetadata - downloadable: Downloadable - config: Config - folder: str - download_path: str = "" - - async def preprocess(self): - folder = self._get_folder(self.folder) - os.makedirs(folder, exist_ok=True) - # Run in background while track downloads? - # Don't download again if part of album - await self._download_cover() - - async def download(self): - async with get_progress_bar(self.config, self.downloadable.size()) as bar: - self.downloadable.download(self.download_path, lambda x: bar.update(x)) - - async def postprocess(self): - await self.tag() - await self.convert() - - -@dataclass(slots=True) -class PendingTrack(Pending): - id: str - album: AlbumMetadata - client: Client - config: Config - folder: str - - async def resolve(self) -> Track: - resp = await self.client.get_metadata({"id": self.id}, "track") - meta = TrackMetadata.from_resp(self.album, self.client.source, resp) - quality = getattr(self.config.session, self.client.source).quality - assert isinstance(quality, int) - downloadable = await self.client.get_downloadable(self.id, quality) - return Track(meta, downloadable, self.config, self.directory) diff --git a/streamrip/__init__.py b/streamrip/__init__.py index f69b69e..4e1f95f 100644 --- a/streamrip/__init__.py +++ b/streamrip/__init__.py @@ -1,5 +1 @@ -"""streamrip: the all in one music downloader.""" - -__version__ = "1.9.7" - -from . import clients, constants, converter, downloadtools, media +__all__ = ["config"] diff --git a/src/album.py b/streamrip/album.py similarity index 100% rename from src/album.py rename to streamrip/album.py diff --git a/src/artist.py b/streamrip/artist.py similarity index 100% rename from src/artist.py rename to streamrip/artist.py diff --git a/src/cli.py b/streamrip/cli.py similarity index 100% rename from src/cli.py rename to streamrip/cli.py diff --git a/src/client.py b/streamrip/client.py similarity index 100% rename from src/client.py rename to streamrip/client.py diff --git a/src/config.py b/streamrip/config.py similarity index 84% rename from src/config.py rename to streamrip/config.py index 64867b6..e86fd0d 100644 --- a/src/config.py +++ b/streamrip/config.py @@ -3,7 +3,7 @@ import copy import logging import os -from dataclasses import dataclass +from dataclasses import dataclass, fields from tomlkit.api import dumps, parse from tomlkit.toml_document import TOMLDocument @@ -217,10 +217,11 @@ class ConfigData: filepaths: FilepathsConfig artwork: ArtworkConfig metadata: MetadataConfig - qobuz_filter: QobuzDiscographyFilterConfig + qobuz_filters: QobuzDiscographyFilterConfig theme: ThemeConfig database: DatabaseConfig + conversion: ConversionConfig _modified: bool = False @@ -241,9 +242,10 @@ class ConfigData: artwork = ArtworkConfig(**toml["artwork"]) # type: ignore filepaths = FilepathsConfig(**toml["filepaths"]) # type: ignore metadata = MetadataConfig(**toml["metadata"]) # type: ignore - qobuz_filter = QobuzDiscographyFilterConfig(**toml["qobuz_filters"]) # type: ignore + qobuz_filters = QobuzDiscographyFilterConfig(**toml["qobuz_filters"]) # type: ignore theme = ThemeConfig(**toml["theme"]) # type: ignore database = DatabaseConfig(**toml["database"]) # type: ignore + conversion = ConversionConfig(**toml["conversion"]) # type: ignore return cls( toml=toml, @@ -257,9 +259,10 @@ class ConfigData: artwork=artwork, filepaths=filepaths, metadata=metadata, - qobuz_filter=qobuz_filter, + qobuz_filters=qobuz_filters, theme=theme, database=database, + conversion=conversion, ) @classmethod @@ -275,7 +278,25 @@ class ConfigData: return self._modified def update_toml(self): - pass + update_toml_section_from_config(self.toml["downloads"], self.downloads) + update_toml_section_from_config(self.toml["qobuz"], self.qobuz) + update_toml_section_from_config(self.toml["tidal"], self.tidal) + update_toml_section_from_config(self.toml["deezer"], self.deezer) + update_toml_section_from_config(self.toml["soundcloud"], self.soundcloud) + update_toml_section_from_config(self.toml["youtube"], self.youtube) + update_toml_section_from_config(self.toml["lastfm"], self.lastfm) + update_toml_section_from_config(self.toml["artwork"], self.artwork) + update_toml_section_from_config(self.toml["filepaths"], self.filepaths) + update_toml_section_from_config(self.toml["metadata"], self.metadata) + update_toml_section_from_config(self.toml["qobuz_filters"], self.qobuz_filters) + update_toml_section_from_config(self.toml["theme"], self.theme) + update_toml_section_from_config(self.toml["database"], self.database) + update_toml_section_from_config(self.toml["conversion"], self.conversion) + + +def update_toml_section_from_config(toml_section, config): + for field in fields(config): + toml_section[field.name] = getattr(config, field.name) class Config: @@ -294,3 +315,6 @@ class Config: with open(self._path, "w") as toml_file: self.file.update_toml() toml_file.write(dumps(self.file.toml)) + + def __del__(self): + self.save_file() diff --git a/src/config.toml b/streamrip/config.toml similarity index 100% rename from src/config.toml rename to streamrip/config.toml diff --git a/streamrip/constants.py b/streamrip/constants.py deleted file mode 100644 index ce4eadf..0000000 --- a/streamrip/constants.py +++ /dev/null @@ -1,195 +0,0 @@ -"""Constants that are kept in one place.""" - -import base64 - -import mutagen.id3 as id3 - -AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0" - -TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg" -# Get this from (base64encoded) -# aHR0cHM6Ly9hLXYyLnNuZGNkbi5jb20vYXNzZXRzLzItYWIxYjg1NjguanM= -# Don't know if this is a static url yet -SOUNDCLOUD_CLIENT_ID = "qHsjZaNbdTcABbiIQnVfW07cEPGLNjIh" -SOUNDCLOUD_USER_ID = "672320-86895-162383-801513" -SOUNDCLOUD_APP_VERSION = "1630917744" - - -QUALITY_DESC = { - 0: "128kbps", - 1: "320kbps", - 2: "16bit/44.1kHz", - 3: "24bit/96kHz", - 4: "24bit/192kHz", -} - -QOBUZ_FEATURED_KEYS = ( - "most-streamed", - "recent-releases", - "best-sellers", - "press-awards", - "ideal-discography", - "editor-picks", - "most-featured", - "qobuzissims", - "new-releases", - "new-releases-full", - "harmonia-mundi", - "universal-classic", - "universal-jazz", - "universal-jeunesse", - "universal-chanson", -) - -__MP4_KEYS = ( - "\xa9nam", - "\xa9ART", - "\xa9alb", - r"aART", - "\xa9day", - "\xa9day", - "\xa9cmt", - "desc", - "purd", - "\xa9grp", - "\xa9gen", - "\xa9lyr", - "\xa9too", - "cprt", - "cpil", - "covr", - "trkn", - "disk", - None, - None, - None, -) - -__MP3_KEYS = ( - id3.TIT2, - id3.TPE1, - id3.TALB, - id3.TPE2, - id3.TCOM, - id3.TYER, - id3.COMM, - id3.TT1, - id3.TT1, - id3.GP1, - id3.TCON, - id3.USLT, - id3.TEN, - id3.TCOP, - id3.TCMP, - None, - id3.TRCK, - id3.TPOS, - None, - None, - None, -) - -__METADATA_TYPES = ( - "title", - "artist", - "album", - "albumartist", - "composer", - "year", - "comment", - "description", - "purchase_date", - "grouping", - "genre", - "lyrics", - "encoder", - "copyright", - "compilation", - "cover", - "tracknumber", - "discnumber", - "tracktotal", - "disctotal", - "date", -) - - -FLAC_KEY = {v: v.upper() for v in __METADATA_TYPES} -MP4_KEY = dict(zip(__METADATA_TYPES, __MP4_KEYS)) -MP3_KEY = dict(zip(__METADATA_TYPES, __MP3_KEYS)) - -COPYRIGHT = "\u2117" -PHON_COPYRIGHT = "\u00a9" -FLAC_MAX_BLOCKSIZE = 16777215 # 16.7 MB - -# TODO: give these more descriptive names -TRACK_KEYS = ( - "tracknumber", - "artist", - "albumartist", - "composer", - "title", - "albumcomposer", - "explicit", -) -ALBUM_KEYS = ( - "albumartist", - "title", - "year", - "bit_depth", - "sampling_rate", - "container", - "albumcomposer", - "id", -) -# TODO: rename these to DEFAULT_FOLDER_FORMAT etc -FOLDER_FORMAT = ( - "{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]" -) -TRACK_FORMAT = "{tracknumber}. {artist} - {title}" - - -TIDAL_MAX_Q = 7 - -TIDAL_Q_MAP = { - "LOW": 0, - "HIGH": 1, - "LOSSLESS": 2, - "HI_RES": 3, -} - -DEEZER_MAX_Q = 6 -DEEZER_FEATURED_KEYS = {"releases", "charts", "selection"} -AVAILABLE_QUALITY_IDS = (0, 1, 2, 3, 4) -DEEZER_FORMATS = { - "AAC_64", - "MP3_64", - "MP3_128", - "MP3_256", - "MP3_320", - "FLAC", -} -# video only for tidal -MEDIA_TYPES = {"track", "album", "artist", "label", "playlist", "video"} - -# used to homogenize cover size keys -COVER_SIZES = ("thumbnail", "small", "large", "original") - -TIDAL_CLIENT_INFO = { - "id": base64.b64decode("elU0WEhWVmtjMnREUG80dA==").decode("iso-8859-1"), - "secret": base64.b64decode( - "VkpLaERGcUpQcXZzUFZOQlY2dWtYVEptd2x2YnR0UDd3bE1scmM3MnNlND0=" - ).decode("iso-8859-1"), -} - -QOBUZ_BASE = "https://www.qobuz.com/api.json/0.2" - -TIDAL_BASE = "https://api.tidalhifi.com/v1" -TIDAL_AUTH_URL = "https://auth.tidal.com/v1/oauth2" - -DEEZER_BASE = "https://api.deezer.com" -DEEZER_DL = "http://dz.loaderapp.info/deezer" - -SOUNDCLOUD_BASE = "https://api-v2.soundcloud.com" - -MAX_FILES_OPEN = 128 diff --git a/src/converter.py b/streamrip/converter.py similarity index 100% rename from src/converter.py rename to streamrip/converter.py diff --git a/src/core.py b/streamrip/core.py similarity index 100% rename from src/core.py rename to streamrip/core.py diff --git a/src/db.py b/streamrip/db.py similarity index 100% rename from src/db.py rename to streamrip/db.py diff --git a/src/deezer_client.py b/streamrip/deezer_client.py similarity index 100% rename from src/deezer_client.py rename to streamrip/deezer_client.py diff --git a/src/deezloader_client.py b/streamrip/deezloader_client.py similarity index 100% rename from src/deezloader_client.py rename to streamrip/deezloader_client.py diff --git a/src/downloadable.py b/streamrip/downloadable.py similarity index 100% rename from src/downloadable.py rename to streamrip/downloadable.py diff --git a/streamrip/downloadtools.py b/streamrip/downloadtools.py deleted file mode 100644 index 1473579..0000000 --- a/streamrip/downloadtools.py +++ /dev/null @@ -1,225 +0,0 @@ -import asyncio -import functools -import hashlib -import logging -import os -import re -from tempfile import gettempdir -from typing import Callable, Dict, Iterable, List, Optional - -import aiofiles -import aiohttp -from Cryptodome.Cipher import Blowfish - -from .exceptions import NonStreamable -from .utils import gen_threadsafe_session - -logger = logging.getLogger("streamrip") - - -class DownloadStream: - """An iterator over chunks of a stream. - - Usage: - - >>> stream = DownloadStream('https://google.com', None) - >>> with open('google.html', 'wb') as file: - >>> for chunk in stream: - >>> file.write(chunk) - - """ - - is_encrypted = re.compile("/m(?:obile|edia)/") - - def __init__( - self, - url: str, - source: str = None, - params: dict = None, - headers: dict = None, - item_id: str = None, - ): - """Create an iterable DownloadStream of a URL. - - :param url: The url to download - :type url: str - :param source: Only applicable for Deezer - :type source: str - :param params: Parameters to pass in the request - :type params: dict - :param headers: Headers to pass in the request - :type headers: dict - :param item_id: (Only for Deezer) the ID of the track - :type item_id: str - """ - self.source = source - self.session = gen_threadsafe_session(headers=headers) - - self.id = item_id - if isinstance(self.id, int): - self.id = str(self.id) - - if params is None: - params = {} - - self.request = self.session.get( - url, allow_redirects=True, stream=True, params=params - ) - self.file_size = int(self.request.headers.get("Content-Length", 0)) - - if self.file_size < 20000 and not self.url.endswith(".jpg"): - import json - - try: - info = self.request.json() - try: - # Usually happens with deezloader downloads - raise NonStreamable(f"{info['error']} - {info['message']}") - except KeyError: - raise NonStreamable(info) - - except json.JSONDecodeError: - raise NonStreamable("File not found.") - - def __iter__(self) -> Iterable: - """Iterate through chunks of the stream. - - :rtype: Iterable - """ - if self.source == "deezer" and self.is_encrypted.search(self.url) is not None: - assert isinstance(self.id, str), self.id - - blowfish_key = self._generate_blowfish_key(self.id) - # decryptor = self._create_deezer_decryptor(blowfish_key) - CHUNK_SIZE = 2048 * 3 - return ( - # (decryptor.decrypt(chunk[:2048]) + chunk[2048:]) - (self._decrypt_chunk(blowfish_key, chunk[:2048]) + chunk[2048:]) - if len(chunk) >= 2048 - else chunk - for chunk in self.request.iter_content(CHUNK_SIZE) - ) - - return self.request.iter_content(chunk_size=1024) - - @property - def url(self): - """Return the requested url.""" - return self.request.url - - def __len__(self) -> int: - """Return the value of the "Content-Length" header. - - :rtype: int - """ - return self.file_size - - def _create_deezer_decryptor(self, key) -> Blowfish: - return Blowfish.new(key, Blowfish.MODE_CBC, b"\x00\x01\x02\x03\x04\x05\x06\x07") - - @staticmethod - def _generate_blowfish_key(track_id: str): - """Generate the blowfish key for Deezer downloads. - - :param track_id: - :type track_id: str - """ - SECRET = "g4el58wc0zvf9na1" - md5_hash = hashlib.md5(track_id.encode()).hexdigest() - # good luck :) - return "".join( - chr(functools.reduce(lambda x, y: x ^ y, map(ord, t))) - for t in zip(md5_hash[:16], md5_hash[16:], SECRET) - ).encode() - - @staticmethod - def _decrypt_chunk(key, data): - """Decrypt a chunk of a Deezer stream. - - :param key: - :param data: - """ - return Blowfish.new( - key, - Blowfish.MODE_CBC, - b"\x00\x01\x02\x03\x04\x05\x06\x07", - ).decrypt(data) - - -class DownloadPool: - """Asynchronously download a set of urls.""" - - def __init__( - self, - urls: Iterable, - tempdir: str = None, - chunk_callback: Optional[Callable] = None, - ): - self.finished: bool = False - # Enumerate urls to know the order - self.urls = dict(enumerate(urls)) - self._downloaded_urls: List[str] = [] - # {url: path} - self._paths: Dict[str, str] = {} - self.task: Optional[asyncio.Task] = None - - if tempdir is None: - tempdir = gettempdir() - self.tempdir = tempdir - - async def getfn(self, url): - path = os.path.join(self.tempdir, f"__streamrip_partial_{abs(hash(url))}") - self._paths[url] = path - return path - - async def _download_urls(self): - async with aiohttp.ClientSession() as session: - tasks = [ - asyncio.ensure_future(self._download_url(session, url)) - for url in self.urls.values() - ] - await asyncio.gather(*tasks) - - async def _download_url(self, session, url): - filename = await self.getfn(url) - logger.debug("Downloading %s", url) - async with session.get(url) as response, aiofiles.open(filename, "wb") as f: - # without aiofiles 3.6632679780000004s - # with aiofiles 2.504482839s - await f.write(await response.content.read()) - - if self.callback: - self.callback() - - logger.debug("Finished %s", url) - - def download(self, callback=None): - self.callback = callback - asyncio.run(self._download_urls()) - - @property - def files(self): - if len(self._paths) != len(self.urls): - # Not all of them have downloaded - raise Exception("Must run DownloadPool.download() before accessing files") - - return [ - os.path.join(self.tempdir, self._paths[self.urls[i]]) - for i in range(len(self.urls)) - ] - - def __len__(self): - return len(self.urls) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - logger.debug("Removing tempfiles %s", self._paths) - for file in self._paths.values(): - try: - os.remove(file) - except FileNotFoundError: - pass - - return False diff --git a/src/exceptions.py b/streamrip/exceptions.py similarity index 100% rename from src/exceptions.py rename to streamrip/exceptions.py diff --git a/streamrip/filepath_utils.py b/streamrip/filepath_utils.py new file mode 100644 index 0000000..38cca8c --- /dev/null +++ b/streamrip/filepath_utils.py @@ -0,0 +1,35 @@ +from string import Formatter, printable + +from pathvalidate import sanitize_filename + + +def clean_filename(fn: str, restrict=False) -> str: + path = str(sanitize_filename(fn)) + if restrict: + allowed_chars = set(printable) + path = "".join(c for c in path if c in allowed_chars) + + return path + + +def clean_format(formatter: str, format_info: dict, restrict: bool = False) -> str: + """Format track or folder names sanitizing every formatter key. + + :param formatter: + :type formatter: str + :param kwargs: + """ + fmt_keys = filter(None, (i[1] for i in Formatter().parse(formatter))) + + clean_dict = {} + for key in fmt_keys: + if isinstance(format_info.get(key), (str, float)): + clean_dict[key] = clean_filename(str(format_info[key]), restrict=restrict) + elif key == "explicit": + clean_dict[key] = " (Explicit) " if format_info.get(key, False) else "" + elif isinstance(format_info.get(key), int): # track/discnumber + clean_dict[key] = f"{format_info[key]:02}" + else: + clean_dict[key] = "Unknown" + + return formatter.format(**clean_dict) diff --git a/streamrip/media.py b/streamrip/media.py index dbdad0b..bf52fc7 100644 --- a/streamrip/media.py +++ b/streamrip/media.py @@ -1,2377 +1,32 @@ -"""Bases that handle parsing and downloading media. """ +from abc import ABC, abstractmethod -import abc -import concurrent.futures -import hashlib -import logging -import os -import re -import shutil -import subprocess -from tempfile import gettempdir -from typing import ( - Any, - Dict, - Generator, - Iterable, - List, - Optional, - Sequence, - Tuple, - Union, -) -from click import echo, secho, style -from mutagen.flac import FLAC, Picture -from mutagen.id3 import APIC, ID3, ID3NoHeaderError -from mutagen.mp4 import MP4, MP4Cover -from pathvalidate import sanitize_filepath +class Media(ABC): + async def rip(self): + await self.preprocess() + await self.download() + await self.postprocess() -from . import converter -from .clients import Client, DeezloaderClient -from .constants import ALBUM_KEYS, FLAC_MAX_BLOCKSIZE, FOLDER_FORMAT, TRACK_FORMAT -from .downloadtools import DownloadPool, DownloadStream -from .exceptions import ( - InvalidQuality, - InvalidSourceError, - ItemExists, - NonStreamable, - PartialFailure, - TooLargeCoverArt, -) -from .metadata import TrackMetadata -from .utils import ( - clean_filename, - clean_format, - concat_audio_files, - decrypt_mqa_file, - downsize_image, - ext, - get_container, - get_cover_urls, - get_stats_from_quality, - get_tqdm_bar, - safe_get, - tidal_cover_url, - tqdm_stream, -) + @abstractmethod + async def preprocess(self): + """Create directories, download cover art, etc.""" + raise NotImplemented -logger = logging.getLogger("streamrip") + @abstractmethod + async def download(self): + """Download and tag the actual audio files in the correct directories.""" + raise NotImplemented -TYPE_REGEXES = { - "remaster": re.compile(r"(?i)(re)?master(ed)?"), - "extra": re.compile(r"(?i)(anniversary|deluxe|live|collector|demo|expanded)"), -} + @abstractmethod + async def postprocess(self): + """Update database, run conversion, delete garbage files etc.""" + raise NotImplemented -class Media(abc.ABC): - """An interface for a downloadable item.""" +class Pending(ABC): + """A request to download a `Media` whose metadata has not been fetched.""" - @abc.abstractmethod - def download(self, **kwargs): - """Download the item. - - :param kwargs: - """ - pass - - @abc.abstractmethod - def load_meta(self, **kwargs): - """Load all of the metadata for an item. - - :param kwargs: - """ - pass - - @abc.abstractmethod - def tag(self, **kwargs): - """Tag this item with metadata, if applicable. - - :param kwargs: - """ - pass - - @abc.abstractmethod - def convert(self, **kwargs): - """Convert this item between file formats. - - :param kwargs: - """ - pass - - @abc.abstractmethod - def __repr__(self): - """Return a string representation of the item.""" - pass - - @abc.abstractmethod - def __str__(self): - """Get a readable representation of the item.""" - pass - - @property - @abc.abstractmethod - def type(self): - """Return the type of the item.""" - pass - - @property - @abc.abstractmethod - def downloaded_ids(self): - """If the item is a collection, this is a set of downloaded IDs.""" - pass - - @downloaded_ids.setter - def downloaded_ids(self, other): - pass - - @property - @abc.abstractmethod - def id(self): - pass - - @id.setter - def id(self, other): - pass - - -class Track(Media): - """Represents a downloadable track. - - Loading metadata as a single track: - >>> t = Track(client, id='20252078') - >>> t.load_meta() # load metadata from api - - Loading metadata as part of an Album: - >>> t = Track.from_album_meta(api_track_dict, client) - - where `api_track_dict` is a track entry in an album tracklist. - - Downloading and tagging: - >>> t.download() - >>> t.tag() - """ - - id = None - downloaded_ids: set = set() - downloaded: bool = False - tagged: bool = False - converted: bool = False - - quality: int - folder: str - meta: TrackMetadata - - final_path: str - container: str - - def __init__(self, client: Client, **kwargs): - """Create a track object. - - The only required parameter is client, but passing at an id is - highly recommended. Every value in kwargs will be set as an attribute - of the object. (TODO: make this safer) - - :param track_id: track id returned by Qobuz API - :type track_id: Optional[Union[str, int]] - :param client: qopy client - :type client: Client - :param meta: TrackMetadata object - :type meta: Optional[TrackMetadata] - :param kwargs: id, filepath_format, meta, quality, folder - """ - logger.debug(kwargs) - - self.client = client - self.__dict__.update(kwargs) - - self.part_of_tracklist = kwargs.get("part_of_tracklist", False) - - if isinstance(kwargs.get("meta"), TrackMetadata): - self.meta = kwargs["meta"] - - if (u := kwargs.get("cover_url")) is not None: - logger.debug("Cover url: %s", u) - self.cover_url = u - - def load_meta(self, **kwargs): - """Send a request to the client to get metadata for this Track. - - Usually only called for single tracks and last.fm playlists. - """ - assert self.id is not None, "id must be set before loading metadata" - - source = self.client.source - - self.resp = self.client.get(self.id, media_type="track") - self.meta = TrackMetadata( - track=self.resp, source=source - ) # meta dict -> TrackMetadata object - - # Because the cover urls are not parsed when only the track metadata - # is loaded, we need to do this ourselves - - # Default to large if chosen size is not available - self.cover_url = self.meta.cover_urls.get( - kwargs.get("embed_cover_size", "large"), - self.meta.cover_urls.get("large"), - ) - - def _prepare_download(self, **kwargs): - """Do preprocessing before downloading items. - - It creates the directories, downloads cover art, and (optionally) - downloads booklets. - - :param kwargs: - """ - # args override attributes - self.quality = min( - kwargs["quality"], self.client.max_quality, self.meta.quality - ) - - self.folder = kwargs["parent_folder"] or self.folder - - if not self.part_of_tracklist and kwargs["add_singles_to_folder"]: - self.folder = os.path.join( - self.folder, - clean_format( - kwargs.get("folder_format", FOLDER_FORMAT), - self.meta.get_album_formatter(self.quality), - restrict=kwargs.get("restrict_filenames", False), - ), - ) - - self.file_format = kwargs.get("track_format", TRACK_FORMAT) - - self.folder = sanitize_filepath(self.folder, platform="auto") - self.format_final_path( - restrict=kwargs.get("restrict_filenames", False) - ) # raises: ItemExists - - os.makedirs(self.folder, exist_ok=True) - - if hasattr(self, "cover_url"): - try: - self.download_cover( - width=kwargs.get("max_artwork_width", 999999), - height=kwargs.get("max_artwork_height", 999999), - ) # only downloads for playlists and singles - except ItemExists as e: - logger.debug(e) - - self.path = os.path.join(gettempdir(), f"{hash(self.id)}_{self.quality}.tmp") - - def download( # noqa - self, - quality: int = 3, - parent_folder: str = "StreamripDownloads", - progress_bar: bool = True, - **kwargs, - ): - """Download the track. - - :param quality: (0, 1, 2, 3, 4) - :type quality: int - :param folder: folder to download the files to - :type folder: Optional[Union[str, os.PathLike]] - :param progress_bar: turn on/off progress bar - :type progress_bar: bool - """ - if not self.part_of_tracklist and not self.client.source == "soundcloud": - secho(f"Downloading {self!s}\n", bold=True) - - self._prepare_download( - quality=quality, - parent_folder=parent_folder, - progress_bar=progress_bar, - **kwargs, - ) - - if self.client.source == "soundcloud": - # soundcloud client needs whole dict to get file url - url_id = self.resp - else: - url_id = self.id - - try: - dl_info = self.client.get_file_url(url_id, self.quality) - except Exception as e: - logger.debug(repr(e)) - raise NonStreamable(e) - - if self.client.source == "qobuz": - if not self.__validate_qobuz_dl_info(dl_info): - raise NonStreamable("Track is not available for download") - - self.sampling_rate = dl_info.get("sampling_rate") - self.bit_depth = dl_info.get("bit_depth") - - # --------- Download Track ---------- - if self.client.source in ("qobuz", "tidal"): - logger.debug("Downloadable URL found: %s", dl_info.get("url")) - try: - download_url = dl_info["url"] - except KeyError as e: - if restrictions := dl_info["restrictions"]: - # Turn CamelCase code into a readable sentence - words = re.findall(r"([A-Z][a-z]+)", restrictions[0]["code"]) - raise NonStreamable( - words[0] + " " + " ".join(map(str.lower, words[1:])) + "." - ) - - secho(f"Panic: {e} dl_info = {dl_info}", fg="red") - raise NonStreamable - - _quick_download(download_url, self.path, desc=self._progress_desc) - - elif isinstance(self.client, DeezloaderClient): - _quick_download(dl_info["url"], self.path, desc=self._progress_desc) - - elif self.client.source == "deezer": - # We can only find out if the requested quality is available - # after the streaming request is sent for deezer - - try: - stream = DownloadStream( - dl_info["url"], source="deezer", item_id=self.id - ) - except NonStreamable: - self.id = dl_info["fallback_id"] - dl_info = self.client.get_file_url(self.id, self.quality) - assert isinstance(dl_info, dict) - stream = DownloadStream( - dl_info["url"], source="deezer", item_id=self.id - ) - - stream_size = len(stream) - stream_quality = dl_info["size_to_quality"][stream_size] - if self.quality != stream_quality: - # The chosen quality is not available - self.quality = stream_quality - self.format_final_path( - restrict=kwargs.get("restrict_filenames", False) - ) # If the extension is different - - with open(self.path, "wb") as file: - for chunk in tqdm_stream(stream, desc=self._progress_desc): - file.write(chunk) - - elif self.client.source == "soundcloud": - self._soundcloud_download(dl_info) - - else: - raise InvalidSourceError(self.client.source) - - if ( - self.client.source == "tidal" - and isinstance(dl_info, dict) - and dl_info.get("enc_key", False) - ): - out_path = f"{self.path}_dec" - logger.debug("Decrypting MQA file") - decrypt_mqa_file(self.path, out_path, dl_info["enc_key"]) - self.path = out_path - - if not kwargs.get("stay_temp", False): - self.move(self.final_path) - - logger.debug("Downloaded: %s -> %s", self.path, self.final_path) - - self.downloaded = True - - if not kwargs.get("keep_cover", True) and hasattr(self, "cover_path"): - os.remove(self.cover_path) - - def __validate_qobuz_dl_info(self, info: dict) -> bool: - """Check if the download info dict returned by Qobuz is downloadable. - - :param info: - :type info: dict - :rtype: bool - """ - return all( - ( - info.get("sampling_rate"), - info.get("bit_depth"), - not info.get("sample"), - ) - ) - - def move(self, path: str): - """Move the Track and set self.path to the new path. - - :param path: - :type path: str - """ - os.makedirs(os.path.dirname(path), exist_ok=True) - - try: - shutil.move(self.path, path) - except FileNotFoundError as e: - # This sometimes happens when using Music.app's Auto folder - logger.debug("%s during shutil.move", e) - os.makedirs(os.path.dirname(path), exist_ok=True) - shutil.move(self.path, path) - - self.path = path - - def _soundcloud_download(self, dl_info: dict): - """Download a soundcloud track. - - This requires a seperate function because there are three methods that - can be used to download a track: - * original file downloads - * direct mp3 downloads - * hls stream ripping - All three of these need special processing. - - :param dl_info: - :type dl_info: dict - :rtype: str - """ - # logger.debug("dl_info: %s", dl_info) - if dl_info["type"] == "mp3": - import m3u8 - import requests - - parsed_m3u = m3u8.loads( - requests.get(dl_info["url"]).content.decode("utf-8") - ) - self.path += ".mp3" - - with DownloadPool(segment.uri for segment in parsed_m3u.segments) as pool: - - bar = get_tqdm_bar(len(pool), desc=self._progress_desc, unit="Chunk") - - def update_tqdm_bar(): - bar.update(1) - - pool.download(callback=update_tqdm_bar) - - concat_audio_files(pool.files, self.path, "mp3") - - elif dl_info["type"] == "original": - _quick_download(dl_info["url"], self.path, desc=self._progress_desc) - - # if a wav is returned, convert to flac - engine = converter.FLAC(self.path) - self.path = f"{self.path}.flac" - engine.convert(custom_fn=self.path) - - self.final_path = self.final_path.replace(".mp3", ".flac") - self.quality = 2 - - @property - def type(self) -> str: - """Return "track". - - :rtype: str - """ - return "track" - - @property - def _progress_desc(self) -> str: - """Get the description that is used on the progress bar. - - :rtype: str - """ - return style(f"Track {self.meta.tracknumber:02}", fg="blue") - - def download_cover(self, width=999999, height=999999): - """Download the cover art, if cover_url is given.""" - self.cover_path = os.path.join(gettempdir(), f"cover{hash(self.cover_url)}.jpg") - logger.debug("Downloading cover from %s", self.cover_url) - - if not os.path.exists(self.cover_path): - _cover_download(self.cover_url, self.cover_path) - downsize_image(self.cover_path, width, height) - else: - logger.debug("Cover already exists, skipping download") - raise ItemExists(self.cover_path) - - def format_final_path(self, restrict: bool = False) -> str: - """Return the final filepath of the downloaded file. - - This uses the `get_formatter` method of TrackMetadata, which returns - a dict with the keys allowed in formatter strings, and their values in - the TrackMetadata object. - """ - formatter = self.meta.get_formatter(max_quality=self.quality) - logger.debug("Track meta formatter %s", formatter) - filename = clean_format(self.file_format, formatter, restrict=restrict) - self.final_path = os.path.join(self.folder, filename)[:250].strip() + ext( - self.quality, self.client.source - ) - - logger.debug("Formatted path: %s", self.final_path) - - if os.path.isfile(self.final_path): # track already exists - self.downloaded = True - self.tagged = True - self.path = self.final_path - raise ItemExists(self.final_path) - - return self.final_path - - @classmethod - def from_album_meta(cls, album: TrackMetadata, track: dict, client: Client): - """Return a new Track object initialized with info. - - :param album: album metadata returned by API - :param pos: index of the track - :param client: qopy client object - :type client: Client - :raises: IndexError - """ - meta = TrackMetadata(album=album, track=track, source=client.source) - return cls(client=client, meta=meta, id=track["id"], part_of_tracklist=True) - - @classmethod - def from_api(cls, item: dict, client: Client): - """Return a new Track initialized from search result. - - :param item: - :type item: dict - :param client: - :type client: Client - """ - meta = TrackMetadata(track=item, source=client.source) - cover_url: Optional[str] - try: - if client.source == "qobuz": - cover_url = item["album"]["image"]["large"] - elif client.source == "tidal": - cover_url = tidal_cover_url(item["album"]["cover"], 640) - elif client.source == "deezer": - cover_url = item["album"]["cover_big"] - elif client.source == "soundcloud": - if (small_url := item["artwork_url"]) is not None: - cover_url = small_url.replace("large", "t500x500") - else: - raise KeyError - else: - raise InvalidSourceError(client.source) - - except KeyError: - logger.debug("No cover found") - cover_url = None - - return cls( - client=client, - meta=meta, - id=item["id"], - cover_url=cover_url, - ) - - def tag( # noqa - self, - album_meta: dict = None, - cover: Union[Picture, APIC, MP4Cover] = None, - embed_cover: bool = True, - exclude_tags: Optional[Sequence] = None, - **kwargs, - ): - """Tag the track using the stored metadata. - - The info stored in the TrackMetadata object (self.meta) can be updated - with album metadata if necessary. The cover must be a mutagen cover-type - object that already has the bytes loaded. - - :param album_meta: album metadata to update Track with - :type album_meta: dict - :param cover: initialized mutagen cover object - :type cover: Union[Picture, APIC] - :param embed_cover: Embed cover art into file - :type embed_cover: bool - """ - assert isinstance(self.meta, TrackMetadata), "meta must be TrackMetadata" - if not self.downloaded: - logger.info( - "Track %s not tagged because it was not downloaded", - self["title"], - ) - return - - if self.tagged: - logger.info( - "Track %s not tagged because it is already tagged", - self["title"], - ) - return - - if album_meta is not None: - self.meta.add_album_meta(album_meta) # extend meta with album info - - # TODO: make this cleaner - if self.converted: - if self.container == "FLAC": - audio = FLAC(self.path) - elif self.container in ("AAC", "ALAC", "MP4"): - audio = MP4(self.path) - elif self.container == "MP3": - audio = ID3() - try: - audio = ID3(self.path) - except ID3NoHeaderError: - audio = ID3() - else: - if self.quality in (2, 3, 4): - self.container = "FLAC" - logger.debug("Tagging file with %s container", self.container) - audio = FLAC(self.path) - elif self.quality <= 1: - if self.client.source == "tidal": - self.container = "AAC" - audio = MP4(self.path) - else: - self.container = "MP3" - try: - audio = ID3(self.path) - except ID3NoHeaderError: - audio = ID3() - - logger.debug("Tagging file with %s container", self.container) - else: - raise InvalidQuality(f'Invalid quality: "{self.quality}"') - - # automatically generate key, value pairs based on container - tags = self.meta.tags( - self.container, - set(exclude_tags) if exclude_tags is not None else None, - ) - for k, v in tags: - logger.debug("Setting %s tag to %s", k, v) - audio[k] = v - - if embed_cover and cover is None: - cover = ( - Tracklist.get_cover_obj( - self.cover_path, self.container, self.client.source - ) - if hasattr(self, "cover_path") - else None - ) - - if isinstance(audio, FLAC): - if embed_cover and cover: - audio.add_picture(cover) - audio.save() - elif isinstance(audio, ID3): - if embed_cover and cover: - audio.add(cover) - audio.save(self.path, "v2_version=3") - elif isinstance(audio, MP4): - if cover: - audio["covr"] = [cover] - audio.save() - else: - raise ValueError(f"Unknown container type: {audio}") - - self.tagged = True - - def convert(self, codec: str = "ALAC", **kwargs): - """Convert the track to another codec. - - Valid values for codec: - * FLAC - * ALAC - * MP3 - * OPUS - * OGG - * VORBIS - * AAC - * M4A - - :param codec: the codec to convert the track to - :type codec: str - :param kwargs: - """ - if not self.downloaded: - logger.debug("Track not downloaded, skipping conversion") - secho("Track not downloaded, skipping conversion", fg="magenta") - return - - CONV_CLASS = { - "FLAC": converter.FLAC, - "ALAC": converter.ALAC, - "MP3": converter.LAME, - "OPUS": converter.OPUS, - "OGG": converter.Vorbis, - "VORBIS": converter.Vorbis, - "AAC": converter.AAC, - "M4A": converter.AAC, - } - - try: - self.container = codec.upper() - except AttributeError: - secho("Error: No audio codec chosen to convert to.", fg="red") - exit() - - if not hasattr(self, "final_path"): - self.format_final_path(kwargs.get("restrict_filenames", False)) - - if not os.path.isfile(self.path): - logger.info("File %s does not exist. Skipping conversion.", self.path) - secho(f"{self!s} does not exist. Skipping conversion.", fg="red") - return - - assert ( - self.container in CONV_CLASS - ), f"Invalid codec {codec}. Must be in {CONV_CLASS.keys()}" - - engine = CONV_CLASS[self.container]( - filename=self.path, - sampling_rate=kwargs.get("sampling_rate"), - remove_source=kwargs.get("remove_source", True), - ) - engine.convert() - self.path = engine.final_fn - self.final_path = self.final_path.replace( - ext(self.quality, self.client.source), f".{engine.container}" - ) - - if not kwargs.get("stay_temp", False): - self.move(self.final_path) - - self.converted = True - - @property - def title(self) -> str: - """Get the title of the track. - - :rtype: str - """ - try: - _title = self.meta.title - except AttributeError: - raise Exception("Track must be loaded before accessing title") - - if self.meta.explicit: - _title = f"{_title} (Explicit)" - return _title - - def get(self, *keys, default=None) -> Any: - """Safe get method that allows for layered access. - - :param keys: - :param default: - """ - return safe_get(self.meta, *keys, default=default) # type: ignore - - def set(self, key, val): - """Set attribute `key` to `val`. - - Equivalent to __setitem__. Implemented only for consistency. - - :param key: - :param val: - """ - self.__setitem__(key, val) - - def __getitem__(self, key: str) -> Any: - """Dict-like interface for Track metadata. - - :param key: - """ - return getattr(self.meta, key) - - def __setitem__(self, key: str, val: Any): - """Dict-like interface for Track metadata. - - :param key: - :param val: - """ - setattr(self.meta, key, val) - - def __repr__(self) -> str: - """Return a string representation of the track. - - :rtype: str - """ - return f"" - - def __str__(self) -> str: - """Return a readable string representation of this track. - - :rtype: str - """ - return f"{self['artist']} - {self['title']}" - - def __bool__(self): - """Return True.""" - return True - - -class Video(Media): - """Only for Tidal.""" - - id = None - downloaded_ids: set = set() - - def __init__(self, client: Client, id: str, **kwargs): - """Initialize a Video object. - - :param client: - :type client: Client - :param id: The TIDAL Video ID - :type id: str - :param kwargs: title, explicit, and tracknumber - """ - self.id = id - self.client = client - - def load_meta(self, **kwargs): - """Given an id at contruction, get the metadata of the video.""" - resp = self.client.get(self.id, "video") - self.title = resp["title"] - self.explicit = resp["explicit"] - self.tracknumber = resp["trackNumber"] - - def download(self, **kwargs): - """Download the Video. - - :param kwargs: - """ - - if not kwargs.get("download_videos", True): - return - - import m3u8 - import requests - - # secho( - # f"Downloading {self.title} (Video). This may take a while.", - # fg="blue", - # ) - - self.parent_folder = kwargs.get("parent_folder", "StreamripDownloads") - url = self.client.get_file_url(self.id, video=True) - - parsed_m3u = m3u8.loads(requests.get(url).text) - # Asynchronously download the streams - - with DownloadPool(segment.uri for segment in parsed_m3u.segments) as pool: - bar = get_tqdm_bar(len(pool), desc=self._progress_desc, unit="Chunk") - - def update_tqdm_bar(): - bar.update(1) - - pool.download(callback=update_tqdm_bar) - - # Put the filenames in a tempfile that ffmpeg - # can read from - file_list_path = os.path.join(gettempdir(), "__streamrip_video_files") - with open(file_list_path, "w") as file_list: - text = "\n".join(f"file '{path}'" for path in pool.files) - file_list.write(text) - - # Use ffmpeg to concat the files - subprocess.call( - ( - "ffmpeg", - "-f", - "concat", - "-safe", - "0", - "-i", - file_list_path, - "-c", - "copy", - "-loglevel", - "panic", - self.path, - ) - ) - - os.remove(file_list_path) - - def tag(self, *args, **kwargs): - """Return False. - - This is a dummy method. - - :param args: - :param kwargs: - """ - return False - - @classmethod - def from_album_meta(cls, track: dict, client: Client): - """Return a new Video object given an album API response. - - :param track: track dict from album - :type track: dict - :param client: - :type client: Client - """ - return cls( - client, - id=track["id"], - title=track["title"], - explicit=track["explicit"], - tracknumber=track["trackNumber"], - ) - - def convert(self, *args, **kwargs): - """Return None. - - Dummy method. - - :param args: - :param kwargs: - """ - pass - - @property - def _progress_desc(self) -> str: - return style(f"Video {self.tracknumber:02}", fg="blue") - - @property - def path(self) -> str: - """Get path to download the mp4 file. - - :rtype: str - """ - os.makedirs(self.parent_folder, exist_ok=True) - fname = self.title - if self.explicit: - fname = f"{fname} (Explicit)" - if self.tracknumber is not None: - fname = f"{self.tracknumber:02}. {fname}" - - return os.path.join(self.parent_folder, f"{fname}.mp4") - - @property - def type(self) -> str: - """Return "video". - - :rtype: str - """ - return "video" - - def __str__(self) -> str: - """Return the title. - - :rtype: str - """ - return self.title - - def __repr__(self) -> str: - """Return a string representation of self. - - :rtype: str - """ - return f"