Finish most of skeleton

This commit is contained in:
Nathan Thomas 2023-09-21 19:19:30 -07:00
parent b5a442c042
commit 34277a3c67
26 changed files with 2357 additions and 1791 deletions

View file

@ -1,845 +0,0 @@
import concurrent.futures
import logging
import os
import threading
from typing import Optional
import requests
from cleo.application import Application as BaseApplication
from cleo.commands.command import Command
from cleo.formatters.style import Style
from cleo.helpers import argument, option
from click import launch
from streamrip import __version__
from .config import Config
from .core import RipCore
logging.basicConfig(level="WARNING")
logger = logging.getLogger("streamrip")
outdated = False
newest_version: Optional[str] = None
class DownloadCommand(Command):
name = "url"
description = "Download items using urls."
arguments = [
argument(
"urls",
"One or more Qobuz, Tidal, Deezer, or SoundCloud urls",
optional=True,
multiple=True,
)
]
options = [
option(
"file",
"-f",
"Path to a text file containing urls",
flag=False,
default="None",
),
option(
"codec",
"-c",
"Convert the downloaded files to <cmd>ALAC</cmd>, <cmd>FLAC</cmd>, <cmd>MP3</cmd>, <cmd>AAC</cmd>, or <cmd>OGG</cmd>",
flag=False,
default="None",
),
option(
"max-quality",
"m",
"The maximum quality to download. Can be <cmd>0</cmd>, <cmd>1</cmd>, <cmd>2</cmd>, <cmd>3 </cmd>or <cmd>4</cmd>",
flag=False,
default="None",
),
option(
"ignore-db",
"-i",
description="Download items even if they have been logged in the database.",
),
option("config", description="Path to config file.", flag=False),
option("directory", "-d", "Directory to download items into.", flag=False),
]
help = (
"\nDownload <title>Dreams</title> by <title>Fleetwood Mac</title>:\n"
"$ <cmd>rip url https://www.deezer.com/us/track/67549262</cmd>\n\n"
"Batch download urls from a text file named <path>urls.txt</path>:\n"
"$ <cmd>rip url --file urls.txt</cmd>\n\n"
"For more information on Quality IDs, see\n"
"<url>https://github.com/nathom/streamrip/wiki/Quality-IDs</url>\n"
)
def handle(self):
global outdated
global newest_version
# Use a thread so that it doesn't slow down startup
update_check = threading.Thread(target=is_outdated, daemon=True)
update_check.start()
path, quality, no_db, directory, config = clean_options(
self.option("file"),
self.option("max-quality"),
self.option("ignore-db"),
self.option("directory"),
self.option("config"),
)
assert isinstance(config, str) or config is None
config = Config(config)
if directory is not None:
config.session["downloads"]["folder"] = directory
if no_db:
config.session["database"]["enabled"] = False
if quality is not None:
for source in ("qobuz", "tidal", "deezer"):
config.session[source]["quality"] = quality
core = RipCore(config)
urls = self.argument("urls")
if path is not None:
assert isinstance(path, str)
if os.path.isfile(path):
core.handle_txt(path)
else:
self.line(
f"<error>File <comment>{path}</comment> does not exist.</error>"
)
return 1
if urls:
core.handle_urls(";".join(urls))
if len(core) > 0:
core.download()
elif not urls and path is None:
self.line("<error>Must pass arguments. See </><cmd>rip url -h</cmd>.")
update_check.join()
if outdated:
import re
self.line(
f"\n<info>A new version of streamrip <title>v{newest_version}</title>"
" is available! Run <cmd>pip3 install streamrip --upgrade</cmd>"
" to update.</info>\n"
)
md_header = re.compile(r"#\s+(.+)")
bullet_point = re.compile(r"-\s+(.+)")
code = re.compile(r"`([^`]+)`")
issue_reference = re.compile(r"(#\d+)")
release_notes = requests.get(
"https://api.github.com/repos/nathom/streamrip/releases/latest"
).json()["body"]
release_notes = md_header.sub(r"<header>\1</header>", release_notes)
release_notes = bullet_point.sub(r"<options=bold>•</> \1", release_notes)
release_notes = code.sub(r"<cmd>\1</cmd>", release_notes)
release_notes = issue_reference.sub(r"<options=bold>\1</>", release_notes)
self.line(release_notes)
return 0
class SearchCommand(Command):
name = "search"
description = "Search for an item"
arguments = [
argument(
"query",
"The name to search for",
optional=False,
multiple=False,
)
]
options = [
option(
"source",
"-s",
"Qobuz, Tidal, Soundcloud, Deezer, or Deezloader",
flag=False,
default="qobuz",
),
option(
"type",
"-t",
"Album, Playlist, Track, or Artist",
flag=False,
default="album",
),
]
help = (
"\nSearch for <title>Rumours</title> by <title>Fleetwood Mac</title>\n"
"$ <cmd>rip search 'rumours fleetwood mac'</cmd>\n\n"
"Search for <title>444</title> by <title>Jay-Z</title> on TIDAL\n"
"$ <cmd>rip search --source tidal '444'</cmd>\n\n"
"Search for <title>Bob Dylan</title> on Deezer\n"
"$ <cmd>rip search --type artist --source deezer 'bob dylan'</cmd>\n"
)
def handle(self):
query = self.argument("query")
source, type = clean_options(self.option("source"), self.option("type"))
assert isinstance(source, str)
assert isinstance(type, str)
config = Config()
core = RipCore(config)
if core.interactive_search(query, source, type):
core.download()
else:
self.line("<error>No items chosen, exiting.</error>")
class DiscoverCommand(Command):
name = "discover"
description = "Download items from the charts or a curated playlist"
arguments = [
argument(
"list",
"The list to fetch",
optional=True,
multiple=False,
default="ideal-discography",
)
]
options = [
option(
"scrape",
description="Download all of the items in the list",
),
option(
"max-items",
"-m",
description="The number of items to fetch",
flag=False,
default=50,
),
option(
"source",
"-s",
description="The source to download from (<cmd>qobuz</cmd> or <cmd>deezer</cmd>)",
flag=False,
default="qobuz",
),
]
help = (
"\nBrowse the Qobuz ideal-discography list\n"
"$ <cmd>rip discover</cmd>\n\n"
"Browse the best-sellers list\n"
"$ <cmd>rip discover best-sellers</cmd>\n\n"
"Available options for Qobuz <cmd>list</cmd>:\n\n"
" • most-streamed\n"
" • recent-releases\n"
" • best-sellers\n"
" • press-awards\n"
" • ideal-discography\n"
" • editor-picks\n"
" • most-featured\n"
" • qobuzissims\n"
" • new-releases\n"
" • new-releases-full\n"
" • harmonia-mundi\n"
" • universal-classic\n"
" • universal-jazz\n"
" • universal-jeunesse\n"
" • universal-chanson\n\n"
"Browse the Deezer editorial releases list\n"
"$ <cmd>rip discover --source deezer</cmd>\n\n"
"Browse the Deezer charts\n"
"$ <cmd>rip discover --source deezer charts</cmd>\n\n"
"Available options for Deezer <cmd>list</cmd>:\n\n"
" • releases\n"
" • charts\n"
" • selection\n"
)
def handle(self):
source = self.option("source")
scrape = self.option("scrape")
chosen_list = self.argument("list")
max_items = self.option("max-items")
if source == "qobuz":
from streamrip.constants import QOBUZ_FEATURED_KEYS
if chosen_list not in QOBUZ_FEATURED_KEYS:
self.line(f'<error>Error: list "{chosen_list}" not available</error>')
self.line(self.help)
return 1
elif source == "deezer":
from streamrip.constants import DEEZER_FEATURED_KEYS
if chosen_list not in DEEZER_FEATURED_KEYS:
self.line(f'<error>Error: list "{chosen_list}" not available</error>')
self.line(self.help)
return 1
else:
self.line(
"<error>Invalid source. Choose either <cmd>qobuz</cmd> or <cmd>deezer</cmd></error>"
)
return 1
config = Config()
core = RipCore(config)
if scrape:
core.scrape(chosen_list, max_items)
core.download()
return 0
if core.interactive_search(
chosen_list, source, "featured", limit=int(max_items)
):
core.download()
else:
self.line("<error>No items chosen, exiting.</error>")
return 0
class LastfmCommand(Command):
name = "lastfm"
description = "Search for tracks from a last.fm playlist and download them."
arguments = [
argument(
"urls",
"Last.fm playlist urls",
optional=False,
multiple=True,
)
]
options = [
option(
"source",
"-s",
description="The source to search for items on",
flag=False,
default="qobuz",
),
]
help = (
"You can use this command to download Spotify, Apple Music, and YouTube "
"playlists.\nTo get started, create an account at "
"<url>https://www.last.fm</url>. Once you have\nreached the home page, "
"go to <path>Profile Icon</path> => <path>View profile</path> => "
"<path>Playlists</path> => <path>IMPORT</path>\nand paste your url.\n\n"
"Download the <info>young & free</info> Apple Music playlist (already imported)\n"
"$ <cmd>rip lastfm https://www.last.fm/user/nathan3895/playlists/12089888</cmd>\n"
)
def handle(self):
source = self.option("source")
urls = self.argument("urls")
config = Config()
core = RipCore(config)
config.session["lastfm"]["source"] = source
core.handle_lastfm_urls(";".join(urls))
core.download()
class ConfigCommand(Command):
name = "config"
description = "Manage the configuration file."
options = [
option(
"open",
"-o",
description="Open the config file in the default application",
flag=True,
),
option(
"open-vim",
"-O",
description="Open the config file in (neo)vim",
flag=True,
),
option(
"directory",
"-d",
description="Open the directory that the config file is located in",
flag=True,
),
option("path", "-p", description="Show the config file's path", flag=True),
option("qobuz", description="Set the credentials for Qobuz", flag=True),
option("tidal", description="Log into Tidal", flag=True),
option("deezer", description="Set the Deezer ARL", flag=True),
option(
"music-app",
description="Configure the config file for usage with the macOS Music App",
flag=True,
),
option("reset", description="Reset the config file", flag=True),
option(
"--update",
description="Reset the config file, keeping the credentials",
flag=True,
),
]
"""
Manage the configuration file.
config
{--o|open : Open the config file in the default application}
{--O|open-vim : Open the config file in (neo)vim}
{--d|directory : Open the directory that the config file is located in}
{--p|path : Show the config file's path}
{--qobuz : Set the credentials for Qobuz}
{--tidal : Log into Tidal}
{--deezer : Set the Deezer ARL}
{--music-app : Configure the config file for usage with the macOS Music App}
{--reset : Reset the config file}
{--update : Reset the config file, keeping the credentials}
"""
_config: Config
def handle(self):
import shutil
from .constants import CONFIG_DIR, CONFIG_PATH
self._config = Config()
if self.option("path"):
self.line(f"<info>{CONFIG_PATH}</info>")
if self.option("open"):
self.line(f"Opening <url>{CONFIG_PATH}</url> in default application")
launch(CONFIG_PATH)
if self.option("reset"):
self._config.reset()
if self.option("update"):
self._config.update()
if self.option("open-vim"):
if shutil.which("nvim") is not None:
os.system(f"nvim '{CONFIG_PATH}'")
else:
os.system(f"vim '{CONFIG_PATH}'")
if self.option("directory"):
self.line(f"Opening <url>{CONFIG_DIR}</url>")
launch(CONFIG_DIR)
if self.option("tidal"):
from streamrip.clients import TidalClient
client = TidalClient()
client.login()
self._config.file["tidal"].update(client.get_tokens())
self._config.save()
self.line("<info>Credentials saved to config.</info>")
if self.option("deezer"):
from streamrip.clients import DeezerClient
from streamrip.exceptions import AuthenticationError
self.line(
"Follow the instructions at <url>https://github.com"
"/nathom/streamrip/wiki/Finding-your-Deezer-ARL-Cookie</url>"
)
given_arl = self.ask("Paste your ARL here: ").strip()
self.line("<comment>Validating arl...</comment>")
try:
DeezerClient().login(arl=given_arl)
self._config.file["deezer"]["arl"] = given_arl
self._config.save()
self.line("<b>Sucessfully logged in!</b>")
except AuthenticationError:
self.line("<error>Could not log in. Double check your ARL</error>")
if self.option("qobuz"):
import getpass
import hashlib
self._config.file["qobuz"]["email"] = self.ask("Qobuz email:")
self._config.file["qobuz"]["password"] = hashlib.md5(
getpass.getpass("Qobuz password (won't show on screen): ").encode()
).hexdigest()
self._config.save()
if self.option("music-app"):
self._conf_music_app()
def _conf_music_app(self):
import subprocess
import xml.etree.ElementTree as ET
from pathlib import Path
from tempfile import mktemp
# Find the Music library folder
temp_file = mktemp()
music_pref_plist = Path(Path.home()) / Path(
"Library/Preferences/com.apple.Music.plist"
)
# copy preferences to tempdir
subprocess.run(["cp", music_pref_plist, temp_file])
# convert binary to xml for parsing
subprocess.run(["plutil", "-convert", "xml1", temp_file])
items = iter(ET.parse(temp_file).getroot()[0])
for item in items:
if item.text == "NSNavLastRootDirectory":
break
library_folder = Path(next(items).text)
os.remove(temp_file)
# cp ~/library/preferences/com.apple.music.plist music.plist
# plutil -convert xml1 music.plist
# cat music.plist | pbcopy
self._config.file["downloads"]["folder"] = os.path.join(
library_folder, "Automatically Add to Music.localized"
)
conversion_config = self._config.file["conversion"]
conversion_config["enabled"] = True
conversion_config["codec"] = "ALAC"
conversion_config["sampling_rate"] = 48000
conversion_config["bit_depth"] = 24
self._config.file["filepaths"]["folder_format"] = ""
self._config.file["artwork"]["keep_hires_cover"] = False
self._config.save()
class ConvertCommand(Command):
name = "convert"
description = (
"A standalone tool that converts audio files to other codecs en masse."
)
arguments = [
argument(
"codec",
description="<cmd>FLAC</cmd>, <cmd>ALAC</cmd>, <cmd>OPUS</cmd>, <cmd>MP3</cmd>, or <cmd>AAC</cmd>.",
),
argument(
"path",
description="The path to the audio file or a directory that contains audio files.",
),
]
options = [
option(
"sampling-rate",
"-s",
description="Downsample the tracks to this rate, in Hz.",
default=192000,
flag=False,
),
option(
"bit-depth",
"-b",
description="Downsample the tracks to this bit depth.",
default=24,
flag=False,
),
option(
"keep-source", "-k", description="Keep the original file after conversion."
),
]
help = (
"\nConvert all of the audio files in <path>/my/music</path> to MP3s\n"
"$ <cmd>rip convert MP3 /my/music</cmd>\n\n"
"Downsample the audio to 48kHz after converting them to ALAC\n"
"$ <cmd>rip convert --sampling-rate 48000 ALAC /my/music\n"
)
def handle(self):
from streamrip import converter
CODEC_MAP = {
"FLAC": converter.FLAC,
"ALAC": converter.ALAC,
"OPUS": converter.OPUS,
"MP3": converter.LAME,
"AAC": converter.AAC,
}
codec = self.argument("codec")
path = self.argument("path")
ConverterCls = CODEC_MAP.get(codec.upper())
if ConverterCls is None:
self.line(
f'<error>Invalid codec "{codec}". See </error><cmd>rip convert'
" -h</cmd>."
)
return 1
sampling_rate, bit_depth, keep_source = clean_options(
self.option("sampling-rate"),
self.option("bit-depth"),
self.option("keep-source"),
)
converter_args = {
"sampling_rate": sampling_rate,
"bit_depth": bit_depth,
"remove_source": not keep_source,
}
if os.path.isdir(path):
import itertools
from pathlib import Path
from tqdm import tqdm
dirname = path
audio_extensions = ("flac", "m4a", "aac", "opus", "mp3", "ogg")
path_obj = Path(dirname)
audio_files = (
path.as_posix()
for path in itertools.chain.from_iterable(
(path_obj.rglob(f"*.{ext}") for ext in audio_extensions)
)
)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for file in audio_files:
futures.append(
executor.submit(
ConverterCls(
filename=os.path.join(dirname, file),
**converter_args,
).convert
)
)
from streamrip.utils import TQDM_BAR_FORMAT
for future in tqdm(
concurrent.futures.as_completed(futures),
total=len(futures),
desc="Converting",
unit="track",
bar_format=TQDM_BAR_FORMAT,
):
# Only show loading bar
future.result()
elif os.path.isfile(path):
ConverterCls(filename=path, **converter_args).convert()
else:
self.line(
f'<error>Path <path>"{path}"</path> does not exist.</error>',
)
class RepairCommand(Command):
name = "repair"
description = "Retry failed downloads."
options = [
option(
"max-items",
"-m",
flag=False,
description="The maximum number of tracks to download}",
default="None",
)
]
help = "\nRetry up to 20 failed downloads\n$ <cmd>rip repair --max-items 20</cmd>\n"
def handle(self):
max_items = next(clean_options(self.option("max-items")))
config = Config()
RipCore(config).repair(max_items=max_items)
class DatabaseCommand(Command):
name = "db"
description = "View and manage rip's databases."
arguments = [
argument(
"name", description="<cmd>downloads</cmd> or <cmd>failed-downloads</cmd>."
)
]
options = [
option("list", "-l", description="Display the contents of the database."),
option("reset", description="Reset the database."),
]
_table_style = "box-double"
def handle(self) -> None:
from . import db
from .config import Config
config = Config()
db_name = self.argument("name").replace("-", "_")
self._path = config.file["database"][db_name]["path"]
self._db = db.CLASS_MAP[db_name](self._path)
if self.option("list"):
getattr(self, f"_render_{db_name}")()
if self.option("reset"):
os.remove(self._path)
def _render_downloads(self):
from cleo.ui.table import Table
id_table = Table(self._io)
id_table.set_style(self._table_style)
id_table.set_header_title("IDs")
id_table.set_headers(list(self._db.structure.keys()))
id_table.add_rows(id for id in iter(self._db) if id[0].isalnum())
if id_table._rows:
id_table.render()
url_table = Table(self._io)
url_table.set_style(self._table_style)
url_table.set_header_title("URLs")
url_table.set_headers(list(self._db.structure.keys()))
url_table.add_rows(id for id in iter(self._db) if not id[0].isalnum())
# prevent wierd formatting
if url_table._rows:
url_table.render()
def _render_failed_downloads(self):
from cleo.ui.table import Table
id_table = Table(self._io)
id_table.set_style(self._table_style)
id_table.set_header_title("Failed Downloads")
id_table.set_headers(["Source", "Media Type", "ID"])
id_table.add_rows(iter(self._db))
id_table.render()
STRING_TO_PRIMITIVE = {
"None": None,
"True": True,
"False": False,
}
class Application(BaseApplication):
def __init__(self):
super().__init__("rip", __version__)
def _run(self, io):
if io.is_debug():
from .constants import CONFIG_DIR
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(os.path.join(CONFIG_DIR, "streamrip.log"))
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
super()._run(io)
def create_io(self, input=None, output=None, error_output=None):
io = super().create_io(input, output, error_output)
# Set our own CLI styles
formatter = io.output.formatter
formatter.set_style("url", Style("blue", options=["underline"]))
formatter.set_style("path", Style("green", options=["bold"]))
formatter.set_style("cmd", Style("magenta"))
formatter.set_style("title", Style("yellow", options=["bold"]))
formatter.set_style("header", Style("yellow", options=["bold", "underline"]))
io.output.set_formatter(formatter)
io.error_output.set_formatter(formatter)
self._io = io
return io
@property
def _default_definition(self):
default_globals = super()._default_definition
# as of 1.0.0a3, the descriptions don't wrap properly
# so I'm truncating the description for help as a hack
default_globals._options["help"]._description = (
default_globals._options["help"]._description.split(".")[0] + "."
)
return default_globals
def render_error(self, error, io):
super().render_error(error, io)
io.write_line(
"\n<error>If this was unexpected, please open a <path>Bug Report</path> at </error>"
"<url>https://github.com/nathom/streamrip/issues/new/choose</url>"
)
def clean_options(*opts):
for opt in opts:
if isinstance(opt, str):
if opt.startswith("="):
opt = opt[1:]
opt = opt.strip()
if opt.isdigit():
opt = int(opt)
else:
opt = STRING_TO_PRIMITIVE.get(opt, opt)
yield opt
def is_outdated():
global outdated
global newest_version
r = requests.get("https://pypi.org/pypi/streamrip/json").json()
newest_version = r["info"]["version"]
# Compare versions
curr_version_parsed = map(int, __version__.split("."))
assert isinstance(newest_version, str)
newest_version_parsed = map(int, newest_version.split("."))
outdated = False
for c, n in zip(curr_version_parsed, newest_version_parsed):
outdated = c < n
if c != n:
break
def main():
application = Application()
application.add(DownloadCommand())
application.add(SearchCommand())
application.add(DiscoverCommand())
application.add(LastfmCommand())
application.add(ConfigCommand())
application.add(ConvertCommand())
application.add(RepairCommand())
application.add(DatabaseCommand())
application.run()
if __name__ == "__main__":
main()

View file

@ -1,267 +0,0 @@
"""A config class that manages arguments between the config file and CLI."""
import logging
import os
from dataclasses import dataclass
import tomlkit
logger = logging.getLogger("streamrip")
CURRENT_CONFIG_VERSION = "2.0"
DEFAULT_CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.toml")
@dataclass(slots=True)
class QobuzConfig:
use_auth_token: bool
email_or_userid: str
# This is an md5 hash of the plaintext password
password_or_token: str
# Do not change
app_id: str
quality: int
# This will download booklet pdfs that are included with some albums
download_booklets: bool
# Do not change
secrets: list[str]
@dataclass(slots=True)
class TidalConfig:
# Do not change any of the fields below
user_id: str
country_code: str
access_token: str
refresh_token: str
# Tokens last 1 week after refresh. This is the Unix timestamp of the expiration
# time. If you haven't used streamrip in more than a week, you may have to log
# in again using `rip config --tidal`
token_expiry: str
# 0: 256kbps AAC, 1: 320kbps AAC, 2: 16/44.1 "HiFi" FLAC, 3: 24/44.1 "MQA" FLAC
quality: int
# This will download videos included in Video Albums.
download_videos: bool
@dataclass(slots=True)
class DeezerConfig:
# An authentication cookie that allows streamrip to use your Deezer account
# See https://github.com/nathom/streamrip/wiki/Finding-Your-Deezer-ARL-Cookie
# for instructions on how to find this
arl: str
# 0, 1, or 2
# This only applies to paid Deezer subscriptions. Those using deezloader
# are automatically limited to quality = 1
quality: int
# This allows for free 320kbps MP3 downloads from Deezer
# If an arl is provided, deezloader is never used
use_deezloader: bool
# This warns you when the paid deezer account is not logged in and rip falls
# back to deezloader, which is unreliable
deezloader_warnings: bool
@dataclass(slots=True)
class SoundcloudConfig:
# This changes periodically, so it needs to be updated
client_id: str
app_version: str
# Only 0 is available for now
quality: int
@dataclass(slots=True)
class YoutubeConfig:
# The path to download the videos to
video_downloads_folder: str
# Only 0 is available for now
quality: int
# Download the video along with the audio
download_videos: bool
@dataclass(slots=True)
class DatabaseConfig:
downloads_enabled: bool
downloads_path: str
failed_downloads_enabled: bool
failed_downloads_path: str
@dataclass(slots=True)
class ConversionConfig:
enabled: bool
# FLAC, ALAC, OPUS, MP3, VORBIS, or AAC
codec: str
# In Hz. Tracks are downsampled if their sampling rate is greater than this.
# Value of 48000 is recommended to maximize quality and minimize space
sampling_rate: int
# Only 16 and 24 are available. It is only applied when the bit depth is higher
# than this value.
bit_depth: int
# Only applicable for lossy codecs
lossy_bitrate: int
@dataclass(slots=True)
class QobuzDiscographyFilterConfig:
# Remove Collectors Editions, live recordings, etc.
extras: bool
# Picks the highest quality out of albums with identical titles.
repeats: bool
# Remove EPs and Singles
non_albums: bool
# Remove albums whose artist is not the one requested
features: bool
# Skip non studio albums
non_studio_albums: bool
# Only download remastered albums
non_remaster: bool
@dataclass(slots=True)
class ArtworkConfig:
# Write the image to the audio file
embed: bool
# The size of the artwork to embed. Options: thumbnail, small, large, original.
# "original" images can be up to 30MB, and may fail embedding.
# Using "large" is recommended.
size: str
# Both of these options limit the size of the embedded artwork. If their values
# are larger than the actual dimensions of the image, they will be ignored.
# If either value is -1, the image is left untouched.
max_width: int
max_height: int
# Save the cover image at the highest quality as a seperate jpg file
keep_hires_cover: bool
@dataclass(slots=True)
class MetadataConfig:
# Sets the value of the 'ALBUM' field in the metadata to the playlist's name.
# This is useful if your music library software organizes tracks based on album name.
set_playlist_to_album: bool
# Replaces the original track's tracknumber with it's position in the playlist
new_playlist_tracknumbers: bool
# The following metadata tags won't be applied
# See https://github.com/nathom/streamrip/wiki/Metadata-Tag-Names for more info
exclude: list[str]
@dataclass(slots=True)
class FilepathsConfig:
# Create folders for single tracks within the downloads directory using the folder_format
# template
add_singles_to_folder: bool
# Available keys: "albumartist", "title", "year", "bit_depth", "sampling_rate",
# "container", "id", and "albumcomposer"
folder_format: str
# Available keys: "tracknumber", "artist", "albumartist", "composer", "title",
# and "albumcomposer"
track_format: str
# Only allow printable ASCII characters in filenames.
restrict_characters: bool
# Truncate the filename if it is greater than 120 characters
# Setting this to false may cause downloads to fail on some systems
truncate: bool
@dataclass(slots=True)
class DownloadsConfig:
# Folder where tracks are downloaded to
folder: str
# Put Qobuz albums in a 'Qobuz' folder, Tidal albums in 'Tidal' etc.
source_subdirectories: bool
# Download (and convert) tracks all at once, instead of sequentially.
# If you are converting the tracks, or have fast internet, this will
# substantially improve processing speed.
concurrency: bool
# The maximum number of tracks to download at once
# If you have very fast internet, you will benefit from a higher value,
# A value that is too high for your bandwidth may cause slowdowns
max_connections: int
requests_per_minute: int
@dataclass(slots=True)
class LastFmConfig:
# The source on which to search for the tracks.
source: str
# If no results were found with the primary source, the item is searched for
# on this one.
fallback_source: str
@dataclass(slots=True)
class ThemeConfig:
# Options: "dainty" or "plain"
progress_bar: str
@dataclass(slots=True)
class Config:
downloads: DownloadsConfig
qobuz: QobuzConfig
tidal: TidalConfig
deezer: DeezerConfig
soundcloud: SoundcloudConfig
youtube: YoutubeConfig
lastfm: LastFmConfig
filepaths: FilepathsConfig
artwork: ArtworkConfig
metadata: MetadataConfig
qobuz_filter: QobuzDiscographyFilterConfig
theme: ThemeConfig
database: DatabaseConfig
_modified: bool = False
@classmethod
def from_toml(cls, toml_str: str):
# TODO: handle the mistake where Windows people forget to escape backslash
toml = tomlkit.parse(toml_str) # type: ignore
if toml["misc"]["version"] != CURRENT_CONFIG_VERSION: # type: ignore
raise Exception("Need to update config")
downloads = DownloadsConfig(**toml["downloads"]) # type: ignore
qobuz = QobuzConfig(**toml["qobuz"]) # type: ignore
tidal = TidalConfig(**toml["tidal"]) # type: ignore
deezer = DeezerConfig(**toml["deezer"]) # type: ignore
soundcloud = SoundcloudConfig(**toml["soundcloud"]) # type: ignore
youtube = YoutubeConfig(**toml["youtube"]) # type: ignore
lastfm = LastFmConfig(**toml["lastfm"]) # type: ignore
artwork = ArtworkConfig(**toml["artwork"]) # type: ignore
filepaths = FilepathsConfig(**toml["filepaths"]) # type: ignore
metadata = MetadataConfig(**toml["metadata"]) # type: ignore
qobuz_filter = QobuzDiscographyFilterConfig(**toml["qobuz_filters"]) # type: ignore
theme = ThemeConfig(**toml["theme"]) # type: ignore
database = DatabaseConfig(**toml["database"]) # type: ignore
return cls(
downloads=downloads,
qobuz=qobuz,
tidal=tidal,
deezer=deezer,
soundcloud=soundcloud,
youtube=youtube,
lastfm=lastfm,
artwork=artwork,
filepaths=filepaths,
metadata=metadata,
qobuz_filter=qobuz_filter,
theme=theme,
database=database,
)
@classmethod
def defaults(cls):
with open(DEFAULT_CONFIG_PATH) as f:
return cls.from_toml(f.read())
def set_modified(self):
self._modified = True

View file

@ -1,178 +0,0 @@
[downloads]
# Folder where tracks are downloaded to
folder = ""
# Put Qobuz albums in a 'Qobuz' folder, Tidal albums in 'Tidal' etc.
source_subdirectories = false
# Download (and convert) tracks all at once, instead of sequentially.
# If you are converting the tracks, or have fast internet, this will
# substantially improve processing speed.
concurrency = true
# The maximum number of tracks to download at once
# If you have very fast internet, you will benefit from a higher value,
# A value that is too high for your bandwidth may cause slowdowns
max_connections = 3
# Max number of API requests to handle per minute
# Set to -1 for no limit
requests_per_minute = -1
[qobuz]
# 1: 320kbps MP3, 2: 16/44.1, 3: 24/<=96, 4: 24/>=96
quality = 3
# This will download booklet pdfs that are included with some albums
download_booklets = true
# Authenticate to Qobuz using auth token? Value can be true/false only
use_auth_token = false
# Enter your userid if the above use_auth_token is set to true, else enter your email
email_or_userid = ""
# Enter your auth token if the above use_auth_token is set to true, else enter the md5 hash of your plaintext password
password_or_token = ""
# Do not change
app_id = ""
# Do not change
secrets = []
[tidal]
# 0: 256kbps AAC, 1: 320kbps AAC, 2: 16/44.1 "HiFi" FLAC, 3: 24/44.1 "MQA" FLAC
quality = 3
# This will download videos included in Video Albums.
download_videos = true
# Do not change any of the fields below
user_id = ""
country_code = ""
access_token = ""
refresh_token = ""
# Tokens last 1 week after refresh. This is the Unix timestamp of the expiration
# time. If you haven't used streamrip in more than a week, you may have to log
# in again using `rip config --tidal`
token_expiry = ""
[deezer]
# 0, 1, or 2
# This only applies to paid Deezer subscriptions. Those using deezloader
# are automatically limited to quality = 1
quality = 2
# An authentication cookie that allows streamrip to use your Deezer account
# See https://github.com/nathom/streamrip/wiki/Finding-Your-Deezer-ARL-Cookie
# for instructions on how to find this
arl = ""
# This allows for free 320kbps MP3 downloads from Deezer
# If an arl is provided, deezloader is never used
use_deezloader = true
# This warns you when the paid deezer account is not logged in and rip falls
# back to deezloader, which is unreliable
deezloader_warnings = true
[soundcloud]
# Only 0 is available for now
quality = 0
# This changes periodically, so it needs to be updated
client_id = ""
app_version = ""
[youtube]
# Only 0 is available for now
quality = 0
# Download the video along with the audio
download_videos = false
# The path to download the videos to
video_downloads_folder = ""
# This stores a list of item IDs so that repeats are not downloaded.
[database]
downloads_enabled = true
downloads_path = ""
# If a download fails, the item ID is stored here. Then, `rip repair` can be
# called to retry the downloads
failed_downloads_enabled = true
failed_downloads_path = ""
# Convert tracks to a codec after downloading them.
[conversion]
enabled = false
# FLAC, ALAC, OPUS, MP3, VORBIS, or AAC
codec = "ALAC"
# In Hz. Tracks are downsampled if their sampling rate is greater than this.
# Value of 48000 is recommended to maximize quality and minimize space
sampling_rate = 48000
# Only 16 and 24 are available. It is only applied when the bit depth is higher
# than this value.
bit_depth = 24
# Only applicable for lossy codecs
lossy_bitrate = 320
# Filter a Qobuz artist's discography. Set to 'true' to turn on a filter.
[qobuz_filters]
# Remove Collectors Editions, live recordings, etc.
extras = false
# Picks the highest quality out of albums with identical titles.
repeats = false
# Remove EPs and Singles
non_albums = false
# Remove albums whose artist is not the one requested
features = false
# Skip non studio albums
non_studio_albums = false
# Only download remastered albums
non_remaster = false
[artwork]
# Write the image to the audio file
embed = true
# The size of the artwork to embed. Options: thumbnail, small, large, original.
# "original" images can be up to 30MB, and may fail embedding.
# Using "large" is recommended.
size = "large"
# Both of these options limit the size of the embedded artwork. If their values
# are larger than the actual dimensions of the image, they will be ignored.
# If either value is -1, the image is left untouched.
max_width = -1
max_height = -1
# Save the cover image at the highest quality as a seperate jpg file
keep_hires_cover = true
[metadata]
# Sets the value of the 'ALBUM' field in the metadata to the playlist's name.
# This is useful if your music library software organizes tracks based on album name.
set_playlist_to_album = true
# Replaces the original track's tracknumber with it's position in the playlist
new_playlist_tracknumbers = true
# The following metadata tags won't be applied
# See https://github.com/nathom/streamrip/wiki/Metadata-Tag-Names for more info
exclude = []
# Changes the folder and file names generated by streamrip.
[filepaths]
# Create folders for single tracks within the downloads directory using the folder_format
# template
add_singles_to_folder = false
# Available keys: "albumartist", "title", "year", "bit_depth", "sampling_rate",
# "container", "id", and "albumcomposer"
folder_format = "{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]"
# Available keys: "tracknumber", "artist", "albumartist", "composer", "title",
# and "albumcomposer"
track_format = "{tracknumber}. {artist} - {title}{explicit}"
# Only allow printable ASCII characters in filenames.
restrict_characters = false
# Truncate the filename if it is greater than 120 characters
# Setting this to false may cause downloads to fail on some systems
truncate = true
# Last.fm playlists are downloaded by searching for the titles of the tracks
[lastfm]
# The source on which to search for the tracks.
source = "qobuz"
# If no results were found with the primary source, the item is searched for
# on this one.
fallback_source = "deezer"
[theme]
# Options: "dainty" or "plain"
progress_bar = "dainty"
[misc]
# Metadata to identify this config file. Do not change.
version = "2.0"

View file

@ -1,30 +0,0 @@
"""Various constant values that are used by RipCore."""
import os
import re
from pathlib import Path
from appdirs import user_config_dir
APPNAME = "streamrip"
APP_DIR = user_config_dir(APPNAME)
HOME = Path.home()
LOG_DIR = CACHE_DIR = CONFIG_DIR = APP_DIR
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.toml")
DB_PATH = os.path.join(LOG_DIR, "downloads.db")
FAILED_DB_PATH = os.path.join(LOG_DIR, "failed_downloads.db")
DOWNLOADS_DIR = os.path.join(HOME, "StreamripDownloads")
URL_REGEX = re.compile(
r"https?://(?:www|open|play|listen)?\.?(qobuz|tidal|deezer)\.com(?:(?:/(album|artist|track|playlist|video|label))|(?:\/[-\w]+?))+\/([-\w]+)"
)
SOUNDCLOUD_URL_REGEX = re.compile(r"https://soundcloud.com/[-\w:/]+")
LASTFM_URL_REGEX = re.compile(r"https://www.last.fm/user/\w+/playlists/\w+")
QOBUZ_INTERPRETER_URL_REGEX = re.compile(
r"https?://www\.qobuz\.com/\w\w-\w\w/interpreter/[-\w]+/[-\w]+"
)
DEEZER_DYNAMIC_LINK_REGEX = re.compile(r"https://deezer\.page\.link/\w+")
YOUTUBE_URL_REGEX = re.compile(r"https://www\.youtube\.com/watch\?v=[-\w]+")

View file

@ -1,935 +0,0 @@
"""The stuff that ties everything together for the CLI to use."""
import concurrent.futures
import html
import logging
import os
import re
import threading
from getpass import getpass
from hashlib import md5
from string import Formatter
from typing import Dict, Generator, List, Optional, Tuple, Type, Union
import requests
from click import secho, style
from tqdm import tqdm
from streamrip.clients import (
Client,
DeezerClient,
DeezloaderClient,
QobuzClient,
SoundCloudClient,
TidalClient,
)
from streamrip.constants import MEDIA_TYPES
from streamrip.exceptions import (
AuthenticationError,
IneligibleError,
ItemExists,
MissingCredentials,
NonStreamable,
NoResultsFound,
ParsingError,
PartialFailure,
)
from streamrip.media import (
Album,
Artist,
Label,
Playlist,
Track,
Tracklist,
Video,
YoutubeVideo,
)
from streamrip.utils import TQDM_DEFAULT_THEME, set_progress_bar_theme
from . import db
from .config import Config
from .constants import (
CONFIG_PATH,
DB_PATH,
DEEZER_DYNAMIC_LINK_REGEX,
FAILED_DB_PATH,
LASTFM_URL_REGEX,
QOBUZ_INTERPRETER_URL_REGEX,
SOUNDCLOUD_URL_REGEX,
URL_REGEX,
YOUTUBE_URL_REGEX,
)
from .exceptions import DeezloaderFallback
from .utils import extract_deezer_dynamic_link, extract_interpreter_url
logger = logging.getLogger("streamrip")
# ---------------- Constants ------------------ #
Media = Union[
Type[Album],
Type[Playlist],
Type[Artist],
Type[Track],
Type[Label],
Type[Video],
]
MEDIA_CLASS: Dict[str, Media] = {
"album": Album,
"playlist": Playlist,
"artist": Artist,
"track": Track,
"label": Label,
"video": Video,
}
DB_PATH_MAP = {"downloads": DB_PATH, "failed_downloads": FAILED_DB_PATH}
# ---------------------------------------------- #
class RipCore(list):
"""RipCore."""
clients = {
"qobuz": QobuzClient(),
"tidal": TidalClient(),
"deezer": DeezerClient(),
"soundcloud": SoundCloudClient(),
"deezloader": DeezloaderClient(),
}
def __init__(
self,
config: Optional[Config] = None,
):
"""Create a RipCore object.
:param config:
:type config: Optional[Config]
"""
self.config: Config
if config is None:
self.config = Config(CONFIG_PATH)
else:
self.config = config
if (theme := self.config.file["theme"]["progress_bar"]) != TQDM_DEFAULT_THEME:
set_progress_bar_theme(theme.lower())
def get_db(db_type: str) -> db.Database:
db_settings = self.config.session["database"]
db_class = db.CLASS_MAP[db_type]
if db_settings[db_type]["enabled"] and db_settings.get("enabled", True):
default_db_path = DB_PATH_MAP[db_type]
path = db_settings[db_type]["path"]
if path:
database = db_class(path)
else:
database = db_class(default_db_path)
assert config is not None
config.file["database"][db_type]["path"] = default_db_path
config.save()
else:
database = db_class("", dummy=True)
return database
self.db = get_db("downloads")
self.failed_db = get_db("failed_downloads")
def handle_urls(self, urls):
"""Download a url.
:param url:
:type url: str
:raises InvalidSourceError
:raises ParsingError
"""
if isinstance(urls, str):
url = urls
elif isinstance(urls, tuple):
url = " ".join(urls)
else:
raise Exception(f"Urls has invalid type {type(urls)}")
# youtube is handled by youtube-dl, so much of the
# processing is not necessary
youtube_urls = YOUTUBE_URL_REGEX.findall(url)
if youtube_urls != []:
self.extend(YoutubeVideo(u) for u in youtube_urls)
parsed = self.parse_urls(url)
if not parsed and len(self) == 0:
if "last.fm" in url:
message = (
f"For last.fm urls, use the {style('lastfm', fg='yellow')} "
f"command. See {style('rip lastfm --help', fg='yellow')}."
)
else:
message = f"Cannot find urls in text: {url}"
raise ParsingError(message)
for source, url_type, item_id in parsed:
if item_id in self.db:
secho(
f"ID {item_id} already downloaded, use --ignore-db to override.",
fg="magenta",
)
continue
self.handle_item(source, url_type, item_id)
def handle_item(self, source: str, media_type: str, item_id: str):
"""Get info and parse into a Media object.
:param source:
:type source: str
:param media_type:
:type media_type: str
:param item_id:
:type item_id: str
"""
client = self.get_client(source)
if media_type not in MEDIA_TYPES:
if "playlist" in media_type: # for SoundCloud
media_type = "playlist"
assert media_type in MEDIA_TYPES, media_type
item = MEDIA_CLASS[media_type](client=client, id=item_id)
self.append(item)
def _get_download_args(self) -> dict:
"""Get the arguments to pass to Media.download.
:rtype: dict
"""
session = self.config.session
logger.debug(session)
# So that the dictionary isn't searched for the same keys multiple times
artwork, conversion, filepaths, metadata = (
session[key] for key in ("artwork", "conversion", "filepaths", "metadata")
)
concurrency = session["downloads"]["concurrency"]
return {
"restrict_filenames": filepaths["restrict_characters"],
"truncate_filenames": filepaths["truncate"],
"parent_folder": session["downloads"]["folder"],
"folder_format": filepaths["folder_format"],
"track_format": filepaths["track_format"],
"embed_cover": artwork["embed"],
"embed_cover_size": artwork["size"],
"keep_hires_cover": artwork["keep_hires_cover"],
"set_playlist_to_album": metadata["set_playlist_to_album"],
"stay_temp": conversion["enabled"],
"conversion": conversion,
"concurrent_downloads": concurrency["enabled"],
"max_connections": concurrency["max_connections"],
"new_tracknumbers": metadata["new_playlist_tracknumbers"],
"download_videos": session["tidal"]["download_videos"],
"download_booklets": session["qobuz"]["download_booklets"],
"download_youtube_videos": session["youtube"]["download_videos"],
"youtube_video_downloads_folder": session["youtube"][
"video_downloads_folder"
],
"add_singles_to_folder": filepaths["add_singles_to_folder"],
"max_artwork_width": int(artwork["max_width"]),
"max_artwork_height": int(artwork["max_height"]),
"exclude_tags": metadata["exclude"],
}
def repair(self, max_items=None):
"""Iterate through the failed_downloads database and retry them.
:param max_items: The maximum number of items to download.
"""
if max_items is None:
max_items = float("inf")
self.db = db.Downloads("", dummy=True)
if self.failed_db.is_dummy:
secho(
"Failed downloads database must be enabled in the config file "
"to repair!",
fg="red",
)
exit()
for counter, (source, media_type, item_id) in enumerate(self.failed_db):
if counter >= max_items:
break
self.handle_item(source, media_type, item_id)
self.download()
def download(self):
"""Download all the items in self."""
try:
arguments = self._get_download_args()
except KeyError as e:
self._config_updating_message()
self.config.update()
logger.debug("Config update error: %s", e)
exit()
except Exception as err:
self._config_corrupted_message(err)
exit()
logger.debug("Arguments from config: %s", arguments)
source_subdirs = self.config.session["downloads"]["source_subdirectories"]
for item in self:
# Item already checked in database in handle_urls
if source_subdirs:
arguments["parent_folder"] = self.__get_source_subdir(
item.client.source
)
if item is YoutubeVideo:
item.download(**arguments)
continue
arguments["quality"] = self.config.session[item.client.source]["quality"]
if isinstance(item, Artist):
filters_ = tuple(
k for k, v in self.config.session["filters"].items() if v
)
arguments["filters"] = filters_
logger.debug("Added filter argument for artist/label: %s", filters_)
if not isinstance(item, Tracklist) or not item.loaded:
logger.debug("Loading metadata")
try:
item.load_meta(**arguments)
except NonStreamable:
self.failed_db.add((item.client.source, item.type, item.id))
secho(f"{item!s} is not available, skipping.", fg="red")
continue
try:
item.download(**arguments)
for item_id in item.downloaded_ids:
# Add items row by row
self.db.add((item_id,))
except NonStreamable as e:
e.print(item)
self.failed_db.add((item.client.source, item.type, item.id))
continue
except PartialFailure as e:
# add successful downloads to database?
for failed_item_info in e.failed_items:
self.failed_db.add(failed_item_info)
continue
except ItemExists as e:
secho(f'"{e!s}" already exists. Skipping.', fg="yellow")
continue
if hasattr(item, "id"):
self.db.add(str(item.id))
for item_id in item.downloaded_ids:
self.db.add(str(item_id))
if isinstance(item, Track):
item.tag(exclude_tags=arguments["exclude_tags"])
if arguments["conversion"]["enabled"]:
item.convert(**arguments["conversion"])
def scrape(self, featured_list: str, max_items: int = 500):
"""Download all of the items in a Qobuz featured list.
:param featured_list: The name of the list. See `rip discover --help`.
:type featured_list: str
"""
self.extend(self.search("qobuz", featured_list, "featured", limit=max_items))
def get_client(self, source: str) -> Client:
"""Get a client given the source and log in.
:param source:
:type source: str
:rtype: Client
"""
client = self.clients[source]
if not client.logged_in:
try:
self.login(client)
except DeezloaderFallback:
client = self.clients["deezloader"]
return client
def login(self, client):
"""Log into a client, if applicable.
:param client:
"""
creds = self.config.creds(client.source)
if client.source == "deezer" and creds["arl"] == "":
if self.config.session["deezer"]["deezloader_warnings"]:
secho(
"Falling back to Deezloader (unstable). If you have a subscription, run ",
nl=False,
fg="yellow",
)
secho("rip config --deezer ", nl=False, bold=True)
secho("to log in.", fg="yellow")
raise DeezloaderFallback
while True:
try:
client.login(**creds)
break
except AuthenticationError:
secho("Invalid credentials, try again.", fg="yellow")
self.prompt_creds(client.source)
creds = self.config.creds(client.source)
except MissingCredentials:
logger.debug("Credentials are missing. Prompting..")
get_tokens = threading.Thread(
target=client._get_app_id_and_secrets, daemon=True
)
get_tokens.start()
self.prompt_creds(client.source)
creds = self.config.creds(client.source)
get_tokens.join()
if (
client.source == "qobuz"
and not creds.get("secrets")
and not creds.get("app_id")
):
(
self.config.file["qobuz"]["app_id"],
self.config.file["qobuz"]["secrets"],
) = client.get_tokens()
self.config.save()
elif (
client.source == "soundcloud"
and not creds.get("client_id")
and not creds.get("app_version")
):
(
self.config.file["soundcloud"]["client_id"],
self.config.file["soundcloud"]["app_version"],
) = client.get_tokens()
self.config.save()
elif client.source == "tidal":
self.config.file["tidal"].update(client.get_tokens())
self.config.save() # only for the expiry stamp
def parse_urls(self, url: str) -> List[Tuple[str, str, str]]:
"""Return the type of the url and the id.
Compatible with urls of the form:
https://www.qobuz.com/us-en/type/name/id
https://open.qobuz.com/type/id
https://play.qobuz.com/type/id
https://www.deezer.com/us/type/id
https://tidal.com/browse/type/id
:raises exceptions.ParsingError:
"""
parsed: List[Tuple[str, str, str]] = []
interpreter_urls = QOBUZ_INTERPRETER_URL_REGEX.findall(url)
if interpreter_urls:
secho(
"Extracting IDs from Qobuz interpreter urls. Use urls "
"that include the artist ID for faster preprocessing.",
fg="yellow",
)
parsed.extend(
("qobuz", "artist", extract_interpreter_url(u))
for u in interpreter_urls
)
url = QOBUZ_INTERPRETER_URL_REGEX.sub("", url)
dynamic_urls = DEEZER_DYNAMIC_LINK_REGEX.findall(url)
if dynamic_urls:
secho(
"Extracting IDs from Deezer dynamic link. Use urls "
"of the form https://www.deezer.com/{country}/{type}/{id} for "
"faster processing.",
fg="yellow",
)
parsed.extend(
("deezer", *extract_deezer_dynamic_link(url)) for url in dynamic_urls
)
parsed.extend(URL_REGEX.findall(url)) # Qobuz, Tidal, Deezer
soundcloud_urls = SOUNDCLOUD_URL_REGEX.findall(url)
if soundcloud_urls:
soundcloud_client = self.get_client("soundcloud")
assert isinstance(soundcloud_client, SoundCloudClient) # for typing
# TODO: Make this async
soundcloud_items = (
soundcloud_client.resolve_url(u) for u in soundcloud_urls
)
parsed.extend(
("soundcloud", item["kind"], str(item["id"]))
for item in soundcloud_items
)
logger.debug("Parsed urls: %s", parsed)
return parsed
def handle_lastfm_urls(self, urls: str):
"""Get info from lastfm url, and parse into Media objects.
This works by scraping the last.fm page and using a regex to
find the track titles and artists. The information is queried
in a Client.search(query, 'track') call and the first result is
used.
:param urls:
"""
# Available keys: ['artist', 'title']
QUERY_FORMAT: Dict[str, str] = {
"tidal": "{title}",
"qobuz": "{title} {artist}",
"deezer": "{title} {artist}",
"soundcloud": "{title} {artist}",
}
# For testing:
# https://www.last.fm/user/nathan3895/playlists/12058911
user_regex = re.compile(r"https://www\.last\.fm/user/([^/]+)/playlists/\d+")
lastfm_urls = LASTFM_URL_REGEX.findall(urls)
try:
lastfm_source = self.config.session["lastfm"]["source"]
lastfm_fallback_source = self.config.session["lastfm"]["fallback_source"]
except KeyError:
self._config_updating_message()
self.config.update()
exit()
except Exception as err:
self._config_corrupted_message(err)
exit()
# Do not include tracks that have (re)mix, live, karaoke in their titles
# within parentheses or brackets
# This will match somthing like "Test (Person Remix]" though, so its not perfect
banned_words_plain = re.compile(r"(?i)(?:(?:re)?mix|live|karaoke)")
banned_words = re.compile(
r"(?i)[\(\[][^\)\]]*?(?:(?:re)?mix|live|karaoke)[^\)\]]*[\]\)]"
)
def search_query(title, artist, playlist) -> bool:
"""Search for a query and add the first result to playlist.
:param query:
:type query: str
:param playlist:
:type playlist: Playlist
:rtype: bool
"""
def try_search(source) -> Optional[Track]:
try:
query = QUERY_FORMAT[lastfm_source].format(
title=title, artist=artist
)
query_is_clean = banned_words_plain.search(query) is None
search_results = self.search(source, query, media_type="track")
track = next(search_results)
if query_is_clean:
while banned_words.search(track["title"]) is not None:
logger.debug("Track title banned for query=%s", query)
track = next(search_results)
# Because the track is searched as a single we need to set
# this manually
track.part_of_tracklist = True
return track
except (NoResultsFound, StopIteration):
return None
track = try_search(lastfm_source) or try_search(lastfm_fallback_source)
if track is None:
return False
if self.config.session["metadata"]["set_playlist_to_album"]:
# so that the playlist name (actually the album) isn't
# amended to include version and work tags from individual tracks
track.meta.version = track.meta.work = None
playlist.append(track)
return True
from streamrip.utils import TQDM_BAR_FORMAT
for purl in lastfm_urls:
secho(f"Fetching playlist at {purl}", fg="blue")
title, queries = self.get_lastfm_playlist(purl)
pl = Playlist(client=self.get_client(lastfm_source), name=title)
creator_match = user_regex.search(purl)
if creator_match is not None:
pl.creator = creator_match.group(1)
tracks_not_found = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
futures = [
executor.submit(search_query, title, artist, pl)
for title, artist in queries
]
# only for the progress bar
for search_attempt in tqdm(
concurrent.futures.as_completed(futures),
unit="Tracks",
dynamic_ncols=True,
total=len(futures),
desc="Searching...",
bar_format=TQDM_BAR_FORMAT,
):
if not search_attempt.result():
tracks_not_found += 1
pl.loaded = True
if tracks_not_found > 0:
secho(f"{tracks_not_found} tracks not found.", fg="yellow")
self.append(pl)
def handle_txt(self, filepath: Union[str, os.PathLike]):
"""
Handle a text file containing URLs. Lines starting with `#` are ignored.
:param filepath:
:type filepath: Union[str, os.PathLike]
:raises OSError
:raises exceptions.ParsingError
"""
with open(filepath) as txt:
self.handle_urls(txt.read())
def search(
self,
source: str,
query: str,
media_type: str = "album",
check_db: bool = False,
limit: int = 200,
) -> Generator:
"""Universal search.
:param source:
:type source: str
:param query:
:type query: str
:param media_type:
:type media_type: str
:param limit: Not Implemented
:type limit: int
:rtype: Generator
"""
logger.debug("searching for %s", query)
client = self.get_client(source)
if isinstance(client, DeezloaderClient) and media_type == "featured":
raise IneligibleError(
"Must have premium Deezer account to access editorial lists."
)
results = client.search(query, media_type)
if media_type == "featured":
media_type = "album"
if isinstance(results, Generator): # QobuzClient
for page in results:
tracklist = (
page[f"{media_type}s"]["items"]
if media_type != "featured"
else page["albums"]["items"]
)
for i, item in enumerate(tracklist):
yield MEDIA_CLASS[ # type: ignore
media_type if media_type != "featured" else "album"
].from_api(item, client)
if i >= limit - 1:
return
else:
items = (
results.get("data")
or results.get("items")
or results.get("collection")
or results.get("albums", {}).get("data", False)
)
if not items:
raise NoResultsFound(query)
logger.debug("Number of results: %d", len(items))
for i, item in enumerate(items):
logger.debug(item)
yield MEDIA_CLASS[media_type].from_api(item, client) # type: ignore
if i >= limit - 1:
return
def preview_media(self, media) -> str:
"""Return a preview string of a Media object.
:param media:
"""
if isinstance(media, Album):
fmt = (
"{albumartist} - {album}\n"
"Released on {year}\n{tracktotal} tracks\n"
"{bit_depth} bit / {sampling_rate} Hz\n"
"Version: {version}\n"
"Genre: {genre}"
)
elif isinstance(media, Artist):
fmt = "{name}"
elif isinstance(media, Track):
fmt = "{artist} - {title}\nReleased on {year}"
elif isinstance(media, Playlist):
fmt = (
"{title}\n"
"{tracktotal} tracks\n"
"{popularity}\n"
"Description: {description}"
)
else:
raise NotImplementedError
fields = (fname for _, fname, _, _ in Formatter().parse(fmt) if fname)
ret = fmt.format(**{k: media.get(k, default="Unknown") for k in fields})
return ret
def interactive_search(
self,
query: str,
source: str = "qobuz",
media_type: str = "album",
limit: int = 50,
):
"""Show an interactive menu that contains search results.
:param query:
:type query: str
:param source:
:type source: str
:param media_type:
:type media_type: str
"""
results = tuple(self.search(source, query, media_type, limit=limit))
def title(res):
index, item = res
item_no = index + 1
if isinstance(item, Album):
return f"{item_no}. {item.album}"
elif isinstance(item, Track):
return f"{item_no}. {item.meta.title}"
elif isinstance(item, Playlist):
return f"{item_no}. {item.name}"
elif isinstance(item, Artist):
return f"{item_no}. {item.name}"
else:
raise NotImplementedError(item.type)
def from_title(s):
num = []
for char in s:
if char != ".":
num.append(char)
else:
break
return self.preview_media(results[int("".join(num)) - 1])
if os.name == "nt":
from pick import pick
choice = pick(
tuple(enumerate(results)),
title=(
f"{source.capitalize()} {media_type} search.\n"
"Press SPACE to select, RETURN to download, ctrl-C to exit."
),
options_map_func=title,
multiselect=True,
)
if isinstance(choice, list):
for item in choice:
self.append(item[0][1])
elif isinstance(choice, tuple):
self.append(choice[0][1])
return True
else:
from simple_term_menu import TerminalMenu
menu = TerminalMenu(
map(title, enumerate(results)),
preview_command=from_title,
preview_size=0.5,
title=(
f"{source.capitalize()} {media_type} search.\n"
"SPACE - multiselection, ENTER - download, ESC - exit"
),
cycle_cursor=True,
clear_screen=True,
multi_select=True,
)
choice = menu.show()
if choice is None:
return False
else:
if isinstance(choice, int):
self.append(results[choice])
elif isinstance(choice, tuple):
for i in choice:
self.append(results[i])
return True
def get_lastfm_playlist(self, url: str) -> Tuple[str, list]:
"""From a last.fm url, find the playlist title and tracks.
Each page contains 50 results, so `num_tracks // 50 + 1` requests
are sent per playlist.
:param url:
:type url: str
:rtype: Tuple[str, list]
"""
logger.debug("Fetching lastfm playlist")
info = []
words = re.compile(r"[\w\s]+")
title_tags = re.compile(r'<a\s+href="[^"]+"\s+title="([^"]+)"')
def essence(s):
s = re.sub(r"&#\d+;", "", s) # remove HTML entities
# TODO: change to finditer
return "".join(words.findall(s))
def get_titles(s):
titles = title_tags.findall(s) # [2:]
for i in range(0, len(titles) - 1, 2):
info.append((essence(titles[i]), essence(titles[i + 1])))
r = requests.get(url)
get_titles(r.text)
remaining_tracks_match = re.search(
r'data-playlisting-entry-count="(\d+)"', r.text
)
if remaining_tracks_match is None:
raise ParsingError("Error parsing lastfm page: %s", r.text)
total_tracks = int(remaining_tracks_match.group(1))
logger.debug("Total tracks: %d", total_tracks)
remaining_tracks = total_tracks - 50
playlist_title_match = re.search(
r'<h1 class="playlisting-playlist-header-title">([^<]+)</h1>',
r.text,
)
if playlist_title_match is None:
raise ParsingError("Error finding title from response")
playlist_title = html.unescape(playlist_title_match.group(1))
if remaining_tracks > 0:
with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
last_page = (
1 + int(remaining_tracks // 50) + int(remaining_tracks % 50 != 0)
)
logger.debug("Fetching up to page %d", last_page)
futures = [
executor.submit(requests.get, f"{url}?page={page}")
for page in range(2, last_page + 1)
]
for future in concurrent.futures.as_completed(futures):
get_titles(future.result().text)
return playlist_title, info
def __get_source_subdir(self, source: str) -> str:
path = self.config.session["downloads"]["folder"]
return os.path.join(path, source.capitalize())
def prompt_creds(self, source: str):
"""Prompt the user for credentials.
:param source:
:type source: str
"""
if source == "qobuz":
secho("Enter Qobuz email:", fg="green")
self.config.file[source]["email"] = input()
secho(
"Enter Qobuz password (will not show on screen):",
fg="green",
)
self.config.file[source]["password"] = md5(
getpass(prompt="").encode("utf-8")
).hexdigest()
self.config.save()
secho(
f'Credentials saved to config file at "{self.config._path}"',
fg="green",
)
elif source == "deezer":
secho(
"If you're not sure how to find the ARL cookie, see the instructions at ",
italic=True,
nl=False,
dim=True,
)
secho(
"https://github.com/nathom/streamrip/wiki/Finding-your-Deezer-ARL-Cookie",
underline=True,
italic=True,
fg="blue",
)
self.config.file["deezer"]["arl"] = input(style("ARL: ", fg="green"))
self.config.save()
secho(
f'Credentials saved to config file at "{self.config._path}"',
fg="green",
)
else:
raise Exception
def _config_updating_message(self):
secho(
"Updating config file... Some settings may be lost. Please run the "
"command again.",
fg="magenta",
)
def _config_corrupted_message(self, err: Exception):
secho(
"There was a problem with your config file. This happens "
"sometimes after updates. Run ",
nl=False,
fg="red",
)
secho("rip config --reset ", fg="yellow", nl=False)
secho("to reset it. You will need to log in again.", fg="red")
secho(str(err), fg="red")

187
rip/db.py
View file

@ -1,187 +0,0 @@
"""Wrapper over a database that stores item IDs."""
import logging
import os
import sqlite3
from typing import Tuple, Union
logger = logging.getLogger("streamrip")
class Database:
"""A wrapper for an sqlite database."""
structure: dict
name: str
def __init__(self, path: str, dummy: bool = False):
"""Create a Database instance.
:param path: Path to the database file.
:param dummy: Make the database empty.
"""
assert self.structure != []
assert self.name
self.path = path
self.is_dummy = dummy
if self.is_dummy:
return
if not os.path.exists(self.path):
self.create()
def create(self):
"""Create a database."""
if self.is_dummy:
return
with sqlite3.connect(self.path) as conn:
params = ", ".join(
f"{key} {' '.join(map(str.upper, props))} NOT NULL"
for key, props in self.structure.items()
)
command = f"CREATE TABLE {self.name} ({params})"
logger.debug("executing %s", command)
conn.execute(command)
def keys(self):
"""Get the column names of the table."""
return self.structure.keys()
def contains(self, **items) -> bool:
"""Check whether items matches an entry in the table.
:param items: a dict of column-name + expected value
:rtype: bool
"""
if self.is_dummy:
return False
allowed_keys = set(self.structure.keys())
assert all(
key in allowed_keys for key in items.keys()
), f"Invalid key. Valid keys: {allowed_keys}"
items = {k: str(v) for k, v in items.items()}
with sqlite3.connect(self.path) as conn:
conditions = " AND ".join(f"{key}=?" for key in items.keys())
command = f"SELECT EXISTS(SELECT 1 FROM {self.name} WHERE {conditions})"
logger.debug("Executing %s", command)
return bool(conn.execute(command, tuple(items.values())).fetchone()[0])
def __contains__(self, keys: Union[str, dict]) -> bool:
"""Check whether a key-value pair exists in the database.
:param keys: Either a dict with the structure {key: value_to_search_for, ...},
or if there is only one key in the table, value_to_search_for can be
passed in by itself.
:type keys: Union[str, dict]
:rtype: bool
"""
if isinstance(keys, dict):
return self.contains(**keys)
if isinstance(keys, str) and len(self.structure) == 1:
only_key = tuple(self.structure.keys())[0]
query = {only_key: keys}
logger.debug("Searching for %s in database", query)
return self.contains(**query)
raise TypeError(keys)
def add(self, items: Union[str, Tuple[str]]):
"""Add a row to the table.
:param items: Column-name + value. Values must be provided for all cols.
:type items: Tuple[str]
"""
if self.is_dummy:
return
if isinstance(items, str):
if len(self.structure) == 1:
items = (items,)
else:
raise TypeError(
"Only tables with 1 column can have string inputs. Use a list "
"where len(list) == len(structure)."
)
assert len(items) == len(self.structure)
params = ", ".join(self.structure.keys())
question_marks = ", ".join("?" for _ in items)
command = f"INSERT INTO {self.name} ({params}) VALUES ({question_marks})"
logger.debug("Executing %s", command)
logger.debug("Items to add: %s", items)
with sqlite3.connect(self.path) as conn:
try:
conn.execute(command, tuple(items))
except sqlite3.IntegrityError as e:
# tried to insert an item that was already there
logger.debug(e)
def remove(self, **items):
"""Remove items from a table.
Warning: NOT TESTED!
:param items:
"""
# not in use currently
if self.is_dummy:
return
conditions = " AND ".join(f"{key}=?" for key in items.keys())
command = f"DELETE FROM {self.name} WHERE {conditions}"
with sqlite3.connect(self.path) as conn:
logger.debug(command)
conn.execute(command, tuple(items.values()))
def __iter__(self):
"""Iterate through the rows of the table."""
if self.is_dummy:
return ()
with sqlite3.connect(self.path) as conn:
return conn.execute(f"SELECT * FROM {self.name}")
def reset(self):
"""Delete the database file."""
try:
os.remove(self.path)
except FileNotFoundError:
pass
class Downloads(Database):
"""A table that stores the downloaded IDs."""
name = "downloads"
structure = {
"id": ["text", "unique"],
}
class FailedDownloads(Database):
"""A table that stores information about failed downloads."""
name = "failed_downloads"
structure = {
"source": ["text"],
"media_type": ["text"],
"id": ["text", "unique"],
}
CLASS_MAP = {db.name: db for db in (Downloads, FailedDownloads)}