From 3640e4e70a51e74869ee8c762d15b3f7c94ff9c2 Mon Sep 17 00:00:00 2001 From: Nathan Thomas Date: Fri, 17 Nov 2023 20:40:46 -0800 Subject: [PATCH] Update --- streamrip/album.py | 15 +- streamrip/artwork.py | 70 ++-- streamrip/cli2.py | 17 +- streamrip/client.py | 2 +- streamrip/downloadable.py | 18 +- streamrip/main.py | 11 +- streamrip/metadata.py | 420 ------------------------ streamrip/metadata/__init__.py | 14 + streamrip/metadata/album_metadata.py | 239 ++++++++++++++ streamrip/metadata/covers.py | 116 +++++++ streamrip/metadata/playlist_metadata.py | 85 +++++ streamrip/metadata/track_metadata.py | 141 ++++++++ streamrip/metadata/util.py | 51 +++ streamrip/playlist.py | 75 ++++- streamrip/prompter.py | 26 +- streamrip/qobuz_client.py | 1 + streamrip/soundcloud_client.py | 200 +++++++++-- streamrip/tagger.py | 14 +- streamrip/track.py | 14 +- streamrip/universal_url.py | 42 ++- 20 files changed, 1051 insertions(+), 520 deletions(-) delete mode 100644 streamrip/metadata.py create mode 100644 streamrip/metadata/__init__.py create mode 100644 streamrip/metadata/album_metadata.py create mode 100644 streamrip/metadata/covers.py create mode 100644 streamrip/metadata/playlist_metadata.py create mode 100644 streamrip/metadata/track_metadata.py create mode 100644 streamrip/metadata/util.py diff --git a/streamrip/album.py b/streamrip/album.py index 120f509..9f95dd8 100644 --- a/streamrip/album.py +++ b/streamrip/album.py @@ -8,8 +8,9 @@ from .client import Client from .config import Config from .console import console from .media import Media, Pending -from .metadata import AlbumMetadata, get_album_track_ids -from .track import PendingTrack, Track +from .metadata import AlbumMetadata +from .metadata.util import get_album_track_ids +from .track import PendingTrack logger = logging.getLogger("streamrip") @@ -25,7 +26,7 @@ class Album(Media): async def preprocess(self): if self.config.session.cli.text_output: console.print( - f"[cyan]Downloading {self.meta.album} by 
{self.meta.albumartist}" + f"Downloading [cyan]{self.meta.album}[/cyan] by [cyan]{self.meta.albumartist}[/cyan]" ) async def download(self): @@ -53,11 +54,15 @@ class PendingAlbum(Pending): album_folder = self._album_folder(folder, meta) os.makedirs(album_folder, exist_ok=True) embed_cover, _ = await download_artwork( - self.client.session, album_folder, meta.covers, self.config.session.artwork + self.client.session, + album_folder, + meta.covers, + self.config.session.artwork, + for_playlist=False, ) pending_tracks = [ PendingTrack( - id=id, + id, album=meta, client=self.client, config=self.config, diff --git a/streamrip/artwork.py b/streamrip/artwork.py index 94f6717..82e264d 100644 --- a/streamrip/artwork.py +++ b/streamrip/artwork.py @@ -1,5 +1,7 @@ import asyncio +import logging import os +import shutil import aiohttp from PIL import Image @@ -8,52 +10,82 @@ from .config import ArtworkConfig from .downloadable import BasicDownloadable from .metadata import Covers +_artwork_tempdirs: set[str] = set() + +logger = logging.getLogger("streamrip") + + +def remove_artwork_tempdirs(): + logger.debug("Removing dirs %s", _artwork_tempdirs) + for path in _artwork_tempdirs: + shutil.rmtree(path) + async def download_artwork( - session: aiohttp.ClientSession, folder: str, covers: Covers, config: ArtworkConfig + session: aiohttp.ClientSession, + folder: str, + covers: Covers, + config: ArtworkConfig, + for_playlist: bool, ) -> tuple[str | None, str | None]: - """Download artwork, which may include a seperate file to keep. - Also updates the passed Covers object with downloaded filepaths. + """Download artwork and update passed Covers object with filepaths. - Because it is a single, we will assume that none of the covers have already been - downloaded, so existing paths in `covers` will be discarded and overwritten. + If paths for the selected sizes already exist in `covers`, nothing will + be downloaded. 
+ + If `for_playlist` is set, it will not download hires cover art regardless + of the config setting. + + Embedded artworks are put in a temporary directory under `folder` called + "__embed" that can be deleted once a playlist or album is done downloading. + + Hi-res (saved) artworks are kept in `folder` as "cover.jpg". Args: - covers (Covers): The set of available covers. - + session (aiohttp.ClientSession): + folder (str): + covers (Covers): + config (ArtworkConfig): + for_playlist (bool): Set to disable saved hires covers. Returns: - (path to embed cover, path to hires cover) - The path of the cover to embed, or None if there either is no artwork available or - if artwork embedding is turned off. + (path to embedded artwork, path to hires artwork) """ - if (not config.save_artwork and not config.embed) or covers.empty(): + save_artwork, embed = config.save_artwork, config.embed + if for_playlist: + save_artwork = False + + if not (save_artwork or embed) or covers.empty(): # No need to download anything return None, None downloadables = [] - saved_cover_path = None - if config.save_artwork: - _, l_url, _ = covers.largest() - assert l_url is not None # won't be true unless covers is empty + _, l_url, saved_cover_path = covers.largest() + if saved_cover_path is None and save_artwork: saved_cover_path = os.path.join(folder, "cover.jpg") + assert l_url is not None downloadables.append( BasicDownloadable(session, l_url, "jpg").download( saved_cover_path, lambda _: None ) ) - embed_cover_path = None - if config.embed: - _, embed_url, _ = covers.get_size(config.embed_size) + _, embed_url, embed_cover_path = covers.get_size(config.embed_size) + if embed_cover_path is None and config.embed: assert embed_url is not None - embed_cover_path = os.path.join(folder, "embed_cover.jpg") + embed_dir = os.path.join(folder, "__artwork") + os.makedirs(embed_dir, exist_ok=True) + _artwork_tempdirs.add(embed_dir) + embed_cover_path = os.path.join(embed_dir, 
f"cover{hash(embed_url)}.jpg") downloadables.append( BasicDownloadable(session, embed_url, "jpg").download( embed_cover_path, lambda _: None ) ) + if len(downloadables) == 0: + return embed_cover_path, saved_cover_path + await asyncio.gather(*downloadables) # Update `covers` to reflect the current download state diff --git a/streamrip/cli2.py b/streamrip/cli2.py index 16c66d3..2dfe05b 100644 --- a/streamrip/cli2.py +++ b/streamrip/cli2.py @@ -7,8 +7,9 @@ from functools import wraps import click from click import secho -from click_help_colors import HelpColorsGroup +from click_help_colors import HelpColorsGroup # type: ignore from rich.logging import RichHandler +from rich.prompt import Confirm from rich.traceback import install from .config import Config, set_user_defaults @@ -118,7 +119,8 @@ async def file(ctx, path): with Config(config_path) as cfg: main = Main(cfg) with open(path) as f: - await asyncio.gather(*[main.add(url) for url in f]) + for url in f: + await main.add(url) await main.resolve() await main.rip() @@ -152,13 +154,10 @@ def config_reset(ctx, yes): """Reset the config file.""" config_path = ctx.obj["config_path"] if not yes: - echo_w( - f"Are you sure you want to reset the config file at {config_path}? [y/n] ", - nl=False, - ) - result = input() - if result.strip() != "y": - echo_i("Reset aborted.") + if not Confirm.ask( + f"Are you sure you want to reset the config file at {config_path}?" 
+ ): + console.print("[green]Reset aborted") return shutil.copy(BLANK_CONFIG_PATH, config_path) diff --git a/streamrip/client.py b/streamrip/client.py index c03024a..3ab7213 100644 --- a/streamrip/client.py +++ b/streamrip/client.py @@ -36,7 +36,7 @@ class Client(ABC): raise NotImplementedError @abstractmethod - async def get_downloadable(self, item: dict, quality: int) -> Downloadable: + async def get_downloadable(self, item: str, quality: int) -> Downloadable: raise NotImplementedError @staticmethod diff --git a/streamrip/downloadable.py b/streamrip/downloadable.py index bea6497..1bf8ab3 100644 --- a/streamrip/downloadable.py +++ b/streamrip/downloadable.py @@ -201,7 +201,7 @@ class SoundcloudDownloadable(Downloadable): downloader = BasicDownloadable(self.session, self.url, "flac") await downloader.download(path, callback) engine = converter.FLAC(path) - engine.convert(path) + await engine.convert(path) async def _download_mp3(self, path: str, callback): async with self.session.get(self.url) as resp: @@ -230,6 +230,14 @@ class SoundcloudDownloadable(Downloadable): await file.write(content) return tmp + async def size(self) -> int: + async with self.session.get(self.url) as resp: + content = await resp.text("utf-8") + + parsed_m3u = m3u8.loads(content) + self._size = len(parsed_m3u.segments) + return await super().size() + def concat_audio_files(paths: list[str], out: str, ext: str, max_files_open=128): """Concatenate audio files using FFmpeg. Batched by max files open. 
@@ -272,13 +280,15 @@ def concat_audio_files(paths: list[str], out: str, ext: str, max_files_open=128) "-acodec", "copy", "-loglevel", - "panic", + "warning", outpaths[i], ), - # capture_output=True, + capture_output=True, ) if proc.returncode != 0: - raise Exception(f"FFMPEG returned with this error: {proc.stderr}") + raise Exception( + f"FFMPEG returned with status code {proc.returncode} error: {proc.stderr} output: {proc.stdout}" + ) # Recurse on remaining batches concat_audio_files(outpaths, out, ext) diff --git a/streamrip/main.py b/streamrip/main.py index d40f340..f096b02 100644 --- a/streamrip/main.py +++ b/streamrip/main.py @@ -1,6 +1,7 @@ import asyncio import logging +from .artwork import remove_artwork_tempdirs from .client import Client from .config import Config from .console import console @@ -8,6 +9,7 @@ from .media import Media, Pending from .progress import clear_progress from .prompter import get_prompter from .qobuz_client import QobuzClient +from .soundcloud_client import SoundcloudClient from .universal_url import parse_url logger = logging.getLogger("streamrip") @@ -29,17 +31,17 @@ class Main: # -> downloaded audio file self.pending: list[Pending] = [] self.media: list[Media] = [] - self.config = config self.clients: dict[str, Client] = { "qobuz": QobuzClient(config), # "tidal": TidalClient(config), # "deezer": DeezerClient(config), - # "soundcloud": SoundcloudClient(config), + "soundcloud": SoundcloudClient(config), # "deezloader": DeezloaderClient(config), } async def add(self, url: str): + """Add url as a pending item. 
Do not `asyncio.gather` calls to this!""" parsed = parse_url(url) if parsed is None: raise Exception(f"Unable to parse url {url}") @@ -60,6 +62,7 @@ class Main: with console.status(f"[cyan]Logging into {source}", spinner="dots"): # Log into client using credentials from config await client.login() + # await client.login() assert client.logged_in return client @@ -76,6 +79,8 @@ class Main: await asyncio.gather(*[item.rip() for item in self.media]) for client in self.clients.values(): - await client.session.close() + if hasattr(client, "session"): + await client.session.close() clear_progress() + remove_artwork_tempdirs() diff --git a/streamrip/metadata.py b/streamrip/metadata.py deleted file mode 100644 index c95383b..0000000 --- a/streamrip/metadata.py +++ /dev/null @@ -1,420 +0,0 @@ -"""Manages the information that will be embeded in the audio file.""" -from __future__ import annotations - -import json -import logging -import re -from dataclasses import dataclass -from typing import Optional, Type, TypeVar - -logger = logging.getLogger("streamrip") - - -def get_album_track_ids(source: str, resp) -> list[str]: - tracklist = resp["tracks"] - if source == "qobuz": - tracklist = tracklist["items"] - return [track["id"] for track in tracklist] - - -class Covers: - CoverEntry = tuple[str, str | None, str | None] - _covers: list[CoverEntry] - - def __init__(self): - # ordered from largest to smallest - self._covers = [ - ("original", None, None), - ("large", None, None), - ("small", None, None), - ("thumbnail", None, None), - ] - - def set_cover(self, size: str, url: str | None, path: str | None): - i = self._indexof(size) - self._covers[i] = (size, url, path) - - def set_cover_url(self, size: str, url: str): - self.set_cover(size, url, None) - - @staticmethod - def _indexof(size: str) -> int: - if size == "original": - return 0 - if size == "large": - return 1 - if size == "small": - return 2 - if size == "thumbnail": - return 3 - raise Exception(f"Invalid {size = }") - 
- def empty(self) -> bool: - return all(url is None for _, url, _ in self._covers) - - def set_largest_path(self, path: str): - for size, url, _ in self._covers: - if url is not None: - self.set_cover(size, url, path) - return - raise Exception(f"No covers found in {self}") - - def set_path(self, size: str, path: str): - i = self._indexof(size) - size, url, _ = self._covers[i] - self._covers[i] = (size, url, path) - - def largest(self) -> CoverEntry: - for s, u, p in self._covers: - if u is not None: - return (s, u, p) - - raise Exception(f"No covers found in {self}") - - @classmethod - def from_qobuz(cls, resp): - img = resp["image"] - - c = cls() - c.set_cover_url("original", "org".join(img["large"].rsplit("600", 1))) - c.set_cover_url("large", img["large"]) - c.set_cover_url("small", img["small"]) - c.set_cover_url("thumbnail", img["thumbnail"]) - return c - - def get_size(self, size: str) -> CoverEntry: - i = self._indexof(size) - size, url, path = self._covers[i] - if url is not None: - return (size, url, path) - if i + 1 < len(self._covers): - for s, u, p in self._covers[i + 1 :]: - if u is not None: - return (s, u, p) - raise Exception(f"Cover not found for {size = }. 
Available: {self}") - - def __repr__(self): - covers = "\n".join(map(repr, self._covers)) - return f"Covers({covers})" - - -PHON_COPYRIGHT = "\u2117" -COPYRIGHT = "\u00a9" - - -@dataclass(slots=True) -class TrackMetadata: - info: TrackInfo - - title: str - album: AlbumMetadata - artist: str - tracknumber: int - discnumber: int - composer: Optional[str] - - @classmethod - def from_qobuz(cls, album: AlbumMetadata, resp: dict) -> TrackMetadata: - with open("tests/qobuz_track_resp.json", "w") as f: - json.dump(resp, f) - title = typed(resp["title"].strip(), str) - - version = typed(resp.get("version"), str | None) - work = typed(resp.get("work"), str | None) - if version is not None and version not in title: - title = f"{title} ({version})" - if work is not None and work not in title: - title = f"{work}: {title}" - - composer = typed(resp.get("composer", {}).get("name"), str | None) - tracknumber = typed(resp.get("track_number", 1), int) - discnumber = typed(resp.get("media_number", 1), int) - artist = typed(safe_get(resp, "performer", "name"), str) - track_id = str(resp["id"]) - bit_depth = typed(resp.get("maximum_bit_depth"), int | None) - sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None) - # Is the info included? 
- explicit = False - - info = TrackInfo( - id=track_id, - quality=album.info.quality, - bit_depth=bit_depth, - explicit=explicit, - sampling_rate=sampling_rate, - work=work, - ) - return cls( - info=info, - title=title, - album=album, - artist=artist, - tracknumber=tracknumber, - discnumber=discnumber, - composer=composer, - ) - - @classmethod - def from_deezer(cls, album: AlbumMetadata, resp) -> TrackMetadata: - raise NotImplemented - - @classmethod - def from_soundcloud(cls, album: AlbumMetadata, resp) -> TrackMetadata: - raise NotImplemented - - @classmethod - def from_tidal(cls, album: AlbumMetadata, resp) -> TrackMetadata: - raise NotImplemented - - @classmethod - def from_resp(cls, album: AlbumMetadata, source, resp) -> TrackMetadata: - if source == "qobuz": - return cls.from_qobuz(album, resp) - if source == "tidal": - return cls.from_tidal(album, resp) - if source == "soundcloud": - return cls.from_soundcloud(album, resp) - if source == "deezer": - return cls.from_deezer(album, resp) - raise Exception - - def format_track_path(self, format_string: str) -> str: - # Available keys: "tracknumber", "artist", "albumartist", "composer", "title", - # and "explicit", "albumcomposer" - none_text = "Unknown" - info = { - "title": self.title, - "tracknumber": self.tracknumber, - "artist": self.artist, - "albumartist": self.album.albumartist, - "albumcomposer": self.album.albumcomposer or none_text, - "composer": self.composer or none_text, - "explicit": " (Explicit) " if self.info.explicit else "", - } - return format_string.format(**info) - - -@dataclass(slots=True) -class TrackInfo: - id: str - quality: int - - bit_depth: Optional[int] = None - explicit: bool = False - sampling_rate: Optional[int | float] = None - work: Optional[str] = None - - -genre_clean = re.compile(r"([^\u2192\/]+)") - - -@dataclass(slots=True) -class AlbumMetadata: - info: AlbumInfo - - album: str - albumartist: str - year: str - genre: list[str] - covers: Covers - tracktotal: int - - 
disctotal: int = 1 - albumcomposer: Optional[str] = None - comment: Optional[str] = None - compilation: Optional[str] = None - copyright: Optional[str] = None - date: Optional[str] = None - description: Optional[str] = None - encoder: Optional[str] = None - grouping: Optional[str] = None - lyrics: Optional[str] = None - purchase_date: Optional[str] = None - - def get_genres(self) -> str: - return ", ".join(self.genre) - - def get_copyright(self) -> str | None: - if self.copyright is None: - return None - # Add special chars - _copyright = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, self.copyright) - _copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, _copyright) - return _copyright - - def format_folder_path(self, formatter: str) -> str: - # Available keys: "albumartist", "title", "year", "bit_depth", "sampling_rate", - # "id", and "albumcomposer", - none_str = "Unknown" - info: dict[str, str | int | float] = { - "albumartist": self.albumartist, - "albumcomposer": self.albumcomposer or none_str, - "bit_depth": self.info.bit_depth or none_str, - "id": self.info.id, - "sampling_rate": self.info.sampling_rate or none_str, - "title": self.album, - "year": self.year, - "container": self.info.container, - } - return formatter.format(**info) - - @classmethod - def from_qobuz(cls, resp: dict) -> AlbumMetadata: - album = resp.get("title", "Unknown Album") - tracktotal = resp.get("tracks_count", 1) - genre = resp.get("genres_list") or resp.get("genre") or [] - genres = list(set(genre_clean.findall("/".join(genre)))) - date = resp.get("release_date_original") or resp.get("release_date") - year = date[:4] if date is not None else "Unknown" - - _copyright = resp.get("copyright", "") - - if artists := resp.get("artists"): - albumartist = ", ".join(a["name"] for a in artists) - else: - albumartist = typed(safe_get(resp, "artist", "name"), str) - - albumcomposer = typed(safe_get(resp, "composer", "name"), str | None) - _label = resp.get("label") - if isinstance(_label, dict): - _label = 
_label["name"] - label = typed(_label, str | None) - description = typed(resp.get("description") or None, str | None) - disctotal = typed( - max( - track.get("media_number", 1) - for track in safe_get(resp, "tracks", "items", default=[{}]) # type: ignore - ) - or 1, - int, - ) - explicit = typed(resp.get("parental_warning", False), bool) - - # Non-embedded information - # version = resp.get("version") - cover_urls = Covers.from_qobuz(resp) - streamable = typed(resp.get("streamable", False), bool) - assert streamable - bit_depth = typed(resp.get("maximum_bit_depth"), int | None) - sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None) - quality = get_quality_id(bit_depth, sampling_rate) - # Make sure it is non-empty list - booklets = typed(resp.get("goodies", None) or None, list | None) - item_id = str(resp.get("qobuz_id")) - - if sampling_rate and bit_depth: - container = "FLAC" - else: - container = "MP3" - - info = AlbumInfo( - id=item_id, - quality=quality, - container=container, - label=label, - explicit=explicit, - sampling_rate=sampling_rate, - bit_depth=bit_depth, - booklets=booklets, - ) - return AlbumMetadata( - info, - album, - albumartist, - year, - genre=genres, - covers=cover_urls, - albumcomposer=albumcomposer, - comment=None, - compilation=None, - copyright=_copyright, - date=date, - description=description, - disctotal=disctotal, - encoder=None, - grouping=None, - lyrics=None, - purchase_date=None, - tracktotal=tracktotal, - ) - - @classmethod - def from_deezer(cls, resp) -> AlbumMetadata: - raise NotImplementedError - - @classmethod - def from_soundcloud(cls, resp) -> AlbumMetadata: - raise NotImplementedError - - @classmethod - def from_tidal(cls, resp) -> AlbumMetadata: - raise NotImplementedError - - @classmethod - def from_resp(cls, resp: dict, source: str) -> AlbumMetadata: - if source == "qobuz": - return cls.from_qobuz(resp) - if source == "tidal": - return cls.from_tidal(resp) - if source == "soundcloud": - return 
cls.from_soundcloud(resp) - if source == "deezer": - return cls.from_deezer(resp) - raise Exception("Invalid source") - - -@dataclass(slots=True) -class AlbumInfo: - id: str - quality: int - container: str - label: Optional[str] = None - explicit: bool = False - sampling_rate: Optional[int | float] = None - bit_depth: Optional[int] = None - booklets: list[dict] | None = None - - -def safe_get(d: dict, *keys, default=None) -> dict | str | int | list | None: - """Nested __getitem__ calls with a default value. - - Use to avoid key not found errors. - """ - _d = d - for k in keys: - _d = _d.get(k, {}) - if _d == {}: - return default - return _d - - -T = TypeVar("T") - - -def typed(thing, expected_type: Type[T]) -> T: - assert isinstance(thing, expected_type) - return thing - - -def get_quality_id(bit_depth: Optional[int], sampling_rate: Optional[int]) -> int: - """Get the universal quality id from bit depth and sampling rate. - - :param bit_depth: - :type bit_depth: Optional[int] - :param sampling_rate: In kHz - :type sampling_rate: Optional[int] - """ - # XXX: Should `0` quality be supported? - if bit_depth is None or sampling_rate is None: # is lossy - return 1 - - if bit_depth == 16: - return 2 - - if bit_depth == 24: - if sampling_rate <= 96: - return 3 - - return 4 - - raise Exception(f"Invalid {bit_depth = }") diff --git a/streamrip/metadata/__init__.py b/streamrip/metadata/__init__.py new file mode 100644 index 0000000..fc537aa --- /dev/null +++ b/streamrip/metadata/__init__.py @@ -0,0 +1,14 @@ +"""Manages the information that will be embeded in the audio file.""" +from . 
import util +from .album_metadata import AlbumMetadata +from .covers import Covers +from .playlist_metadata import PlaylistMetadata +from .track_metadata import TrackMetadata + +__all__ = [ + "AlbumMetadata", + "TrackMetadata", + "PlaylistMetadata", + "Covers", + "util", +] diff --git a/streamrip/metadata/album_metadata.py b/streamrip/metadata/album_metadata.py new file mode 100644 index 0000000..1b1dce9 --- /dev/null +++ b/streamrip/metadata/album_metadata.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass +from typing import Optional + +from .covers import Covers +from .util import get_quality_id, safe_get, typed + +PHON_COPYRIGHT = "\u2117" +COPYRIGHT = "\u00a9" + +logger = logging.getLogger("streamrip") + + +genre_clean = re.compile(r"([^\u2192\/]+)") + + +@dataclass(slots=True) +class AlbumInfo: + id: str + quality: int + container: str + label: Optional[str] = None + explicit: bool = False + sampling_rate: Optional[int | float] = None + bit_depth: Optional[int] = None + booklets: list[dict] | None = None + + +@dataclass(slots=True) +class AlbumMetadata: + info: AlbumInfo + + album: str + albumartist: str + year: str + genre: list[str] + covers: Covers + tracktotal: int + + disctotal: int = 1 + albumcomposer: Optional[str] = None + comment: Optional[str] = None + compilation: Optional[str] = None + copyright: Optional[str] = None + date: Optional[str] = None + description: Optional[str] = None + encoder: Optional[str] = None + grouping: Optional[str] = None + lyrics: Optional[str] = None + purchase_date: Optional[str] = None + + def get_genres(self) -> str: + return ", ".join(self.genre) + + def get_copyright(self) -> str | None: + if self.copyright is None: + return None + # Add special chars + _copyright = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, self.copyright) + _copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, _copyright) + return _copyright + + def format_folder_path(self, formatter: str) -> 
str: + # Available keys: "albumartist", "title", "year", "bit_depth", "sampling_rate", + # "id", and "albumcomposer", + none_str = "Unknown" + info: dict[str, str | int | float] = { + "albumartist": self.albumartist, + "albumcomposer": self.albumcomposer or none_str, + "bit_depth": self.info.bit_depth or none_str, + "id": self.info.id, + "sampling_rate": self.info.sampling_rate or none_str, + "title": self.album, + "year": self.year, + "container": self.info.container, + } + return formatter.format(**info) + + @classmethod + def from_qobuz(cls, resp: dict) -> AlbumMetadata: + album = resp.get("title", "Unknown Album") + tracktotal = resp.get("tracks_count", 1) + genre = resp.get("genres_list") or resp.get("genre") or [] + genres = list(set(genre_clean.findall("/".join(genre)))) + date = resp.get("release_date_original") or resp.get("release_date") + year = date[:4] if date is not None else "Unknown" + + _copyright = resp.get("copyright", "") + + if artists := resp.get("artists"): + albumartist = ", ".join(a["name"] for a in artists) + else: + albumartist = typed(safe_get(resp, "artist", "name"), str) + + albumcomposer = typed(safe_get(resp, "composer", "name"), str | None) + _label = resp.get("label") + if isinstance(_label, dict): + _label = _label["name"] + label = typed(_label, str | None) + description = typed(resp.get("description") or None, str | None) + disctotal = typed( + max( + track.get("media_number", 1) + for track in safe_get(resp, "tracks", "items", default=[{}]) # type: ignore + ) + or 1, + int, + ) + explicit = typed(resp.get("parental_warning", False), bool) + + # Non-embedded information + # version = resp.get("version") + cover_urls = Covers.from_qobuz(resp) + streamable = typed(resp.get("streamable", False), bool) + assert streamable + bit_depth = typed(resp.get("maximum_bit_depth"), int | None) + sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None) + quality = get_quality_id(bit_depth, sampling_rate) + # Make sure it is 
non-empty list + booklets = typed(resp.get("goodies", None) or None, list | None) + item_id = str(resp.get("qobuz_id")) + + if sampling_rate and bit_depth: + container = "FLAC" + else: + container = "MP3" + + info = AlbumInfo( + id=item_id, + quality=quality, + container=container, + label=label, + explicit=explicit, + sampling_rate=sampling_rate, + bit_depth=bit_depth, + booklets=booklets, + ) + return AlbumMetadata( + info, + album, + albumartist, + year, + genre=genres, + covers=cover_urls, + albumcomposer=albumcomposer, + comment=None, + compilation=None, + copyright=_copyright, + date=date, + description=description, + disctotal=disctotal, + encoder=None, + grouping=None, + lyrics=None, + purchase_date=None, + tracktotal=tracktotal, + ) + + @classmethod + def from_deezer(cls, resp) -> AlbumMetadata: + raise NotImplementedError + + @classmethod + def from_soundcloud(cls, resp) -> AlbumMetadata: + track = resp + logger.debug(track) + track_id = track["id"] + bit_depth, sampling_rate = None, None + explicit = typed( + safe_get(track, "publisher_metadata", "explicit", default=False), bool + ) + genre = typed(track["genre"], str) + artist = typed(safe_get(track, "publisher_metadata", "artist"), str | None) + artist = artist or typed(track["user"]["username"], str) + albumartist = artist + date = typed(track["created_at"], str) + year = date[:4] + label = typed(track["label_name"], str | None) + description = typed(track.get("description"), str | None) + album_title = typed( + safe_get(track, "publisher_metadata", "album_title"), str | None + ) + album_title = album_title or "Unknown album" + copyright = typed(safe_get(track, "publisher_metadata", "p_line"), str | None) + tracktotal = 1 + disctotal = 1 + quality = 0 + covers = Covers.from_soundcloud(resp) + + info = AlbumInfo( + # There are no albums in soundcloud, so we just identify them by a track ID + id=track_id, + quality=quality, + container="MP3", + label=label, + explicit=explicit, + 
sampling_rate=sampling_rate, + bit_depth=bit_depth, + booklets=None, + ) + return AlbumMetadata( + info, + album_title, + albumartist, + year, + genre=[genre], + covers=covers, + albumcomposer=None, + comment=None, + compilation=None, + copyright=copyright, + date=date, + description=description, + disctotal=disctotal, + encoder=None, + grouping=None, + lyrics=None, + purchase_date=None, + tracktotal=tracktotal, + ) + + @classmethod + def from_tidal(cls, resp) -> AlbumMetadata: + raise NotImplementedError + + @classmethod + def from_resp(cls, resp: dict, source: str) -> AlbumMetadata: + if source == "qobuz": + return cls.from_qobuz(resp["album"]) + if source == "tidal": + return cls.from_tidal(resp["album"]) + if source == "soundcloud": + return cls.from_soundcloud(resp) + if source == "deezer": + return cls.from_deezer(resp["album"]) + raise Exception("Invalid source") diff --git a/streamrip/metadata/covers.py b/streamrip/metadata/covers.py new file mode 100644 index 0000000..a70c6bc --- /dev/null +++ b/streamrip/metadata/covers.py @@ -0,0 +1,116 @@ +class Covers: + COVER_SIZES = ("thumbnail", "small", "large", "original") + CoverEntry = tuple[str, str | None, str | None] + _covers: list[CoverEntry] + + def __init__(self): + # ordered from largest to smallest + self._covers = [ + ("original", None, None), + ("large", None, None), + ("small", None, None), + ("thumbnail", None, None), + ] + + def set_cover(self, size: str, url: str | None, path: str | None): + i = self._indexof(size) + self._covers[i] = (size, url, path) + + def set_cover_url(self, size: str, url: str): + self.set_cover(size, url, None) + + @staticmethod + def _indexof(size: str) -> int: + if size == "original": + return 0 + if size == "large": + return 1 + if size == "small": + return 2 + if size == "thumbnail": + return 3 + raise Exception(f"Invalid {size = }") + + def empty(self) -> bool: + return all(url is None for _, url, _ in self._covers) + + def set_largest_path(self, path: str): + for 
size, url, _ in self._covers: + if url is not None: + self.set_cover(size, url, path) + return + raise Exception(f"No covers found in {self}") + + def set_path(self, size: str, path: str): + i = self._indexof(size) + size, url, _ = self._covers[i] + self._covers[i] = (size, url, path) + + def largest(self) -> CoverEntry: + for s, u, p in self._covers: + if u is not None: + return (s, u, p) + + raise Exception(f"No covers found in {self}") + + @classmethod + def from_qobuz(cls, resp): + img = resp["image"] + + c = cls() + c.set_cover_url("original", "org".join(img["large"].rsplit("600", 1))) + c.set_cover_url("large", img["large"]) + c.set_cover_url("small", img["small"]) + c.set_cover_url("thumbnail", img["thumbnail"]) + return c + + @classmethod + def from_soundcloud(cls, resp): + c = cls() + cover_url = (resp["artwork_url"] or resp["user"].get("avatar_url")).replace( + "large", "t500x500" + ) + c.set_cover_url("large", cover_url) + return c + + @classmethod + def from_tidal(cls, resp): + uuid = resp["cover"] + if not uuid: + return None + + c = cls() + for size_name, dimension in zip(cls.COVER_SIZES, (160, 320, 640, 1280)): + c.set_cover_url(size_name, cls._get_tidal_cover_url(uuid, dimension)) + return c + + def get_size(self, size: str) -> CoverEntry: + i = self._indexof(size) + size, url, path = self._covers[i] + if url is not None: + return (size, url, path) + if i + 1 < len(self._covers): + for s, u, p in self._covers[i + 1 :]: + if u is not None: + return (s, u, p) + raise Exception(f"Cover not found for {size = }. Available: {self}") + + @staticmethod + def _get_tidal_cover_url(uuid, size): + """Generate a tidal cover url. 
+ + :param uuid: VALID uuid string + :param size: + """ + TIDAL_COVER_URL = ( + "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg" + ) + possibles = (80, 160, 320, 640, 1280) + assert size in possibles, f"size must be in {possibles}" + return TIDAL_COVER_URL.format( + uuid=uuid.replace("-", "/"), height=size, width=size + ) + + def __repr__(self): + covers = "\n".join(map(repr, self._covers)) + return f"Covers({covers})" diff --git a/streamrip/metadata/playlist_metadata.py b/streamrip/metadata/playlist_metadata.py new file mode 100644 index 0000000..96d9258 --- /dev/null +++ b/streamrip/metadata/playlist_metadata.py @@ -0,0 +1,85 @@ +from dataclasses import dataclass + +from .album_metadata import AlbumMetadata +from .track_metadata import TrackMetadata +from .util import typed + +NON_STREAMABLE = "_non_streamable" +ORIGINAL_DOWNLOAD = "_original_download" +NOT_RESOLVED = "_not_resolved" + + +def get_soundcloud_id(resp: dict) -> str: + item_id = resp["id"] + if "media" not in resp: + return f"{item_id}|{NOT_RESOLVED}" + + if not resp["streamable"] or resp["policy"] == "BLOCK": + return f"{item_id}|{NON_STREAMABLE}" + + if resp["downloadable"] and resp["has_downloads_left"]: + return f"{item_id}|{ORIGINAL_DOWNLOAD}" + + url = None + for tc in resp["media"]["transcodings"]: + fmt = tc["format"] + if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg": + url = tc["url"] + break + + assert url is not None + return f"{item_id}|{url}" + + +def parse_soundcloud_id(item_id: str) -> tuple[str, str]: + info = item_id.split("|") + assert len(info) == 2 + return tuple(info) + + +@dataclass(slots=True) +class PlaylistMetadata: + name: str + tracks: list[TrackMetadata] + + @classmethod + def from_qobuz(cls, resp: dict): + name = typed(resp["title"], str) + tracks = [ + TrackMetadata.from_qobuz(AlbumMetadata.from_qobuz(track["album"]), track) + for track in resp["tracks"]["items"] + ] + return cls(name, tracks) + + @classmethod + def from_soundcloud(cls, 
resp: dict): + """Convert a (modified) soundcloud API response to PlaylistMetadata. + + Args: + resp (dict): The response, except there should not be any partially resolved items + in the playlist. + + e.g. If soundcloud only returns the full metadata of 5 of them, the rest of the + elements in resp['tracks'] should be replaced with their full metadata. + + Returns: + PlaylistMetadata object. + """ + name = typed(resp["title"], str) + tracks = [ + TrackMetadata.from_soundcloud(AlbumMetadata.from_soundcloud(track), track) + for track in resp["tracks"] + ] + return cls(name, tracks) + + def ids(self): + return [track.info.id for track in self.tracks] + + @classmethod + def from_resp(cls, resp: dict, source: str): + if source == "qobuz": + return cls.from_qobuz(resp) + elif source == "soundcloud": + return cls.from_soundcloud(resp) + else: + raise NotImplementedError(source) diff --git a/streamrip/metadata/track_metadata.py b/streamrip/metadata/track_metadata.py new file mode 100644 index 0000000..fb081db --- /dev/null +++ b/streamrip/metadata/track_metadata.py @@ -0,0 +1,141 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from .album_metadata import AlbumMetadata +from .util import safe_get, typed + + +@dataclass(slots=True) +class TrackInfo: + id: str + quality: int + + bit_depth: Optional[int] = None + explicit: bool = False + sampling_rate: Optional[int | float] = None + work: Optional[str] = None + + +@dataclass(slots=True) +class TrackMetadata: + info: TrackInfo + + title: str + album: AlbumMetadata + artist: str + tracknumber: int + discnumber: int + composer: str | None + + @classmethod + def from_qobuz(cls, album: AlbumMetadata, resp: dict) -> TrackMetadata: + title = typed(resp["title"].strip(), str) + version = typed(resp.get("version"), str | None) + work = typed(resp.get("work"), str | None) + if version is not None and version not in title: + title = f"{title} ({version})" + if work is not None 
and work not in title: + title = f"{work}: {title}" + + composer = typed(resp.get("composer", {}).get("name"), str | None) + tracknumber = typed(resp.get("track_number", 1), int) + discnumber = typed(resp.get("media_number", 1), int) + artist = typed( + safe_get( + resp, + "performer", + "name", + ), + str, + ) + track_id = str(resp["id"]) + bit_depth = typed(resp.get("maximum_bit_depth"), int | None) + sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None) + # Is the info included? + explicit = False + + info = TrackInfo( + id=track_id, + quality=album.info.quality, + bit_depth=bit_depth, + explicit=explicit, + sampling_rate=sampling_rate, + work=work, + ) + return cls( + info=info, + title=title, + album=album, + artist=artist, + tracknumber=tracknumber, + discnumber=discnumber, + composer=composer, + ) + + @classmethod + def from_deezer(cls, album: AlbumMetadata, resp) -> TrackMetadata: + raise NotImplementedError + + @classmethod + def from_soundcloud(cls, album: AlbumMetadata, resp: dict) -> TrackMetadata: + track = resp + track_id = track["id"] + bit_depth, sampling_rate = None, None + explicit = typed( + safe_get(track, "publisher_metadata", "explicit", default=False), bool + ) + + title = typed(track["title"].strip(), str) + artist = typed(track["user"]["username"], str) + tracknumber = 1 + + info = TrackInfo( + id=track_id, + quality=album.info.quality, + bit_depth=bit_depth, + explicit=explicit, + sampling_rate=sampling_rate, + work=None, + ) + return cls( + info=info, + title=title, + album=album, + artist=artist, + tracknumber=tracknumber, + discnumber=0, + composer=None, + ) + + @classmethod + def from_tidal(cls, album: AlbumMetadata, resp) -> TrackMetadata: + raise NotImplementedError + + @classmethod + def from_resp(cls, album: AlbumMetadata, source, resp) -> TrackMetadata: + if source == "qobuz": + return cls.from_qobuz(album, resp) + if source == "tidal": + return cls.from_tidal(album, resp) + if source == "soundcloud": + return
cls.from_soundcloud(album, resp) + if source == "deezer": + return cls.from_deezer(album, resp) + raise Exception + + def format_track_path(self, format_string: str) -> str: + # Available keys: "tracknumber", "artist", "albumartist", "composer", "title", + # and "explicit", "albumcomposer" + none_text = "Unknown" + info = { + "title": self.title, + "tracknumber": self.tracknumber, + "artist": self.artist, + "albumartist": self.album.albumartist, + "albumcomposer": self.album.albumcomposer or none_text, + "composer": self.composer or none_text, + "explicit": " (Explicit) " if self.info.explicit else "", + } + return format_string.format(**info) diff --git a/streamrip/metadata/util.py b/streamrip/metadata/util.py new file mode 100644 index 0000000..3ea04f3 --- /dev/null +++ b/streamrip/metadata/util.py @@ -0,0 +1,51 @@ +import functools +from typing import Optional, Type, TypeVar + + +def get_album_track_ids(source: str, resp) -> list[str]: + tracklist = resp["tracks"] + if source == "qobuz": + tracklist = tracklist["items"] + return [track["id"] for track in tracklist] + + +def safe_get(dictionary, *keys, default=None): + return functools.reduce( + lambda d, key: d.get(key, default) if isinstance(d, dict) else default, + keys, + dictionary, + ) + + +T = TypeVar("T") + + +def typed(thing, expected_type: Type[T]) -> T: + assert isinstance(thing, expected_type) + return thing + + +def get_quality_id( + bit_depth: Optional[int], sampling_rate: Optional[int | float] +) -> int: + """Get the universal quality id from bit depth and sampling rate. + + :param bit_depth: + :type bit_depth: Optional[int] + :param sampling_rate: In kHz + :type sampling_rate: Optional[int] + """ + # XXX: Should `0` quality be supported? 
+ if bit_depth is None or sampling_rate is None: # is lossy + return 1 + + if bit_depth == 16: + return 2 + + if bit_depth == 24: + if sampling_rate <= 96: + return 3 + + return 4 + + raise Exception(f"Invalid {bit_depth = }") diff --git a/streamrip/playlist.py b/streamrip/playlist.py index 9cbe7bb..b212f06 100644 --- a/streamrip/playlist.py +++ b/streamrip/playlist.py @@ -1,13 +1,84 @@ +import asyncio +import logging +import os from dataclasses import dataclass +from .artwork import download_artwork +from .client import Client +from .config import Config +from .filepath_utils import clean_filename from .media import Media, Pending +from .metadata import AlbumMetadata, Covers, PlaylistMetadata, TrackMetadata +from .track import Track + +logger = logging.getLogger("streamrip") + + +@dataclass(slots=True) +class PendingPlaylistTrack(Pending): + id: str + client: Client + config: Config + folder: str + + async def resolve(self) -> Track: + resp = await self.client.get_metadata(self.id, "track") + album = AlbumMetadata.from_resp(resp, self.client.source) + meta = TrackMetadata.from_resp(album, self.client.source, resp) + quality = getattr(self.config.session, self.client.source).quality + assert isinstance(quality, int) + embedded_cover_path, downloadable = await asyncio.gather( + self._download_cover(album.covers, self.folder), + self.client.get_downloadable(self.id, quality), + ) + return Track(meta, downloadable, self.config, self.folder, embedded_cover_path) + + async def _download_cover(self, covers: Covers, folder: str) -> str | None: + embed_path, _ = await download_artwork( + self.client.session, + folder, + covers, + self.config.session.artwork, + for_playlist=True, + ) + return embed_path @dataclass(slots=True) class Playlist(Media): - pass + name: str + config: Config + client: Client + tracks: list[PendingPlaylistTrack] + + async def preprocess(self): + pass + + async def download(self): + async def _resolve_and_download(pending): + track = await
pending.resolve() + await track.rip() + + await asyncio.gather(*[_resolve_and_download(p) for p in self.tracks]) + + async def postprocess(self): + pass @dataclass(slots=True) class PendingPlaylist(Pending): - pass + id: str + client: Client + config: Config + + async def resolve(self): + resp = await self.client.get_metadata(self.id, "playlist") + meta = PlaylistMetadata.from_resp(resp, self.client.source) + name = meta.name + parent = self.config.session.downloads.folder + folder = os.path.join(parent, clean_filename(name)) + tracks = [ + PendingPlaylistTrack(id, self.client, self.config, folder) + for id in meta.ids() + ] + return Playlist(name, self.config, self.client, tracks) diff --git a/streamrip/prompter.py b/streamrip/prompter.py index 89b9bc3..e3080dd 100644 --- a/streamrip/prompter.py +++ b/streamrip/prompter.py @@ -10,6 +10,7 @@ from .config import Config from .deezer_client import DeezerClient from .exceptions import AuthenticationError, MissingCredentials from .qobuz_client import QobuzClient +from .soundcloud_client import SoundcloudClient from .tidal_client import TidalClient @@ -195,15 +196,30 @@ class DeezerPrompter(CredentialPrompter): return client +class SoundcloudPrompter(CredentialPrompter): + def has_creds(self) -> bool: + return True + + async def prompt_and_login(self): + pass + + def save(self): + pass + + def type_check_client(self, client) -> SoundcloudClient: + assert isinstance(client, SoundcloudClient) + return client + + PROMPTERS = { - "qobuz": (QobuzPrompter, QobuzClient), - "deezer": (DeezerPrompter, QobuzClient), - "tidal": (TidalPrompter, QobuzClient), + "qobuz": QobuzPrompter, + "deezer": DeezerPrompter, + "tidal": TidalPrompter, + "soundcloud": SoundcloudPrompter, } def get_prompter(client: Client, config: Config) -> CredentialPrompter: """Return an instance of a prompter.""" - p, c = PROMPTERS[client.source] - assert isinstance(client, c) + p = PROMPTERS[client.source] return p(config, client) diff --git 
a/streamrip/qobuz_client.py b/streamrip/qobuz_client.py index 038c7f4..90efe55 100644 --- a/streamrip/qobuz_client.py +++ b/streamrip/qobuz_client.py @@ -54,6 +54,7 @@ class QobuzClient(Client): self.secret: Optional[str] = None async def login(self): + logger.info("Logging into qobuz") self.session = await self.get_session() c = self.config.session.qobuz if not c.email_or_userid or not c.password_or_token: diff --git a/streamrip/soundcloud_client.py b/streamrip/soundcloud_client.py index f79688e..f67059a 100644 --- a/streamrip/soundcloud_client.py +++ b/streamrip/soundcloud_client.py @@ -1,3 +1,6 @@ +import asyncio +import itertools +import logging import re from .client import Client @@ -8,11 +11,17 @@ from .exceptions import NonStreamable BASE = "https://api-v2.soundcloud.com" SOUNDCLOUD_USER_ID = "672320-86895-162383-801513" +logger = logging.getLogger("streamrip") + class SoundcloudClient(Client): source = "soundcloud" logged_in = False + NON_STREAMABLE = "_non_streamable" + ORIGINAL_DOWNLOAD = "_original_download" + NOT_RESOLVED = "_not_resolved" + def __init__(self, config: Config): self.global_config = config self.config = config.session.soundcloud @@ -23,44 +32,138 @@ class SoundcloudClient(Client): async def login(self): self.session = await self.get_session() client_id, app_version = self.config.client_id, self.config.app_version - if not client_id or not app_version or not self._announce(): + if not client_id or not app_version or not (await self._announce()): client_id, app_version = await self._refresh_tokens() + # update file and session configs and save to disk + cf = self.global_config.file.soundcloud + cs = self.global_config.session.soundcloud + cs.client_id = client_id + cs.app_version = app_version + cf.client_id = client_id + cf.app_version = app_version + self.global_config.file.set_modified() - # update file and session configs and save to disk - c = self.global_config.file.soundcloud - self.config.client_id = c.client_id = client_id - 
self.config.client_id = c.app_version = app_version - self.global_config.file.set_modified() + logger.debug(f"Current valid {client_id=} {app_version=}") + self.logged_in = True async def get_metadata(self, item_id: str, media_type: str) -> dict: - raise NotImplementedError + """Fetch metadata for an item in Soundcloud API. - async def get_downloadable(self, item: dict, _) -> SoundcloudDownloadable: - if not item["streamable"] or item["policy"] == "BLOCK": - raise NonStreamable(item) + Args: + item_id (str): Plain soundcloud item ID (e.g 1633786176) + media_type (str): track or playlist - if item["downloadable"] and item["has_downloads_left"]: - resp = await self._api_request(f"tracks/{item['id']}/download") - resp_json = await resp.json() + Returns: + API response. + """ + if media_type == "track": + return await self._get_track(item_id) + elif media_type == "playlist": + return await self._get_playlist(item_id) + else: + raise Exception(f"{media_type} not supported") + + async def _get_track(self, item_id: str): + resp, status = await self._api_request(f"tracks/{item_id}") + assert status == 200 + return resp + + async def _get_playlist(self, item_id: str): + original_resp, status = await self._api_request(f"playlists/{item_id}") + assert status == 200 + + unresolved_tracks = [ + track["id"] for track in original_resp["tracks"] if "media" not in track + ] + + if len(unresolved_tracks) == 0: + return original_resp + + MAX_BATCH_SIZE = 50 + + batches = batched(unresolved_tracks, MAX_BATCH_SIZE) + requests = [ + self._api_request( + "tracks", + params={"ids": ",".join(str(id) for id in filter_none(batch))}, + ) + for batch in batches + ] + + # (list of track metadata, status code) + responses: list[tuple[list, int]] = await asyncio.gather(*requests) + + assert all(status == 200 for _, status in responses) + + remaining_tracks = list(itertools.chain(*[resp for resp, _ in responses])) + + # Insert the new metadata into the original response + track_map: dict[str, 
dict] = {track["id"]: track for track in remaining_tracks} + for i, track in enumerate(original_resp["tracks"]): + if "media" in track: # track already has metadata + continue + this_track = track_map.get(track["id"]) + if this_track is None: + raise Exception(f"Requested {track['id']} but got no response") + original_resp["tracks"][i] = this_track + + # Overwrite all ids in playlist + for track in original_resp["tracks"]: + track["id"] = self._get_custom_id(track) + + return original_resp + + @classmethod + def _get_custom_id(cls, resp: dict) -> str: + item_id = resp["id"] + assert "media" in resp, f"track {resp} should be resolved" + + if not resp["streamable"] or resp["policy"] == "BLOCK": + return f"{item_id}|{cls.NON_STREAMABLE}" + + if resp["downloadable"] and resp["has_downloads_left"]: + return f"{item_id}|{cls.ORIGINAL_DOWNLOAD}" + + url = None + for tc in resp["media"]["transcodings"]: + fmt = tc["format"] + if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg": + url = tc["url"] + break + + assert url is not None + return f"{item_id}|{url}" + + async def get_downloadable(self, item_info: str, _) -> SoundcloudDownloadable: + # We have `get_metadata` overwrite the "id" field so that it contains + # some extra information we need to download soundcloud tracks + + # item_id is the soundcloud ID of the track + # download_url is either the url that points to an mp3 download or "" + # if download_url == '_non_streamable' then we raise an exception + + infos: list[str] = item_info.split("|") + assert len(infos) == 2, infos + item_id, download_info = infos + + if download_info == self.NON_STREAMABLE: + raise NonStreamable(item_info) + + if download_info == self.ORIGINAL_DOWNLOAD: + resp_json, status = await self._api_request(f"tracks/{item_id}/download") + assert status == 200 return SoundcloudDownloadable( self.session, {"url": resp_json["redirectUri"], "type": "original"} ) - else: - url = None - for tc in item["media"]["transcodings"]: - fmt = 
tc["format"] - if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg": - url = tc["url"] - break + if download_info == self.NOT_RESOLVED: + raise NotImplementedError(item_info) - assert url is not None - - resp = await self._request(url) - resp_json = await resp.json() - return SoundcloudDownloadable( - self.session, {"url": resp_json["url"], "type": "mp3"} - ) + # download_info contains mp3 stream url + resp_json, status = await self._request(download_info) + return SoundcloudDownloadable( + self.session, {"url": resp_json["url"], "type": "mp3"} + ) async def search( self, query: str, media_type: str, limit: int = 50, offset: int = 0 @@ -73,14 +176,29 @@ class SoundcloudClient(Client): "offset": offset, "linked_partitioning": "1", } - resp = await self._api_request(f"search/{media_type}s", params=params) - return await resp.json() + resp, status = await self._api_request(f"search/{media_type}s", params=params) + assert status == 200 + return resp async def _api_request(self, path, params=None, headers=None): url = f"{BASE}/{path}" return await self._request(url, params=params, headers=headers) - async def _request(self, url, params=None, headers=None): + async def _request(self, url, params=None, headers=None) -> tuple[dict, int]: + c = self.config + _params = { + "client_id": c.client_id, + "app_version": c.app_version, + "app_locale": "en", + } + if params is not None: + _params.update(params) + + logger.debug(f"Requesting {url} with {_params=}, {headers=}") + async with self.session.get(url, params=_params, headers=headers) as resp: + return await resp.json(), resp.status + + async def _request_body(self, url, params=None, headers=None): c = self.config _params = { "client_id": c.client_id, @@ -91,15 +209,17 @@ class SoundcloudClient(Client): _params.update(params) async with self.session.get(url, params=_params, headers=headers) as resp: - return resp + return await resp.content.read(), resp.status async def _resolve_url(self, url: str) -> dict: - 
resp = await self._api_request(f"resolve?url={url}") - return await resp.json() + resp, status = await self._api_request("resolve", params={"url": url}) + assert status == 200 + return resp async def _announce(self): - resp = await self._api_request("announcements") - return resp.status == 200 + url = f"{BASE}/announcements" + _, status = await self._request_body(url) + return status == 200 async def _refresh_tokens(self) -> tuple[str, str]: """Return a valid client_id, app_version pair.""" @@ -130,4 +250,14 @@ class SoundcloudClient(Client): assert client_id_match is not None client_id = client_id_match.group(1) + logger.debug(f"Refreshed soundcloud tokens as {client_id=} {app_version=}") return client_id, app_version + + +def batched(iterable, n, fillvalue=None): + args = [iter(iterable)] * n + return list(itertools.zip_longest(*args, fillvalue=fillvalue)) + + +def filter_none(iterable): + return (x for x in iterable if x is not None) diff --git a/streamrip/tagger.py b/streamrip/tagger.py index 8064984..4c095c0 100644 --- a/streamrip/tagger.py +++ b/streamrip/tagger.py @@ -115,7 +115,7 @@ class Container(Enum): # unreachable return [] - def _tag_flac(self, meta) -> list[tuple]: + def _tag_flac(self, meta: TrackMetadata) -> list[tuple]: out = [] for k, v in FLAC_KEY.items(): tag = self._attr_from_meta(meta, k) @@ -131,13 +131,13 @@ class Container(Enum): out.append((v, str(tag))) return out - def _tag_mp3(self, meta): + def _tag_mp3(self, meta: TrackMetadata): out = [] for k, v in MP3_KEY.items(): if k == "tracknumber": - text = f"{meta.tracknumber}/{meta.tracktotal}" + text = f"{meta.tracknumber}/{meta.album.tracktotal}" elif k == "discnumber": - text = f"{meta.discnumber}/{meta.disctotal}" + text = f"{meta.discnumber}/{meta.album.disctotal}" else: text = self._attr_from_meta(meta, k) @@ -145,13 +145,13 @@ class Container(Enum): out.append((v.__name__, v(encoding=3, text=text))) return out - def _tag_aac(self, meta): + def _tag_aac(self, meta: TrackMetadata): out 
= [] for k, v in MP4_KEY.items(): if k == "tracknumber": - text = [(meta.tracknumber, meta.tracktotal)] + text = [(meta.tracknumber, meta.album.tracktotal)] elif k == "discnumber": - text = [(meta.discnumber, meta.disctotal)] + text = [(meta.discnumber, meta.album.disctotal)] else: text = self._attr_from_meta(meta, k) diff --git a/streamrip/track.py b/streamrip/track.py index 0a4f9f3..6ba97b6 100644 --- a/streamrip/track.py +++ b/streamrip/track.py @@ -102,7 +102,7 @@ class PendingTrack(Pending): meta = TrackMetadata.from_resp(self.album, self.client.source, resp) quality = getattr(self.config.session, self.client.source).quality assert isinstance(quality, int) - downloadable = await self.client.get_downloadable({"id": self.id}, quality) + downloadable = await self.client.get_downloadable(self.id, quality) return Track(meta, downloadable, self.config, self.folder, self.cover_path) @@ -120,7 +120,9 @@ class PendingSingle(Pending): async def resolve(self) -> Track: resp = await self.client.get_metadata(self.id, "track") - album = AlbumMetadata.from_resp(resp["album"], self.client.source) + # Patch for soundcloud + # self.id = resp["id"] + album = AlbumMetadata.from_resp(resp, self.client.source) meta = TrackMetadata.from_resp(album, self.client.source, resp) quality = getattr(self.config.session, self.client.source).quality @@ -132,7 +134,7 @@ class PendingSingle(Pending): embedded_cover_path, downloadable = await asyncio.gather( self._download_cover(album.covers, folder), - self.client.get_downloadable({"id": self.id}, quality), + self.client.get_downloadable(self.id, quality), ) return Track(meta, downloadable, self.config, folder, embedded_cover_path) @@ -144,6 +146,10 @@ class PendingSingle(Pending): async def _download_cover(self, covers: Covers, folder: str) -> str | None: embed_path, _ = await download_artwork( - self.client.session, folder, covers, self.config.session.artwork + self.client.session, + folder, + covers, + self.config.session.artwork, + 
for_playlist=False, ) return embed_path diff --git a/streamrip/universal_url.py b/streamrip/universal_url.py index 15edb17..848d0d6 100644 --- a/streamrip/universal_url.py +++ b/streamrip/universal_url.py @@ -3,12 +3,12 @@ from __future__ import annotations import re from abc import ABC, abstractmethod -from click import secho - from .album import PendingAlbum from .client import Client from .config import Config from .media import Pending +from .playlist import PendingPlaylist +from .soundcloud_client import SoundcloudClient from .track import PendingSingle from .validation_regexps import ( DEEZER_DYNAMIC_LINK_REGEX, @@ -100,8 +100,29 @@ class DeezerDynamicURL(URL): pass -class SoundCloudURL(URL): - pass +class SoundcloudURL(URL): + source = "soundcloud" + + def __init__(self, url: str): + self.url = url + + async def into_pending(self, client: SoundcloudClient, config: Config) -> Pending: + resolved = await client._resolve_url(self.url) + media_type = resolved["kind"] + item_id = str(resolved["id"]) + if media_type == "track": + return PendingSingle(item_id, client, config) + elif media_type == "playlist": + return PendingPlaylist(item_id, client, config) + else: + raise NotImplementedError(media_type) + + @classmethod + def from_str(cls, url: str): + soundcloud_url = SOUNDCLOUD_URL_REGEX.match(url) + if soundcloud_url is None: + return None + return cls(soundcloud_url.group(0)) class LastFmURL(URL): @@ -109,10 +130,18 @@ class LastFmURL(URL): def parse_url(url: str) -> URL | None: + """Return a URL type given a url string. + + Args: + url (str): Url to parse + + Returns: A URL type, or None if nothing matched. 
+ """ url = url.strip() parsed_urls: list[URL | None] = [ GenericURL.from_str(url), QobuzInterpreterURL.from_str(url), + SoundcloudURL.from_str(url), # TODO: the rest of the url types ] return next((u for u in parsed_urls if u is not None), None) @@ -121,8 +150,9 @@ def parse_url(url: str) -> URL | None: # TODO: recycle this class class UniversalURL: """ - >>> u = UniversalURL('https://sampleurl.com') - >>> pending = await u.into_pending_item() + >>> u = UniversalURL.from_str('https://sampleurl.com') + >>> if u is not None: + >>> pending = await u.into_pending_item() """ source: str