This commit is contained in:
Nathan Thomas 2023-11-17 20:40:46 -08:00
parent 89f76b7f58
commit 3640e4e70a
20 changed files with 1051 additions and 520 deletions

View file

@ -8,8 +8,9 @@ from .client import Client
from .config import Config from .config import Config
from .console import console from .console import console
from .media import Media, Pending from .media import Media, Pending
from .metadata import AlbumMetadata, get_album_track_ids from .metadata import AlbumMetadata
from .track import PendingTrack, Track from .metadata.util import get_album_track_ids
from .track import PendingTrack
logger = logging.getLogger("streamrip") logger = logging.getLogger("streamrip")
@ -25,7 +26,7 @@ class Album(Media):
async def preprocess(self): async def preprocess(self):
if self.config.session.cli.text_output: if self.config.session.cli.text_output:
console.print( console.print(
f"[cyan]Downloading {self.meta.album} by {self.meta.albumartist}" f"Downloading [cyan]{self.meta.album}[/cyan] by [cyan]{self.meta.albumartist}[/cyan]"
) )
async def download(self): async def download(self):
@ -53,11 +54,15 @@ class PendingAlbum(Pending):
album_folder = self._album_folder(folder, meta) album_folder = self._album_folder(folder, meta)
os.makedirs(album_folder, exist_ok=True) os.makedirs(album_folder, exist_ok=True)
embed_cover, _ = await download_artwork( embed_cover, _ = await download_artwork(
self.client.session, album_folder, meta.covers, self.config.session.artwork self.client.session,
album_folder,
meta.covers,
self.config.session.artwork,
for_playlist=False,
) )
pending_tracks = [ pending_tracks = [
PendingTrack( PendingTrack(
id=id, id,
album=meta, album=meta,
client=self.client, client=self.client,
config=self.config, config=self.config,

View file

@ -1,5 +1,7 @@
import asyncio import asyncio
import logging
import os import os
import shutil
import aiohttp import aiohttp
from PIL import Image from PIL import Image
@ -8,52 +10,82 @@ from .config import ArtworkConfig
from .downloadable import BasicDownloadable from .downloadable import BasicDownloadable
from .metadata import Covers from .metadata import Covers
_artwork_tempdirs: set[str] = set()
logger = logging.getLogger("streamrip")
def remove_artwork_tempdirs():
logger.debug("Removing dirs %s", _artwork_tempdirs)
for path in _artwork_tempdirs:
shutil.rmtree(path)
async def download_artwork( async def download_artwork(
session: aiohttp.ClientSession, folder: str, covers: Covers, config: ArtworkConfig session: aiohttp.ClientSession,
folder: str,
covers: Covers,
config: ArtworkConfig,
for_playlist: bool,
) -> tuple[str | None, str | None]: ) -> tuple[str | None, str | None]:
"""Download artwork, which may include a seperate file to keep. """Download artwork and update passed Covers object with filepaths.
Also updates the passed Covers object with downloaded filepaths.
Because it is a single, we will assume that none of the covers have already been If paths for the selected sizes already exist in `covers`, nothing will
downloaded, so existing paths in `covers` will be discarded and overwritten. be downloaded.
If `for_playlist` is set, it will not download hires cover art regardless
of the config setting.
Embedded artworks are put in a temporary directory under `folder` called
"__embed" that can be deleted once a playlist or album is done downloading.
Hi-res (saved) artworks are kept in `folder` as "cover.jpg".
Args: Args:
covers (Covers): The set of available covers. session (aiohttp.ClientSession):
folder (str):
covers (Covers):
config (ArtworkConfig):
for_playlist (bool): Set to disable saved hires covers.
Returns: Returns:
(path to embed cover, path to hires cover) (path to embedded artwork, path to hires artwork)
The path of the cover to embed, or None if there either is no artwork available or
if artwork embedding is turned off.
""" """
if (not config.save_artwork and not config.embed) or covers.empty(): save_artwork, embed = config.save_artwork, config.embed
if for_playlist:
save_artwork = False
if not (save_artwork or embed) or covers.empty():
# No need to download anything # No need to download anything
return None, None return None, None
downloadables = [] downloadables = []
saved_cover_path = None _, l_url, saved_cover_path = covers.largest()
if config.save_artwork: if saved_cover_path is None and save_artwork:
_, l_url, _ = covers.largest()
assert l_url is not None # won't be true unless covers is empty
saved_cover_path = os.path.join(folder, "cover.jpg") saved_cover_path = os.path.join(folder, "cover.jpg")
assert l_url is not None
downloadables.append( downloadables.append(
BasicDownloadable(session, l_url, "jpg").download( BasicDownloadable(session, l_url, "jpg").download(
saved_cover_path, lambda _: None saved_cover_path, lambda _: None
) )
) )
embed_cover_path = None _, embed_url, embed_cover_path = covers.get_size(config.embed_size)
if config.embed: if embed_cover_path is None and config.embed:
_, embed_url, _ = covers.get_size(config.embed_size)
assert embed_url is not None assert embed_url is not None
embed_cover_path = os.path.join(folder, "embed_cover.jpg") embed_dir = os.path.join(folder, "__artwork")
os.makedirs(embed_dir, exist_ok=True)
_artwork_tempdirs.add(embed_dir)
embed_cover_path = os.path.join(embed_dir, f"cover{hash(embed_url)}.jpg")
downloadables.append( downloadables.append(
BasicDownloadable(session, embed_url, "jpg").download( BasicDownloadable(session, embed_url, "jpg").download(
embed_cover_path, lambda _: None embed_cover_path, lambda _: None
) )
) )
if len(downloadables) == 0:
return embed_cover_path, saved_cover_path
await asyncio.gather(*downloadables) await asyncio.gather(*downloadables)
# Update `covers` to reflect the current download state # Update `covers` to reflect the current download state

View file

@ -7,8 +7,9 @@ from functools import wraps
import click import click
from click import secho from click import secho
from click_help_colors import HelpColorsGroup from click_help_colors import HelpColorsGroup # type: ignore
from rich.logging import RichHandler from rich.logging import RichHandler
from rich.prompt import Confirm
from rich.traceback import install from rich.traceback import install
from .config import Config, set_user_defaults from .config import Config, set_user_defaults
@ -118,7 +119,8 @@ async def file(ctx, path):
with Config(config_path) as cfg: with Config(config_path) as cfg:
main = Main(cfg) main = Main(cfg)
with open(path) as f: with open(path) as f:
await asyncio.gather(*[main.add(url) for url in f]) for url in f:
await main.add(url)
await main.resolve() await main.resolve()
await main.rip() await main.rip()
@ -152,13 +154,10 @@ def config_reset(ctx, yes):
"""Reset the config file.""" """Reset the config file."""
config_path = ctx.obj["config_path"] config_path = ctx.obj["config_path"]
if not yes: if not yes:
echo_w( if not Confirm.ask(
f"Are you sure you want to reset the config file at {config_path}? [y/n] ", f"Are you sure you want to reset the config file at {config_path}?"
nl=False, ):
) console.print("[green]Reset aborted")
result = input()
if result.strip() != "y":
echo_i("Reset aborted.")
return return
shutil.copy(BLANK_CONFIG_PATH, config_path) shutil.copy(BLANK_CONFIG_PATH, config_path)

View file

@ -36,7 +36,7 @@ class Client(ABC):
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
async def get_downloadable(self, item: dict, quality: int) -> Downloadable: async def get_downloadable(self, item: str, quality: int) -> Downloadable:
raise NotImplementedError raise NotImplementedError
@staticmethod @staticmethod

View file

@ -201,7 +201,7 @@ class SoundcloudDownloadable(Downloadable):
downloader = BasicDownloadable(self.session, self.url, "flac") downloader = BasicDownloadable(self.session, self.url, "flac")
await downloader.download(path, callback) await downloader.download(path, callback)
engine = converter.FLAC(path) engine = converter.FLAC(path)
engine.convert(path) await engine.convert(path)
async def _download_mp3(self, path: str, callback): async def _download_mp3(self, path: str, callback):
async with self.session.get(self.url) as resp: async with self.session.get(self.url) as resp:
@ -230,6 +230,14 @@ class SoundcloudDownloadable(Downloadable):
await file.write(content) await file.write(content)
return tmp return tmp
async def size(self) -> int:
async with self.session.get(self.url) as resp:
content = await resp.text("utf-8")
parsed_m3u = m3u8.loads(content)
self._size = len(parsed_m3u.segments)
return await super().size()
def concat_audio_files(paths: list[str], out: str, ext: str, max_files_open=128): def concat_audio_files(paths: list[str], out: str, ext: str, max_files_open=128):
"""Concatenate audio files using FFmpeg. Batched by max files open. """Concatenate audio files using FFmpeg. Batched by max files open.
@ -272,13 +280,15 @@ def concat_audio_files(paths: list[str], out: str, ext: str, max_files_open=128)
"-acodec", "-acodec",
"copy", "copy",
"-loglevel", "-loglevel",
"panic", "warning",
outpaths[i], outpaths[i],
), ),
# capture_output=True, capture_output=True,
) )
if proc.returncode != 0: if proc.returncode != 0:
raise Exception(f"FFMPEG returned with this error: {proc.stderr}") raise Exception(
f"FFMPEG returned with status code {proc.returncode} error: {proc.stderr} output: {proc.stdout}"
)
# Recurse on remaining batches # Recurse on remaining batches
concat_audio_files(outpaths, out, ext) concat_audio_files(outpaths, out, ext)

View file

@ -1,6 +1,7 @@
import asyncio import asyncio
import logging import logging
from .artwork import remove_artwork_tempdirs
from .client import Client from .client import Client
from .config import Config from .config import Config
from .console import console from .console import console
@ -8,6 +9,7 @@ from .media import Media, Pending
from .progress import clear_progress from .progress import clear_progress
from .prompter import get_prompter from .prompter import get_prompter
from .qobuz_client import QobuzClient from .qobuz_client import QobuzClient
from .soundcloud_client import SoundcloudClient
from .universal_url import parse_url from .universal_url import parse_url
logger = logging.getLogger("streamrip") logger = logging.getLogger("streamrip")
@ -29,17 +31,17 @@ class Main:
# -> downloaded audio file # -> downloaded audio file
self.pending: list[Pending] = [] self.pending: list[Pending] = []
self.media: list[Media] = [] self.media: list[Media] = []
self.config = config self.config = config
self.clients: dict[str, Client] = { self.clients: dict[str, Client] = {
"qobuz": QobuzClient(config), "qobuz": QobuzClient(config),
# "tidal": TidalClient(config), # "tidal": TidalClient(config),
# "deezer": DeezerClient(config), # "deezer": DeezerClient(config),
# "soundcloud": SoundcloudClient(config), "soundcloud": SoundcloudClient(config),
# "deezloader": DeezloaderClient(config), # "deezloader": DeezloaderClient(config),
} }
async def add(self, url: str): async def add(self, url: str):
"""Add url as a pending item. Do not `asyncio.gather` calls to this!"""
parsed = parse_url(url) parsed = parse_url(url)
if parsed is None: if parsed is None:
raise Exception(f"Unable to parse url {url}") raise Exception(f"Unable to parse url {url}")
@ -60,6 +62,7 @@ class Main:
with console.status(f"[cyan]Logging into {source}", spinner="dots"): with console.status(f"[cyan]Logging into {source}", spinner="dots"):
# Log into client using credentials from config # Log into client using credentials from config
await client.login() await client.login()
# await client.login()
assert client.logged_in assert client.logged_in
return client return client
@ -76,6 +79,8 @@ class Main:
await asyncio.gather(*[item.rip() for item in self.media]) await asyncio.gather(*[item.rip() for item in self.media])
for client in self.clients.values(): for client in self.clients.values():
if hasattr(client, "session"):
await client.session.close() await client.session.close()
clear_progress() clear_progress()
remove_artwork_tempdirs()

View file

@ -1,420 +0,0 @@
"""Manages the information that will be embeded in the audio file."""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass
from typing import Optional, Type, TypeVar
logger = logging.getLogger("streamrip")
def get_album_track_ids(source: str, resp) -> list[str]:
tracklist = resp["tracks"]
if source == "qobuz":
tracklist = tracklist["items"]
return [track["id"] for track in tracklist]
class Covers:
CoverEntry = tuple[str, str | None, str | None]
_covers: list[CoverEntry]
def __init__(self):
# ordered from largest to smallest
self._covers = [
("original", None, None),
("large", None, None),
("small", None, None),
("thumbnail", None, None),
]
def set_cover(self, size: str, url: str | None, path: str | None):
i = self._indexof(size)
self._covers[i] = (size, url, path)
def set_cover_url(self, size: str, url: str):
self.set_cover(size, url, None)
@staticmethod
def _indexof(size: str) -> int:
if size == "original":
return 0
if size == "large":
return 1
if size == "small":
return 2
if size == "thumbnail":
return 3
raise Exception(f"Invalid {size = }")
def empty(self) -> bool:
return all(url is None for _, url, _ in self._covers)
def set_largest_path(self, path: str):
for size, url, _ in self._covers:
if url is not None:
self.set_cover(size, url, path)
return
raise Exception(f"No covers found in {self}")
def set_path(self, size: str, path: str):
i = self._indexof(size)
size, url, _ = self._covers[i]
self._covers[i] = (size, url, path)
def largest(self) -> CoverEntry:
for s, u, p in self._covers:
if u is not None:
return (s, u, p)
raise Exception(f"No covers found in {self}")
@classmethod
def from_qobuz(cls, resp):
img = resp["image"]
c = cls()
c.set_cover_url("original", "org".join(img["large"].rsplit("600", 1)))
c.set_cover_url("large", img["large"])
c.set_cover_url("small", img["small"])
c.set_cover_url("thumbnail", img["thumbnail"])
return c
def get_size(self, size: str) -> CoverEntry:
i = self._indexof(size)
size, url, path = self._covers[i]
if url is not None:
return (size, url, path)
if i + 1 < len(self._covers):
for s, u, p in self._covers[i + 1 :]:
if u is not None:
return (s, u, p)
raise Exception(f"Cover not found for {size = }. Available: {self}")
def __repr__(self):
covers = "\n".join(map(repr, self._covers))
return f"Covers({covers})"
PHON_COPYRIGHT = "\u2117"
COPYRIGHT = "\u00a9"
@dataclass(slots=True)
class TrackMetadata:
info: TrackInfo
title: str
album: AlbumMetadata
artist: str
tracknumber: int
discnumber: int
composer: Optional[str]
@classmethod
def from_qobuz(cls, album: AlbumMetadata, resp: dict) -> TrackMetadata:
with open("tests/qobuz_track_resp.json", "w") as f:
json.dump(resp, f)
title = typed(resp["title"].strip(), str)
version = typed(resp.get("version"), str | None)
work = typed(resp.get("work"), str | None)
if version is not None and version not in title:
title = f"{title} ({version})"
if work is not None and work not in title:
title = f"{work}: {title}"
composer = typed(resp.get("composer", {}).get("name"), str | None)
tracknumber = typed(resp.get("track_number", 1), int)
discnumber = typed(resp.get("media_number", 1), int)
artist = typed(safe_get(resp, "performer", "name"), str)
track_id = str(resp["id"])
bit_depth = typed(resp.get("maximum_bit_depth"), int | None)
sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None)
# Is the info included?
explicit = False
info = TrackInfo(
id=track_id,
quality=album.info.quality,
bit_depth=bit_depth,
explicit=explicit,
sampling_rate=sampling_rate,
work=work,
)
return cls(
info=info,
title=title,
album=album,
artist=artist,
tracknumber=tracknumber,
discnumber=discnumber,
composer=composer,
)
@classmethod
def from_deezer(cls, album: AlbumMetadata, resp) -> TrackMetadata:
raise NotImplemented
@classmethod
def from_soundcloud(cls, album: AlbumMetadata, resp) -> TrackMetadata:
raise NotImplemented
@classmethod
def from_tidal(cls, album: AlbumMetadata, resp) -> TrackMetadata:
raise NotImplemented
@classmethod
def from_resp(cls, album: AlbumMetadata, source, resp) -> TrackMetadata:
if source == "qobuz":
return cls.from_qobuz(album, resp)
if source == "tidal":
return cls.from_tidal(album, resp)
if source == "soundcloud":
return cls.from_soundcloud(album, resp)
if source == "deezer":
return cls.from_deezer(album, resp)
raise Exception
def format_track_path(self, format_string: str) -> str:
# Available keys: "tracknumber", "artist", "albumartist", "composer", "title",
# and "explicit", "albumcomposer"
none_text = "Unknown"
info = {
"title": self.title,
"tracknumber": self.tracknumber,
"artist": self.artist,
"albumartist": self.album.albumartist,
"albumcomposer": self.album.albumcomposer or none_text,
"composer": self.composer or none_text,
"explicit": " (Explicit) " if self.info.explicit else "",
}
return format_string.format(**info)
@dataclass(slots=True)
class TrackInfo:
id: str
quality: int
bit_depth: Optional[int] = None
explicit: bool = False
sampling_rate: Optional[int | float] = None
work: Optional[str] = None
genre_clean = re.compile(r"([^\u2192\/]+)")
@dataclass(slots=True)
class AlbumMetadata:
info: AlbumInfo
album: str
albumartist: str
year: str
genre: list[str]
covers: Covers
tracktotal: int
disctotal: int = 1
albumcomposer: Optional[str] = None
comment: Optional[str] = None
compilation: Optional[str] = None
copyright: Optional[str] = None
date: Optional[str] = None
description: Optional[str] = None
encoder: Optional[str] = None
grouping: Optional[str] = None
lyrics: Optional[str] = None
purchase_date: Optional[str] = None
def get_genres(self) -> str:
return ", ".join(self.genre)
def get_copyright(self) -> str | None:
if self.copyright is None:
return None
# Add special chars
_copyright = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, self.copyright)
_copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, _copyright)
return _copyright
def format_folder_path(self, formatter: str) -> str:
# Available keys: "albumartist", "title", "year", "bit_depth", "sampling_rate",
# "id", and "albumcomposer",
none_str = "Unknown"
info: dict[str, str | int | float] = {
"albumartist": self.albumartist,
"albumcomposer": self.albumcomposer or none_str,
"bit_depth": self.info.bit_depth or none_str,
"id": self.info.id,
"sampling_rate": self.info.sampling_rate or none_str,
"title": self.album,
"year": self.year,
"container": self.info.container,
}
return formatter.format(**info)
@classmethod
def from_qobuz(cls, resp: dict) -> AlbumMetadata:
album = resp.get("title", "Unknown Album")
tracktotal = resp.get("tracks_count", 1)
genre = resp.get("genres_list") or resp.get("genre") or []
genres = list(set(genre_clean.findall("/".join(genre))))
date = resp.get("release_date_original") or resp.get("release_date")
year = date[:4] if date is not None else "Unknown"
_copyright = resp.get("copyright", "")
if artists := resp.get("artists"):
albumartist = ", ".join(a["name"] for a in artists)
else:
albumartist = typed(safe_get(resp, "artist", "name"), str)
albumcomposer = typed(safe_get(resp, "composer", "name"), str | None)
_label = resp.get("label")
if isinstance(_label, dict):
_label = _label["name"]
label = typed(_label, str | None)
description = typed(resp.get("description") or None, str | None)
disctotal = typed(
max(
track.get("media_number", 1)
for track in safe_get(resp, "tracks", "items", default=[{}]) # type: ignore
)
or 1,
int,
)
explicit = typed(resp.get("parental_warning", False), bool)
# Non-embedded information
# version = resp.get("version")
cover_urls = Covers.from_qobuz(resp)
streamable = typed(resp.get("streamable", False), bool)
assert streamable
bit_depth = typed(resp.get("maximum_bit_depth"), int | None)
sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None)
quality = get_quality_id(bit_depth, sampling_rate)
# Make sure it is non-empty list
booklets = typed(resp.get("goodies", None) or None, list | None)
item_id = str(resp.get("qobuz_id"))
if sampling_rate and bit_depth:
container = "FLAC"
else:
container = "MP3"
info = AlbumInfo(
id=item_id,
quality=quality,
container=container,
label=label,
explicit=explicit,
sampling_rate=sampling_rate,
bit_depth=bit_depth,
booklets=booklets,
)
return AlbumMetadata(
info,
album,
albumartist,
year,
genre=genres,
covers=cover_urls,
albumcomposer=albumcomposer,
comment=None,
compilation=None,
copyright=_copyright,
date=date,
description=description,
disctotal=disctotal,
encoder=None,
grouping=None,
lyrics=None,
purchase_date=None,
tracktotal=tracktotal,
)
@classmethod
def from_deezer(cls, resp) -> AlbumMetadata:
raise NotImplementedError
@classmethod
def from_soundcloud(cls, resp) -> AlbumMetadata:
raise NotImplementedError
@classmethod
def from_tidal(cls, resp) -> AlbumMetadata:
raise NotImplementedError
@classmethod
def from_resp(cls, resp: dict, source: str) -> AlbumMetadata:
if source == "qobuz":
return cls.from_qobuz(resp)
if source == "tidal":
return cls.from_tidal(resp)
if source == "soundcloud":
return cls.from_soundcloud(resp)
if source == "deezer":
return cls.from_deezer(resp)
raise Exception("Invalid source")
@dataclass(slots=True)
class AlbumInfo:
id: str
quality: int
container: str
label: Optional[str] = None
explicit: bool = False
sampling_rate: Optional[int | float] = None
bit_depth: Optional[int] = None
booklets: list[dict] | None = None
def safe_get(d: dict, *keys, default=None) -> dict | str | int | list | None:
"""Nested __getitem__ calls with a default value.
Use to avoid key not found errors.
"""
_d = d
for k in keys:
_d = _d.get(k, {})
if _d == {}:
return default
return _d
T = TypeVar("T")
def typed(thing, expected_type: Type[T]) -> T:
assert isinstance(thing, expected_type)
return thing
def get_quality_id(bit_depth: Optional[int], sampling_rate: Optional[int]) -> int:
"""Get the universal quality id from bit depth and sampling rate.
:param bit_depth:
:type bit_depth: Optional[int]
:param sampling_rate: In kHz
:type sampling_rate: Optional[int]
"""
# XXX: Should `0` quality be supported?
if bit_depth is None or sampling_rate is None: # is lossy
return 1
if bit_depth == 16:
return 2
if bit_depth == 24:
if sampling_rate <= 96:
return 3
return 4
raise Exception(f"Invalid {bit_depth = }")

View file

@ -0,0 +1,14 @@
"""Manages the information that will be embeded in the audio file."""
from . import util
from .album_metadata import AlbumMetadata
from .covers import Covers
from .playlist_metadata import PlaylistMetadata
from .track_metadata import TrackMetadata
__all__ = [
"AlbumMetadata",
"TrackMetadata",
"PlaylistMetadata",
"Covers",
"util",
]

View file

@ -0,0 +1,239 @@
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from typing import Optional
from .covers import Covers
from .util import get_quality_id, safe_get, typed
PHON_COPYRIGHT = "\u2117"
COPYRIGHT = "\u00a9"
logger = logging.getLogger("streamrip")
genre_clean = re.compile(r"([^\u2192\/]+)")
@dataclass(slots=True)
class AlbumInfo:
id: str
quality: int
container: str
label: Optional[str] = None
explicit: bool = False
sampling_rate: Optional[int | float] = None
bit_depth: Optional[int] = None
booklets: list[dict] | None = None
@dataclass(slots=True)
class AlbumMetadata:
info: AlbumInfo
album: str
albumartist: str
year: str
genre: list[str]
covers: Covers
tracktotal: int
disctotal: int = 1
albumcomposer: Optional[str] = None
comment: Optional[str] = None
compilation: Optional[str] = None
copyright: Optional[str] = None
date: Optional[str] = None
description: Optional[str] = None
encoder: Optional[str] = None
grouping: Optional[str] = None
lyrics: Optional[str] = None
purchase_date: Optional[str] = None
def get_genres(self) -> str:
return ", ".join(self.genre)
def get_copyright(self) -> str | None:
if self.copyright is None:
return None
# Add special chars
_copyright = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, self.copyright)
_copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, _copyright)
return _copyright
def format_folder_path(self, formatter: str) -> str:
# Available keys: "albumartist", "title", "year", "bit_depth", "sampling_rate",
# "id", and "albumcomposer",
none_str = "Unknown"
info: dict[str, str | int | float] = {
"albumartist": self.albumartist,
"albumcomposer": self.albumcomposer or none_str,
"bit_depth": self.info.bit_depth or none_str,
"id": self.info.id,
"sampling_rate": self.info.sampling_rate or none_str,
"title": self.album,
"year": self.year,
"container": self.info.container,
}
return formatter.format(**info)
@classmethod
def from_qobuz(cls, resp: dict) -> AlbumMetadata:
album = resp.get("title", "Unknown Album")
tracktotal = resp.get("tracks_count", 1)
genre = resp.get("genres_list") or resp.get("genre") or []
genres = list(set(genre_clean.findall("/".join(genre))))
date = resp.get("release_date_original") or resp.get("release_date")
year = date[:4] if date is not None else "Unknown"
_copyright = resp.get("copyright", "")
if artists := resp.get("artists"):
albumartist = ", ".join(a["name"] for a in artists)
else:
albumartist = typed(safe_get(resp, "artist", "name"), str)
albumcomposer = typed(safe_get(resp, "composer", "name"), str | None)
_label = resp.get("label")
if isinstance(_label, dict):
_label = _label["name"]
label = typed(_label, str | None)
description = typed(resp.get("description") or None, str | None)
disctotal = typed(
max(
track.get("media_number", 1)
for track in safe_get(resp, "tracks", "items", default=[{}]) # type: ignore
)
or 1,
int,
)
explicit = typed(resp.get("parental_warning", False), bool)
# Non-embedded information
# version = resp.get("version")
cover_urls = Covers.from_qobuz(resp)
streamable = typed(resp.get("streamable", False), bool)
assert streamable
bit_depth = typed(resp.get("maximum_bit_depth"), int | None)
sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None)
quality = get_quality_id(bit_depth, sampling_rate)
# Make sure it is non-empty list
booklets = typed(resp.get("goodies", None) or None, list | None)
item_id = str(resp.get("qobuz_id"))
if sampling_rate and bit_depth:
container = "FLAC"
else:
container = "MP3"
info = AlbumInfo(
id=item_id,
quality=quality,
container=container,
label=label,
explicit=explicit,
sampling_rate=sampling_rate,
bit_depth=bit_depth,
booklets=booklets,
)
return AlbumMetadata(
info,
album,
albumartist,
year,
genre=genres,
covers=cover_urls,
albumcomposer=albumcomposer,
comment=None,
compilation=None,
copyright=_copyright,
date=date,
description=description,
disctotal=disctotal,
encoder=None,
grouping=None,
lyrics=None,
purchase_date=None,
tracktotal=tracktotal,
)
@classmethod
def from_deezer(cls, resp) -> AlbumMetadata:
raise NotImplementedError
@classmethod
def from_soundcloud(cls, resp) -> AlbumMetadata:
track = resp
logger.debug(track)
track_id = track["id"]
bit_depth, sampling_rate = None, None
explicit = typed(
safe_get(track, "publisher_metadata", "explicit", default=False), bool
)
genre = typed(track["genre"], str)
artist = typed(safe_get(track, "publisher_metadata", "artist"), str | None)
artist = artist or typed(track["user"]["username"], str)
albumartist = artist
date = typed(track["created_at"], str)
year = date[:4]
label = typed(track["label_name"], str | None)
description = typed(track.get("description"), str | None)
album_title = typed(
safe_get(track, "publisher_metadata", "album_title"), str | None
)
album_title = album_title or "Unknown album"
copyright = typed(safe_get(track, "publisher_metadata", "p_line"), str | None)
tracktotal = 1
disctotal = 1
quality = 0
covers = Covers.from_soundcloud(resp)
info = AlbumInfo(
# There are no albums in soundcloud, so we just identify them by a track ID
id=track_id,
quality=quality,
container="MP3",
label=label,
explicit=explicit,
sampling_rate=sampling_rate,
bit_depth=bit_depth,
booklets=None,
)
return AlbumMetadata(
info,
album_title,
albumartist,
year,
genre=[genre],
covers=covers,
albumcomposer=None,
comment=None,
compilation=None,
copyright=copyright,
date=date,
description=description,
disctotal=disctotal,
encoder=None,
grouping=None,
lyrics=None,
purchase_date=None,
tracktotal=tracktotal,
)
@classmethod
def from_tidal(cls, resp) -> AlbumMetadata:
raise NotImplementedError
@classmethod
def from_resp(cls, resp: dict, source: str) -> AlbumMetadata:
if source == "qobuz":
return cls.from_qobuz(resp["album"])
if source == "tidal":
return cls.from_tidal(resp["album"])
if source == "soundcloud":
return cls.from_soundcloud(resp)
if source == "deezer":
return cls.from_deezer(resp["album"])
raise Exception("Invalid source")

View file

@ -0,0 +1,116 @@
class Covers:
COVER_SIZES = ("thumbnail", "small", "large", "original")
CoverEntry = tuple[str, str | None, str | None]
_covers: list[CoverEntry]
def __init__(self):
# ordered from largest to smallest
self._covers = [
("original", None, None),
("large", None, None),
("small", None, None),
("thumbnail", None, None),
]
def set_cover(self, size: str, url: str | None, path: str | None):
i = self._indexof(size)
self._covers[i] = (size, url, path)
def set_cover_url(self, size: str, url: str):
self.set_cover(size, url, None)
@staticmethod
def _indexof(size: str) -> int:
if size == "original":
return 0
if size == "large":
return 1
if size == "small":
return 2
if size == "thumbnail":
return 3
raise Exception(f"Invalid {size = }")
def empty(self) -> bool:
return all(url is None for _, url, _ in self._covers)
def set_largest_path(self, path: str):
for size, url, _ in self._covers:
if url is not None:
self.set_cover(size, url, path)
return
raise Exception(f"No covers found in {self}")
def set_path(self, size: str, path: str):
i = self._indexof(size)
size, url, _ = self._covers[i]
self._covers[i] = (size, url, path)
def largest(self) -> CoverEntry:
for s, u, p in self._covers:
if u is not None:
return (s, u, p)
raise Exception(f"No covers found in {self}")
@classmethod
def from_qobuz(cls, resp):
img = resp["image"]
c = cls()
c.set_cover_url("original", "org".join(img["large"].rsplit("600", 1)))
c.set_cover_url("large", img["large"])
c.set_cover_url("small", img["small"])
c.set_cover_url("thumbnail", img["thumbnail"])
return c
@classmethod
def from_soundcloud(cls, resp):
c = cls()
cover_url = (resp["artwork_url"] or resp["user"].get("avatar_url")).replace(
"large", "t500x500"
)
c.set_cover_url("large", cover_url)
return c
@classmethod
def from_tidal(cls, resp):
uuid = resp["cover"]
if not uuid:
return None
c = cls()
for size_name, dimension in zip(cls.COVER_SIZES, (160, 320, 640, 1280)):
c.set_cover_url(size_name, cls._get_tidal_cover_url(uuid, dimension))
return c
def get_size(self, size: str) -> CoverEntry:
i = self._indexof(size)
size, url, path = self._covers[i]
if url is not None:
return (size, url, path)
if i + 1 < len(self._covers):
for s, u, p in self._covers[i + 1 :]:
if u is not None:
return (s, u, p)
raise Exception(f"Cover not found for {size = }. Available: {self}")
@staticmethod
def _get_tidal_cover_url(uuid, size):
"""Generate a tidal cover url.
:param uuid: VALID uuid string
:param size:
"""
TIDAL_COVER_URL = (
"https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
)
possibles = (80, 160, 320, 640, 1280)
assert size in possibles, f"size must be in {possibles}"
return TIDAL_COVER_URL.format(
uuid=uuid.replace("-", "/"), height=size, width=size
)
def __repr__(self):
covers = "\n".join(map(repr, self._covers))
return f"Covers({covers})"

View file

@ -0,0 +1,85 @@
from dataclasses import dataclass
from .album_metadata import AlbumMetadata
from .track_metadata import TrackMetadata
from .util import typed
NON_STREAMABLE = "_non_streamable"
ORIGINAL_DOWNLOAD = "_original_download"
NOT_RESOLVED = "_not_resolved"
def get_soundcloud_id(resp: dict) -> str:
item_id = resp["id"]
if "media" not in resp:
return f"{item_id}|{NOT_RESOLVED}"
if not resp["streamable"] or resp["policy"] == "BLOCK":
return f"{item_id}|{NON_STREAMABLE}"
if resp["downloadable"] and resp["has_downloads_left"]:
return f"{item_id}|{ORIGINAL_DOWNLOAD}"
url = None
for tc in resp["media"]["transcodings"]:
fmt = tc["format"]
if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg":
url = tc["url"]
break
assert url is not None
return f"{item_id}|{url}"
def parse_soundcloud_id(item_id: str) -> tuple[str, str]:
info = item_id.split("|")
assert len(info) == 2
return tuple(info)
@dataclass(slots=True)
class PlaylistMetadata:
name: str
tracks: list[TrackMetadata]
@classmethod
def from_qobuz(cls, resp: dict):
name = typed(resp["title"], str)
tracks = [
TrackMetadata.from_qobuz(AlbumMetadata.from_qobuz(track["album"]), track)
for track in resp["tracks"]["items"]
]
return cls(name, tracks)
@classmethod
def from_soundcloud(cls, resp: dict):
"""Convert a (modified) soundcloud API response to PlaylistMetadata.
Args:
resp (dict): The response, except there should not be any partially resolved items
in the playlist.
e.g. If soundcloud only returns the full metadata of 5 of them, the rest of the
elements in resp['tracks'] should be replaced with their full metadata.
Returns:
PlaylistMetadata object.
"""
name = typed(resp["title"], str)
tracks = [
TrackMetadata.from_soundcloud(AlbumMetadata.from_soundcloud(track), track)
for track in resp["tracks"]
]
return cls(name, tracks)
def ids(self):
return [track.info.id for track in self.tracks]
@classmethod
def from_resp(cls, resp: dict, source: str):
if source == "qobuz":
return cls.from_qobuz(resp)
elif source == "soundcloud":
return cls.from_soundcloud(resp)
else:
raise NotImplementedError(source)

View file

@ -0,0 +1,141 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from .album_metadata import AlbumMetadata
from .util import safe_get, typed
@dataclass(slots=True)
class TrackInfo:
id: str
quality: int
bit_depth: Optional[int] = None
explicit: bool = False
sampling_rate: Optional[int | float] = None
work: Optional[str] = None
@dataclass(slots=True)
class TrackMetadata:
info: TrackInfo
title: str
album: AlbumMetadata
artist: str
tracknumber: int
discnumber: int
composer: str | None
@classmethod
def from_qobuz(cls, album: AlbumMetadata, resp: dict) -> TrackMetadata:
title = typed(resp["title"].strip(), str)
version = typed(resp.get("version"), str | None)
work = typed(resp.get("work"), str | None)
if version is not None and version not in title:
title = f"{title} ({version})"
if work is not None and work not in title:
title = f"{work}: {title}"
composer = typed(resp.get("composer", {}).get("name"), str | None)
tracknumber = typed(resp.get("track_number", 1), int)
discnumber = typed(resp.get("media_number", 1), int)
artist = typed(
safe_get(
resp,
"performer",
"name",
),
str,
)
track_id = str(resp["id"])
bit_depth = typed(resp.get("maximum_bit_depth"), int | None)
sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None)
# Is the info included?
explicit = False
info = TrackInfo(
id=track_id,
quality=album.info.quality,
bit_depth=bit_depth,
explicit=explicit,
sampling_rate=sampling_rate,
work=work,
)
return cls(
info=info,
title=title,
album=album,
artist=artist,
tracknumber=tracknumber,
discnumber=discnumber,
composer=composer,
)
@classmethod
def from_deezer(cls, album: AlbumMetadata, resp) -> TrackMetadata:
raise NotImplemented
@classmethod
def from_soundcloud(cls, album: AlbumMetadata, resp: dict) -> TrackMetadata:
track = resp
track_id = track["id"]
bit_depth, sampling_rate = None, None
explicit = typed(
safe_get(track, "publisher_metadata", "explicit", default=False), bool
)
title = typed(track["title"].strip(), str)
artist = typed(track["user"]["username"], str)
tracknumber = 1
info = TrackInfo(
id=track_id,
quality=album.info.quality,
bit_depth=bit_depth,
explicit=explicit,
sampling_rate=sampling_rate,
work=None,
)
return cls(
info=info,
title=title,
album=album,
artist=artist,
tracknumber=tracknumber,
discnumber=0,
composer=None,
)
@classmethod
def from_tidal(cls, album: AlbumMetadata, resp) -> TrackMetadata:
raise NotImplemented
@classmethod
def from_resp(cls, album: AlbumMetadata, source, resp) -> TrackMetadata:
if source == "qobuz":
return cls.from_qobuz(album, resp)
if source == "tidal":
return cls.from_tidal(album, resp)
if source == "soundcloud":
return cls.from_soundcloud(album, resp)
if source == "deezer":
return cls.from_deezer(album, resp)
raise Exception
def format_track_path(self, format_string: str) -> str:
# Available keys: "tracknumber", "artist", "albumartist", "composer", "title",
# and "explicit", "albumcomposer"
none_text = "Unknown"
info = {
"title": self.title,
"tracknumber": self.tracknumber,
"artist": self.artist,
"albumartist": self.album.albumartist,
"albumcomposer": self.album.albumcomposer or none_text,
"composer": self.composer or none_text,
"explicit": " (Explicit) " if self.info.explicit else "",
}
return format_string.format(**info)

View file

@ -0,0 +1,51 @@
import functools
from typing import Optional, Type, TypeVar
def get_album_track_ids(source: str, resp) -> list[str]:
tracklist = resp["tracks"]
if source == "qobuz":
tracklist = tracklist["items"]
return [track["id"] for track in tracklist]
def safe_get(dictionary, *keys, default=None):
return functools.reduce(
lambda d, key: d.get(key, default) if isinstance(d, dict) else default,
keys,
dictionary,
)
T = TypeVar("T")
def typed(thing, expected_type: Type[T]) -> T:
assert isinstance(thing, expected_type)
return thing
def get_quality_id(
bit_depth: Optional[int], sampling_rate: Optional[int | float]
) -> int:
"""Get the universal quality id from bit depth and sampling rate.
:param bit_depth:
:type bit_depth: Optional[int]
:param sampling_rate: In kHz
:type sampling_rate: Optional[int]
"""
# XXX: Should `0` quality be supported?
if bit_depth is None or sampling_rate is None: # is lossy
return 1
if bit_depth == 16:
return 2
if bit_depth == 24:
if sampling_rate <= 96:
return 3
return 4
raise Exception(f"Invalid {bit_depth = }")

View file

@ -1,13 +1,84 @@
import asyncio
import logging
import os
from dataclasses import dataclass from dataclasses import dataclass
from .artwork import download_artwork
from .client import Client
from .config import Config
from .filepath_utils import clean_filename
from .media import Media, Pending from .media import Media, Pending
from .metadata import AlbumMetadata, Covers, PlaylistMetadata, TrackMetadata
from .track import Track
logger = logging.getLogger("streamrip")
@dataclass(slots=True)
class PendingPlaylistTrack(Pending):
id: str
client: Client
config: Config
folder: str
async def resolve(self) -> Track:
resp = await self.client.get_metadata(self.id, "track")
album = AlbumMetadata.from_resp(resp["album"], self.client.source)
meta = TrackMetadata.from_resp(album, self.client.source, resp)
quality = getattr(self.config.session, self.client.source).quality
assert isinstance(quality, int)
embedded_cover_path, downloadable = await asyncio.gather(
self._download_cover(album.covers, self.folder),
self.client.get_downloadable(self.id, quality),
)
return Track(meta, downloadable, self.config, self.folder, embedded_cover_path)
async def _download_cover(self, covers: Covers, folder: str) -> str | None:
embed_path, _ = await download_artwork(
self.client.session,
folder,
covers,
self.config.session.artwork,
for_playlist=True,
)
return embed_path
@dataclass(slots=True) @dataclass(slots=True)
class Playlist(Media): class Playlist(Media):
name: str
config: Config
client: Client
tracks: list[PendingPlaylistTrack]
async def preprocess(self):
pass
async def download(self):
async def _resolve_and_download(pending):
track = await pending.resolve()
await track.rip()
await asyncio.gather(*[_resolve_and_download(p) for p in self.tracks])
async def postprocess(self):
pass pass
@dataclass(slots=True) @dataclass(slots=True)
class PendingPlaylist(Pending): class PendingPlaylist(Pending):
pass id: str
client: Client
config: Config
async def resolve(self):
resp = await self.client.get_metadata(self.id, "playlist")
meta = PlaylistMetadata.from_resp(resp, self.client.source)
name = meta.name
parent = self.config.session.downloads.folder
folder = os.path.join(parent, clean_filename(name))
tracks = [
PendingPlaylistTrack(id, self.client, self.config, folder)
for id in meta.ids()
]
return Playlist(name, self.config, self.client, tracks)

View file

@ -10,6 +10,7 @@ from .config import Config
from .deezer_client import DeezerClient from .deezer_client import DeezerClient
from .exceptions import AuthenticationError, MissingCredentials from .exceptions import AuthenticationError, MissingCredentials
from .qobuz_client import QobuzClient from .qobuz_client import QobuzClient
from .soundcloud_client import SoundcloudClient
from .tidal_client import TidalClient from .tidal_client import TidalClient
@ -195,15 +196,30 @@ class DeezerPrompter(CredentialPrompter):
return client return client
class SoundcloudPrompter(CredentialPrompter):
def has_creds(self) -> bool:
return True
async def prompt_and_login(self):
pass
def save(self):
pass
def type_check_client(self, client) -> SoundcloudClient:
assert isinstance(client, SoundcloudClient)
return client
PROMPTERS = { PROMPTERS = {
"qobuz": (QobuzPrompter, QobuzClient), "qobuz": QobuzPrompter,
"deezer": (DeezerPrompter, QobuzClient), "deezer": DeezerPrompter,
"tidal": (TidalPrompter, QobuzClient), "tidal": TidalPrompter,
"soundcloud": SoundcloudPrompter,
} }
def get_prompter(client: Client, config: Config) -> CredentialPrompter: def get_prompter(client: Client, config: Config) -> CredentialPrompter:
"""Return an instance of a prompter.""" """Return an instance of a prompter."""
p, c = PROMPTERS[client.source] p = PROMPTERS[client.source]
assert isinstance(client, c)
return p(config, client) return p(config, client)

View file

@ -54,6 +54,7 @@ class QobuzClient(Client):
self.secret: Optional[str] = None self.secret: Optional[str] = None
async def login(self): async def login(self):
logger.info("Logging into qobuz")
self.session = await self.get_session() self.session = await self.get_session()
c = self.config.session.qobuz c = self.config.session.qobuz
if not c.email_or_userid or not c.password_or_token: if not c.email_or_userid or not c.password_or_token:

View file

@ -1,3 +1,6 @@
import asyncio
import itertools
import logging
import re import re
from .client import Client from .client import Client
@ -8,11 +11,17 @@ from .exceptions import NonStreamable
BASE = "https://api-v2.soundcloud.com" BASE = "https://api-v2.soundcloud.com"
SOUNDCLOUD_USER_ID = "672320-86895-162383-801513" SOUNDCLOUD_USER_ID = "672320-86895-162383-801513"
logger = logging.getLogger("streamrip")
class SoundcloudClient(Client): class SoundcloudClient(Client):
source = "soundcloud" source = "soundcloud"
logged_in = False logged_in = False
NON_STREAMABLE = "_non_streamable"
ORIGINAL_DOWNLOAD = "_original_download"
NOT_RESOLVED = "_not_resolved"
def __init__(self, config: Config): def __init__(self, config: Config):
self.global_config = config self.global_config = config
self.config = config.session.soundcloud self.config = config.session.soundcloud
@ -23,41 +32,135 @@ class SoundcloudClient(Client):
async def login(self): async def login(self):
self.session = await self.get_session() self.session = await self.get_session()
client_id, app_version = self.config.client_id, self.config.app_version client_id, app_version = self.config.client_id, self.config.app_version
if not client_id or not app_version or not self._announce(): if not client_id or not app_version or not (await self._announce()):
client_id, app_version = await self._refresh_tokens() client_id, app_version = await self._refresh_tokens()
# update file and session configs and save to disk # update file and session configs and save to disk
c = self.global_config.file.soundcloud cf = self.global_config.file.soundcloud
self.config.client_id = c.client_id = client_id cs = self.global_config.session.soundcloud
self.config.client_id = c.app_version = app_version cs.client_id = client_id
cs.app_version = app_version
cf.client_id = client_id
cf.app_version = app_version
self.global_config.file.set_modified() self.global_config.file.set_modified()
logger.debug(f"Current valid {client_id=} {app_version=}")
self.logged_in = True
async def get_metadata(self, item_id: str, media_type: str) -> dict: async def get_metadata(self, item_id: str, media_type: str) -> dict:
raise NotImplementedError """Fetch metadata for an item in Soundcloud API.
async def get_downloadable(self, item: dict, _) -> SoundcloudDownloadable: Args:
if not item["streamable"] or item["policy"] == "BLOCK": item_id (str): Plain soundcloud item ID (e.g 1633786176)
raise NonStreamable(item) media_type (str): track or playlist
if item["downloadable"] and item["has_downloads_left"]:
resp = await self._api_request(f"tracks/{item['id']}/download")
resp_json = await resp.json()
return SoundcloudDownloadable(
self.session, {"url": resp_json["redirectUri"], "type": "original"}
)
Returns:
API response.
"""
if media_type == "track":
return await self._get_track(item_id)
elif media_type == "playlist":
return await self._get_playlist(item_id)
else: else:
raise Exception(f"{media_type} not supported")
async def _get_track(self, item_id: str):
resp, status = await self._api_request(f"tracks/{item_id}")
assert status == 200
return resp
async def _get_playlist(self, item_id: str):
original_resp, status = await self._api_request(f"playlists/{item_id}")
assert status == 200
unresolved_tracks = [
track["id"] for track in original_resp["tracks"] if "media" not in track
]
if len(unresolved_tracks) == 0:
return original_resp
MAX_BATCH_SIZE = 50
batches = batched(unresolved_tracks, MAX_BATCH_SIZE)
requests = [
self._api_request(
"tracks",
params={"ids": ",".join(str(id) for id in filter_none(batch))},
)
for batch in batches
]
# (list of track metadata, status code)
responses: list[tuple[list, int]] = await asyncio.gather(*requests)
assert all(status == 200 for _, status in responses)
remaining_tracks = list(itertools.chain(*[resp for resp, _ in responses]))
# Insert the new metadata into the original response
track_map: dict[str, dict] = {track["id"]: track for track in remaining_tracks}
for i, track in enumerate(original_resp["tracks"]):
if "media" in track: # track already has metadata
continue
this_track = track_map.get(track["id"])
if this_track is None:
raise Exception(f"Requested {track['id']} but got no response")
original_resp["tracks"][i] = this_track
# Overwrite all ids in playlist
for track in original_resp["tracks"]:
track["id"] = self._get_custom_id(track)
return original_resp
@classmethod
def _get_custom_id(cls, resp: dict) -> str:
item_id = resp["id"]
assert "media" in resp, f"track {resp} should be resolved"
if not resp["streamable"] or resp["policy"] == "BLOCK":
return f"{item_id}|{cls.NON_STREAMABLE}"
if resp["downloadable"] and resp["has_downloads_left"]:
return f"{item_id}|{cls.ORIGINAL_DOWNLOAD}"
url = None url = None
for tc in item["media"]["transcodings"]: for tc in resp["media"]["transcodings"]:
fmt = tc["format"] fmt = tc["format"]
if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg": if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg":
url = tc["url"] url = tc["url"]
break break
assert url is not None assert url is not None
return f"{item_id}|{url}"
resp = await self._request(url) async def get_downloadable(self, item_info: str, _) -> SoundcloudDownloadable:
resp_json = await resp.json() # We have `get_metadata` overwrite the "id" field so that it contains
# some extra information we need to download soundcloud tracks
# item_id is the soundcloud ID of the track
# download_url is either the url that points to an mp3 download or ""
# if download_url == '_non_streamable' then we raise an exception
infos: list[str] = item_info.split("|")
assert len(infos) == 2, infos
item_id, download_info = infos
if download_info == self.NON_STREAMABLE:
raise NonStreamable(item_info)
if download_info == self.ORIGINAL_DOWNLOAD:
resp_json, status = await self._api_request(f"tracks/{item_id}/download")
assert status == 200
return SoundcloudDownloadable(
self.session, {"url": resp_json["redirectUri"], "type": "original"}
)
if download_info == self.NOT_RESOLVED:
raise NotImplementedError(item_info)
# download_info contains mp3 stream url
resp_json, status = await self._request(download_info)
return SoundcloudDownloadable( return SoundcloudDownloadable(
self.session, {"url": resp_json["url"], "type": "mp3"} self.session, {"url": resp_json["url"], "type": "mp3"}
) )
@ -73,14 +176,29 @@ class SoundcloudClient(Client):
"offset": offset, "offset": offset,
"linked_partitioning": "1", "linked_partitioning": "1",
} }
resp = await self._api_request(f"search/{media_type}s", params=params) resp, status = await self._api_request(f"search/{media_type}s", params=params)
return await resp.json() assert status == 200
return resp
async def _api_request(self, path, params=None, headers=None): async def _api_request(self, path, params=None, headers=None):
url = f"{BASE}/{path}" url = f"{BASE}/{path}"
return await self._request(url, params=params, headers=headers) return await self._request(url, params=params, headers=headers)
async def _request(self, url, params=None, headers=None): async def _request(self, url, params=None, headers=None) -> tuple[dict, int]:
c = self.config
_params = {
"client_id": c.client_id,
"app_version": c.app_version,
"app_locale": "en",
}
if params is not None:
_params.update(params)
logger.debug(f"Requesting {url} with {_params=}, {headers=}")
async with self.session.get(url, params=_params, headers=headers) as resp:
return await resp.json(), resp.status
async def _request_body(self, url, params=None, headers=None):
c = self.config c = self.config
_params = { _params = {
"client_id": c.client_id, "client_id": c.client_id,
@ -91,15 +209,17 @@ class SoundcloudClient(Client):
_params.update(params) _params.update(params)
async with self.session.get(url, params=_params, headers=headers) as resp: async with self.session.get(url, params=_params, headers=headers) as resp:
return resp return await resp.content.read(), resp.status
async def _resolve_url(self, url: str) -> dict: async def _resolve_url(self, url: str) -> dict:
resp = await self._api_request(f"resolve?url={url}") resp, status = await self._api_request("resolve", params={"url": url})
return await resp.json() assert status == 200
return resp
async def _announce(self): async def _announce(self):
resp = await self._api_request("announcements") url = f"{BASE}/announcements"
return resp.status == 200 _, status = await self._request_body(url)
return status == 200
async def _refresh_tokens(self) -> tuple[str, str]: async def _refresh_tokens(self) -> tuple[str, str]:
"""Return a valid client_id, app_version pair.""" """Return a valid client_id, app_version pair."""
@ -130,4 +250,14 @@ class SoundcloudClient(Client):
assert client_id_match is not None assert client_id_match is not None
client_id = client_id_match.group(1) client_id = client_id_match.group(1)
logger.debug(f"Refreshed soundcloud tokens as {client_id=} {app_version=}")
return client_id, app_version return client_id, app_version
def batched(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return list(itertools.zip_longest(*args, fillvalue=fillvalue))
def filter_none(iterable):
return (x for x in iterable if x is not None)

View file

@ -115,7 +115,7 @@ class Container(Enum):
# unreachable # unreachable
return [] return []
def _tag_flac(self, meta) -> list[tuple]: def _tag_flac(self, meta: TrackMetadata) -> list[tuple]:
out = [] out = []
for k, v in FLAC_KEY.items(): for k, v in FLAC_KEY.items():
tag = self._attr_from_meta(meta, k) tag = self._attr_from_meta(meta, k)
@ -131,13 +131,13 @@ class Container(Enum):
out.append((v, str(tag))) out.append((v, str(tag)))
return out return out
def _tag_mp3(self, meta): def _tag_mp3(self, meta: TrackMetadata):
out = [] out = []
for k, v in MP3_KEY.items(): for k, v in MP3_KEY.items():
if k == "tracknumber": if k == "tracknumber":
text = f"{meta.tracknumber}/{meta.tracktotal}" text = f"{meta.tracknumber}/{meta.album.tracktotal}"
elif k == "discnumber": elif k == "discnumber":
text = f"{meta.discnumber}/{meta.disctotal}" text = f"{meta.discnumber}/{meta.album.disctotal}"
else: else:
text = self._attr_from_meta(meta, k) text = self._attr_from_meta(meta, k)
@ -145,13 +145,13 @@ class Container(Enum):
out.append((v.__name__, v(encoding=3, text=text))) out.append((v.__name__, v(encoding=3, text=text)))
return out return out
def _tag_aac(self, meta): def _tag_aac(self, meta: TrackMetadata):
out = [] out = []
for k, v in MP4_KEY.items(): for k, v in MP4_KEY.items():
if k == "tracknumber": if k == "tracknumber":
text = [(meta.tracknumber, meta.tracktotal)] text = [(meta.tracknumber, meta.album.tracktotal)]
elif k == "discnumber": elif k == "discnumber":
text = [(meta.discnumber, meta.disctotal)] text = [(meta.discnumber, meta.album.disctotal)]
else: else:
text = self._attr_from_meta(meta, k) text = self._attr_from_meta(meta, k)

View file

@ -102,7 +102,7 @@ class PendingTrack(Pending):
meta = TrackMetadata.from_resp(self.album, self.client.source, resp) meta = TrackMetadata.from_resp(self.album, self.client.source, resp)
quality = getattr(self.config.session, self.client.source).quality quality = getattr(self.config.session, self.client.source).quality
assert isinstance(quality, int) assert isinstance(quality, int)
downloadable = await self.client.get_downloadable({"id": self.id}, quality) downloadable = await self.client.get_downloadable(self.id, quality)
return Track(meta, downloadable, self.config, self.folder, self.cover_path) return Track(meta, downloadable, self.config, self.folder, self.cover_path)
@ -120,7 +120,9 @@ class PendingSingle(Pending):
async def resolve(self) -> Track: async def resolve(self) -> Track:
resp = await self.client.get_metadata(self.id, "track") resp = await self.client.get_metadata(self.id, "track")
album = AlbumMetadata.from_resp(resp["album"], self.client.source) # Patch for soundcloud
# self.id = resp["id"]
album = AlbumMetadata.from_resp(resp, self.client.source)
meta = TrackMetadata.from_resp(album, self.client.source, resp) meta = TrackMetadata.from_resp(album, self.client.source, resp)
quality = getattr(self.config.session, self.client.source).quality quality = getattr(self.config.session, self.client.source).quality
@ -132,7 +134,7 @@ class PendingSingle(Pending):
embedded_cover_path, downloadable = await asyncio.gather( embedded_cover_path, downloadable = await asyncio.gather(
self._download_cover(album.covers, folder), self._download_cover(album.covers, folder),
self.client.get_downloadable({"id": self.id}, quality), self.client.get_downloadable(self.id, quality),
) )
return Track(meta, downloadable, self.config, folder, embedded_cover_path) return Track(meta, downloadable, self.config, folder, embedded_cover_path)
@ -144,6 +146,10 @@ class PendingSingle(Pending):
async def _download_cover(self, covers: Covers, folder: str) -> str | None: async def _download_cover(self, covers: Covers, folder: str) -> str | None:
embed_path, _ = await download_artwork( embed_path, _ = await download_artwork(
self.client.session, folder, covers, self.config.session.artwork self.client.session,
folder,
covers,
self.config.session.artwork,
for_playlist=False,
) )
return embed_path return embed_path

View file

@ -3,12 +3,12 @@ from __future__ import annotations
import re import re
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from click import secho
from .album import PendingAlbum from .album import PendingAlbum
from .client import Client from .client import Client
from .config import Config from .config import Config
from .media import Pending from .media import Pending
from .playlist import PendingPlaylist
from .soundcloud_client import SoundcloudClient
from .track import PendingSingle from .track import PendingSingle
from .validation_regexps import ( from .validation_regexps import (
DEEZER_DYNAMIC_LINK_REGEX, DEEZER_DYNAMIC_LINK_REGEX,
@ -100,8 +100,29 @@ class DeezerDynamicURL(URL):
pass pass
class SoundCloudURL(URL): class SoundcloudURL(URL):
pass source = "soundcloud"
def __init__(self, url: str):
self.url = url
async def into_pending(self, client: SoundcloudClient, config: Config) -> Pending:
resolved = await client._resolve_url(self.url)
media_type = resolved["kind"]
item_id = str(resolved["id"])
if media_type == "track":
return PendingSingle(item_id, client, config)
elif media_type == "playlist":
return PendingPlaylist(item_id, client, config)
else:
raise NotImplementedError(media_type)
@classmethod
def from_str(cls, url: str):
soundcloud_url = SOUNDCLOUD_URL_REGEX.match(url)
if soundcloud_url is None:
return None
return cls(soundcloud_url.group(0))
class LastFmURL(URL): class LastFmURL(URL):
@ -109,10 +130,18 @@ class LastFmURL(URL):
def parse_url(url: str) -> URL | None: def parse_url(url: str) -> URL | None:
"""Return a URL type given a url string.
Args:
url (str): Url to parse
Returns: A URL type, or None if nothing matched.
"""
url = url.strip() url = url.strip()
parsed_urls: list[URL | None] = [ parsed_urls: list[URL | None] = [
GenericURL.from_str(url), GenericURL.from_str(url),
QobuzInterpreterURL.from_str(url), QobuzInterpreterURL.from_str(url),
SoundcloudURL.from_str(url),
# TODO: the rest of the url types # TODO: the rest of the url types
] ]
return next((u for u in parsed_urls if u is not None), None) return next((u for u in parsed_urls if u is not None), None)
@ -121,7 +150,8 @@ def parse_url(url: str) -> URL | None:
# TODO: recycle this class # TODO: recycle this class
class UniversalURL: class UniversalURL:
""" """
>>> u = UniversalURL('https://sampleurl.com') >>> u = UniversalURL.from_str('https://sampleurl.com')
>>> if u is not None:
>>> pending = await u.into_pending_item() >>> pending = await u.into_pending_item()
""" """