More restructuring

This commit is contained in:
Nathan Thomas 2023-11-24 20:38:44 -08:00
parent 7a35d31c4b
commit ba05436fec
13 changed files with 158 additions and 164 deletions

View file

@ -1,9 +1,9 @@
from .client import Client from .client import Client
from .deezer_client import DeezerClient from .deezer import DeezerClient
from .downloadable import BasicDownloadable, Downloadable from .downloadable import BasicDownloadable, Downloadable
from .qobuz_client import QobuzClient from .qobuz import QobuzClient
from .soundcloud_client import SoundcloudClient from .soundcloud import SoundcloudClient
from .tidal_client import TidalClient from .tidal import TidalClient
__all__ = [ __all__ = [
"Client", "Client",

View file

@ -1,9 +1,13 @@
import asyncio import asyncio
import base64
import hashlib import hashlib
import logging import logging
import re import re
import time import time
from typing import AsyncGenerator, Optional from collections import OrderedDict
from typing import AsyncGenerator, List, Optional
import aiohttp
from ..config import Config from ..config import Config
from ..exceptions import ( from ..exceptions import (
@ -16,7 +20,6 @@ from ..exceptions import (
) )
from .client import Client from .client import Client
from .downloadable import BasicDownloadable, Downloadable from .downloadable import BasicDownloadable, Downloadable
from .qobuz_spoofer import QobuzSpoofer
logger = logging.getLogger("streamrip") logger = logging.getLogger("streamrip")
@ -41,6 +44,95 @@ QOBUZ_FEATURED_KEYS = {
} }
class QobuzSpoofer:
"""Spoofs the information required to stream tracks from Qobuz."""
def __init__(self):
"""Create a Spoofer."""
self.seed_timezone_regex = (
r'[a-z]\.initialSeed\("(?P<seed>[\w=]+)",window\.ut'
r"imezone\.(?P<timezone>[a-z]+)\)"
)
# note: {timezones} should be replaced with every capitalized timezone joined by a |
self.info_extras_regex = (
r'name:"\w+/(?P<timezone>{timezones})",info:"'
r'(?P<info>[\w=]+)",extras:"(?P<extras>[\w=]+)"'
)
self.app_id_regex = (
r'production:{api:{appId:"(?P<app_id>\d{9})",appSecret:"(\w{32})'
)
self.session = None
async def get_app_id_and_secrets(self) -> tuple[str, list[str]]:
assert self.session is not None
async with self.session.get("https://play.qobuz.com/login") as req:
login_page = await req.text()
bundle_url_match = re.search(
r'<script src="(/resources/\d+\.\d+\.\d+-[a-z]\d{3}/bundle\.js)"></script>',
login_page,
)
assert bundle_url_match is not None
bundle_url = bundle_url_match.group(1)
async with self.session.get("https://play.qobuz.com" + bundle_url) as req:
self.bundle = await req.text()
match = re.search(self.app_id_regex, self.bundle)
if match is None:
raise Exception("Could not find app id.")
app_id = str(match.group("app_id"))
# get secrets
seed_matches = re.finditer(self.seed_timezone_regex, self.bundle)
secrets = OrderedDict()
for match in seed_matches:
seed, timezone = match.group("seed", "timezone")
secrets[timezone] = [seed]
"""
The code that follows switches around the first and second timezone.
Qobuz uses two ternary (a shortened if statement) conditions that
should always return false. The way Javascript's ternary syntax
works, the second option listed is what runs if the condition returns
false. Because of this, we must prioritize the *second* seed/timezone
pair captured, not the first.
"""
keypairs = list(secrets.items())
secrets.move_to_end(keypairs[1][0], last=False)
info_extras_regex = self.info_extras_regex.format(
timezones="|".join(timezone.capitalize() for timezone in secrets)
)
info_extras_matches = re.finditer(info_extras_regex, self.bundle)
for match in info_extras_matches:
timezone, info, extras = match.group("timezone", "info", "extras")
secrets[timezone.lower()] += [info, extras]
for secret_pair in secrets:
secrets[secret_pair] = base64.standard_b64decode(
"".join(secrets[secret_pair])[:-44]
).decode("utf-8")
vals: List[str] = list(secrets.values())
vals.remove("")
secrets_list = vals
return app_id, secrets_list
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *_):
if self.session is not None:
await self.session.close()
self.session = None
class QobuzClient(Client): class QobuzClient(Client):
source = "qobuz" source = "qobuz"
max_quality = 4 max_quality = 4

View file

@ -1,100 +0,0 @@
"""Get app id and secrets for Qobuz.
Credits to Dash for this tool.
"""
import base64
import re
from collections import OrderedDict
from typing import List
import aiohttp
class QobuzSpoofer:
"""Spoofs the information required to stream tracks from Qobuz."""
def __init__(self):
"""Create a Spoofer."""
self.seed_timezone_regex = (
r'[a-z]\.initialSeed\("(?P<seed>[\w=]+)",window\.ut'
r"imezone\.(?P<timezone>[a-z]+)\)"
)
# note: {timezones} should be replaced with every capitalized timezone joined by a |
self.info_extras_regex = (
r'name:"\w+/(?P<timezone>{timezones})",info:"'
r'(?P<info>[\w=]+)",extras:"(?P<extras>[\w=]+)"'
)
self.app_id_regex = (
r'production:{api:{appId:"(?P<app_id>\d{9})",appSecret:"(\w{32})'
)
self.session = None
async def get_app_id_and_secrets(self) -> tuple[str, list[str]]:
assert self.session is not None
async with self.session.get("https://play.qobuz.com/login") as req:
login_page = await req.text()
bundle_url_match = re.search(
r'<script src="(/resources/\d+\.\d+\.\d+-[a-z]\d{3}/bundle\.js)"></script>',
login_page,
)
assert bundle_url_match is not None
bundle_url = bundle_url_match.group(1)
async with self.session.get("https://play.qobuz.com" + bundle_url) as req:
self.bundle = await req.text()
match = re.search(self.app_id_regex, self.bundle)
if match is None:
raise Exception("Could not find app id.")
app_id = str(match.group("app_id"))
# get secrets
seed_matches = re.finditer(self.seed_timezone_regex, self.bundle)
secrets = OrderedDict()
for match in seed_matches:
seed, timezone = match.group("seed", "timezone")
secrets[timezone] = [seed]
"""
The code that follows switches around the first and second timezone.
Qobuz uses two ternary (a shortened if statement) conditions that
should always return false. The way Javascript's ternary syntax
works, the second option listed is what runs if the condition returns
false. Because of this, we must prioritize the *second* seed/timezone
pair captured, not the first.
"""
keypairs = list(secrets.items())
secrets.move_to_end(keypairs[1][0], last=False)
info_extras_regex = self.info_extras_regex.format(
timezones="|".join(timezone.capitalize() for timezone in secrets)
)
info_extras_matches = re.finditer(info_extras_regex, self.bundle)
for match in info_extras_matches:
timezone, info, extras = match.group("timezone", "info", "extras")
secrets[timezone.lower()] += [info, extras]
for secret_pair in secrets:
secrets[secret_pair] = base64.standard_b64decode(
"".join(secrets[secret_pair])[:-44]
).decode("utf-8")
vals: List[str] = list(secrets.values())
vals.remove("")
secrets_list = vals
return app_id, secrets_list
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *_):
if self.session is not None:
await self.session.close()
self.session = None

View file

@ -1,22 +1,19 @@
"""A config class that manages arguments between the config file and CLI.""" """A config class that manages arguments between the config file and CLI."""
import copy import copy
import logging import logging
import os
import shutil
from dataclasses import dataclass, fields from dataclasses import dataclass, fields
from pathlib import Path
import click
from tomlkit.api import dumps, parse from tomlkit.api import dumps, parse
from tomlkit.toml_document import TOMLDocument from tomlkit.toml_document import TOMLDocument
from .user_paths import (
DEFAULT_CONFIG_PATH,
DEFAULT_DOWNLOADS_DB_PATH,
DEFAULT_DOWNLOADS_FOLDER,
DEFAULT_FAILED_DOWNLOADS_DB_PATH,
DEFAULT_YOUTUBE_VIDEO_DOWNLOADS_FOLDER,
)
logger = logging.getLogger("streamrip") logger = logging.getLogger("streamrip")
APP_DIR = click.get_app_dir("streamrip", force_posix=True)
DEFAULT_CONFIG_PATH = os.path.join(APP_DIR, "config.toml")
CURRENT_CONFIG_VERSION = "2.0" CURRENT_CONFIG_VERSION = "2.0"
@ -216,6 +213,17 @@ class MiscConfig:
version: str version: str
HOME = Path.home()
DEFAULT_DOWNLOADS_FOLDER = os.path.join(HOME, "StreamripDownloads")
DEFAULT_DOWNLOADS_DB_PATH = os.path.join(APP_DIR, "downloads.db")
DEFAULT_FAILED_DOWNLOADS_DB_PATH = os.path.join(APP_DIR, "failed_downloads.db")
DEFAULT_YOUTUBE_VIDEO_DOWNLOADS_FOLDER = os.path.join(
DEFAULT_DOWNLOADS_FOLDER, "YouTubeVideos"
)
BLANK_CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.toml")
assert os.path.isfile(BLANK_CONFIG_PATH), "Template config not found"
@dataclass(slots=True) @dataclass(slots=True)
class ConfigData: class ConfigData:
toml: TOMLDocument toml: TOMLDocument
@ -287,7 +295,7 @@ class ConfigData:
@classmethod @classmethod
def defaults(cls): def defaults(cls):
with open(DEFAULT_CONFIG_PATH) as f: with open(BLANK_CONFIG_PATH) as f:
return cls.from_toml(f.read()) return cls.from_toml(f.read())
def set_modified(self): def set_modified(self):
@ -352,7 +360,7 @@ class Config:
@classmethod @classmethod
def defaults(cls): def defaults(cls):
return cls(DEFAULT_CONFIG_PATH) return cls(BLANK_CONFIG_PATH)
def __enter__(self): def __enter__(self):
return self return self
@ -362,10 +370,9 @@ class Config:
def set_user_defaults(path: str, /): def set_user_defaults(path: str, /):
"""Update the TOML file at the path with user-specific default values. """Update the TOML file at the path with user-specific default values."""
shutil.copy(BLANK_CONFIG_PATH, path)
MUST copy updated blank config to `path` before calling this!
"""
with open(path) as f: with open(path) as f:
toml = parse(f.read()) toml = parse(f.read())
toml["downloads"]["folder"] = DEFAULT_DOWNLOADS_FOLDER # type: ignore toml["downloads"]["folder"] = DEFAULT_DOWNLOADS_FOLDER # type: ignore

View file

@ -1,21 +1,25 @@
from .album import Album, PendingAlbum from .album import Album, PendingAlbum
from .artist import Artist, PendingArtist from .artist import Artist, PendingArtist
from .artwork import remove_artwork_tempdirs
from .label import Label, PendingLabel from .label import Label, PendingLabel
from .media import Media from .media import Media, Pending
from .playlist import PendingPlaylist, PendingPlaylistTrack, Playlist from .playlist import PendingPlaylist, PendingPlaylistTrack, Playlist
from .track import PendingTrack, Track from .track import PendingSingle, PendingTrack, Track
__all__ = [ __all__ = [
"Album",
"Artist",
"Label",
"Media", "Media",
"Pending",
"Album",
"PendingAlbum", "PendingAlbum",
"Artist",
"PendingArtist", "PendingArtist",
"Label",
"PendingLabel", "PendingLabel",
"PendingPlaylist",
"PendingPlaylistTrack",
"PendingTrack",
"Playlist", "Playlist",
"PendingPlaylist",
"Track", "Track",
"PendingTrack",
"PendingPlaylistTrack",
"PendingSingle",
"remove_artwork_tempdirs",
] ]

View file

@ -9,7 +9,7 @@ from mutagen.id3 import APIC # type: ignore
from mutagen.id3 import ID3 from mutagen.id3 import ID3
from mutagen.mp4 import MP4, MP4Cover from mutagen.mp4 import MP4, MP4Cover
from . import TrackMetadata from .track_metadata import TrackMetadata
logger = logging.getLogger("streamrip") logger = logging.getLogger("streamrip")

View file

@ -11,10 +11,10 @@ from rich.logging import RichHandler
from rich.prompt import Confirm from rich.prompt import Confirm
from rich.traceback import install from rich.traceback import install
from .config import Config, set_user_defaults from ..config import DEFAULT_CONFIG_PATH, Config, set_user_defaults
from .console import console from ..console import console
from .main import Main from .main import Main
from .user_paths import BLANK_CONFIG_PATH, DEFAULT_CONFIG_PATH from .user_paths import DEFAULT_CONFIG_PATH
def coro(f): def coro(f):
@ -81,7 +81,6 @@ def rip(ctx, config_path, folder, no_db, quality, convert, no_progress, verbose)
console.print( console.print(
f"No file found at [bold cyan]{config_path}[/bold cyan], creating default config." f"No file found at [bold cyan]{config_path}[/bold cyan], creating default config."
) )
shutil.copy(BLANK_CONFIG_PATH, config_path)
set_user_defaults(config_path) set_user_defaults(config_path)
# pass to subcommands # pass to subcommands
@ -177,7 +176,6 @@ def config_reset(ctx, yes):
console.print("[green]Reset aborted") console.print("[green]Reset aborted")
return return
shutil.copy(BLANK_CONFIG_PATH, config_path)
set_user_defaults(config_path) set_user_defaults(config_path)
console.print(f"Reset the config file at [bold cyan]{config_path}!") console.print(f"Reset the config file at [bold cyan]{config_path}!")
@ -197,6 +195,7 @@ async def search(query, source):
@rip.command() @rip.command()
@click.argument("url", required=True) @click.argument("url", required=True)
def lastfm(url): def lastfm(url):
"""Download tracks from a last.fm playlist using a supported source."""
raise NotImplementedError raise NotImplementedError

View file

@ -1,17 +1,14 @@
import asyncio import asyncio
import logging import logging
from . import db from .. import db
from .artwork import remove_artwork_tempdirs from ..client import Client, QobuzClient, SoundcloudClient
from .client import Client from ..config import Config
from .config import Config from ..console import console
from .console import console from ..media import Media, Pending, remove_artwork_tempdirs
from .media import Media, Pending from ..progress import clear_progress
from .progress import clear_progress from .parse_url import parse_url
from .prompter import get_prompter from .prompter import get_prompter
from .qobuz_client import QobuzClient
from .soundcloud_client import SoundcloudClient
from .universal_url import parse_url
logger = logging.getLogger("streamrip") logger = logging.getLogger("streamrip")

View file

@ -3,27 +3,26 @@ from __future__ import annotations
import re import re
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from .album import PendingAlbum from ..client import Client, SoundcloudClient
from .artist import PendingArtist from ..config import Config
from .client import Client from ..db import Database
from .config import Config from ..media import (
from .db import Database Pending,
from .label import PendingLabel PendingAlbum,
from .media import Pending PendingArtist,
from .playlist import PendingPlaylist PendingLabel,
from .soundcloud_client import SoundcloudClient PendingPlaylist,
from .track import PendingSingle PendingSingle,
)
from .validation_regexps import ( from .validation_regexps import (
DEEZER_DYNAMIC_LINK_REGEX,
LASTFM_URL_REGEX,
QOBUZ_INTERPRETER_URL_REGEX, QOBUZ_INTERPRETER_URL_REGEX,
SOUNDCLOUD_URL_REGEX, SOUNDCLOUD_URL_REGEX,
URL_REGEX, URL_REGEX,
YOUTUBE_URL_REGEX,
) )
class URL(ABC): class URL(ABC):
match: re.Match
source: str source: str
def __init__(self, match: re.Match, source: str): def __init__(self, match: re.Match, source: str):

View file

@ -5,13 +5,9 @@ from getpass import getpass
from click import launch, secho, style from click import launch, secho, style
from .client import Client from ..client import Client, DeezerClient, QobuzClient, SoundcloudClient, TidalClient
from .config import Config from ..config import Config
from .deezer_client import DeezerClient from ..exceptions import AuthenticationError, MissingCredentials
from .exceptions import AuthenticationError, MissingCredentials
from .qobuz_client import QobuzClient
from .soundcloud_client import SoundcloudClient
from .tidal_client import TidalClient
class CredentialPrompter(ABC): class CredentialPrompter(ABC):