mirror of
https://github.com/nathom/streamrip.git
synced 2025-05-14 07:04:51 -04:00
350 lines
9.3 KiB
Python
350 lines
9.3 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
from functools import wraps
|
|
|
|
import aiofiles
|
|
import click
|
|
from click_help_colors import HelpColorsGroup # type: ignore
|
|
from rich.logging import RichHandler
|
|
from rich.prompt import Confirm
|
|
from rich.traceback import install
|
|
|
|
from .. import db
|
|
from ..config import DEFAULT_CONFIG_PATH, Config, set_user_defaults
|
|
from ..console import console
|
|
from .main import Main
|
|
|
|
|
|
def coro(f):
|
|
@wraps(f)
|
|
def wrapper(*args, **kwargs):
|
|
return asyncio.run(f(*args, **kwargs))
|
|
|
|
return wrapper
|
|
|
|
|
|
@click.group(
|
|
cls=HelpColorsGroup,
|
|
help_headers_color="yellow",
|
|
help_options_color="green",
|
|
)
|
|
@click.version_option(version="2.0")
|
|
@click.option(
|
|
"--config-path",
|
|
default=DEFAULT_CONFIG_PATH,
|
|
help="Path to the configuration file",
|
|
)
|
|
@click.option("-f", "--folder", help="The folder to download items into.")
|
|
@click.option(
|
|
"-ndb",
|
|
"--no-db",
|
|
help="Download items even if they have been logged in the database",
|
|
default=False,
|
|
is_flag=True,
|
|
)
|
|
@click.option(
|
|
"-q", "--quality", help="The maximum quality allowed to download", type=int
|
|
)
|
|
@click.option(
|
|
"-c",
|
|
"--convert",
|
|
help="Convert the downloaded files to an audio codec (ALAC, FLAC, MP3, AAC, or OGG)",
|
|
)
|
|
@click.option(
|
|
"--no-progress",
|
|
help="Do not show progress bars",
|
|
is_flag=True,
|
|
default=False,
|
|
)
|
|
@click.option(
|
|
"-v",
|
|
"--verbose",
|
|
help="Enable verbose output (debug mode)",
|
|
is_flag=True,
|
|
)
|
|
@click.pass_context
|
|
def rip(ctx, config_path, folder, no_db, quality, convert, no_progress, verbose):
|
|
"""Streamrip: the all in one music downloader."""
|
|
global logger
|
|
logging.basicConfig(
|
|
level="INFO",
|
|
format="%(message)s",
|
|
datefmt="[%X]",
|
|
handlers=[RichHandler()],
|
|
)
|
|
logger = logging.getLogger("streamrip")
|
|
if verbose:
|
|
install(
|
|
console=console,
|
|
suppress=[
|
|
click,
|
|
],
|
|
show_locals=True,
|
|
locals_hide_sunder=False,
|
|
)
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.debug("Showing all debug logs")
|
|
else:
|
|
install(console=console, suppress=[click, asyncio], max_frames=1)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
if not os.path.isfile(config_path):
|
|
console.print(
|
|
f"No file found at [bold cyan]{config_path}[/bold cyan], creating default config.",
|
|
)
|
|
set_user_defaults(config_path)
|
|
|
|
# pass to subcommands
|
|
ctx.ensure_object(dict)
|
|
ctx.obj["config_path"] = config_path
|
|
|
|
try:
|
|
c = Config(config_path)
|
|
except Exception as e:
|
|
console.print(
|
|
f"Error loading config from [bold cyan]{config_path}[/bold cyan]: {e}\n"
|
|
"Try running [bold]rip config reset[/bold]",
|
|
)
|
|
ctx.obj["config"] = None
|
|
return
|
|
|
|
# set session config values to command line args
|
|
c.session.database.downloads_enabled = not no_db
|
|
if folder is not None:
|
|
c.session.downloads.folder = folder
|
|
|
|
if quality is not None:
|
|
c.session.qobuz.quality = quality
|
|
c.session.tidal.quality = quality
|
|
c.session.deezer.quality = quality
|
|
c.session.soundcloud.quality = quality
|
|
|
|
if convert is not None:
|
|
c.session.conversion.enabled = True
|
|
assert convert.upper() in ("ALAC", "FLAC", "OGG", "MP3", "AAC")
|
|
c.session.conversion.codec = convert.upper()
|
|
|
|
if no_progress:
|
|
c.session.cli.progress_bars = False
|
|
|
|
ctx.obj["config"] = c
|
|
|
|
|
|
@rip.command()
|
|
@click.argument("urls", nargs=-1, required=True)
|
|
@click.pass_context
|
|
@coro
|
|
async def url(ctx, urls):
|
|
"""Download content from URLs."""
|
|
with ctx.obj["config"] as cfg:
|
|
async with Main(cfg) as main:
|
|
await main.add_all(urls)
|
|
await main.resolve()
|
|
await main.rip()
|
|
|
|
|
|
@rip.command()
|
|
@click.argument("path", required=True)
|
|
@click.pass_context
|
|
@coro
|
|
async def file(ctx, path):
|
|
"""Download content from URLs in a file.
|
|
|
|
Example usage:
|
|
|
|
rip file urls.txt
|
|
"""
|
|
with ctx.obj["config"] as cfg:
|
|
async with Main(cfg) as main:
|
|
async with aiofiles.open(path) as f:
|
|
await main.add_all([line async for line in f])
|
|
await main.resolve()
|
|
await main.rip()
|
|
|
|
|
|
@rip.group()
|
|
def config():
|
|
"""Manage configuration files."""
|
|
|
|
|
|
@config.command("open")
|
|
@click.option("-v", "--vim", help="Open in (Neo)Vim", is_flag=True)
|
|
@click.pass_context
|
|
def config_open(ctx, vim):
|
|
"""Open the config file in a text editor."""
|
|
config_path = ctx.obj["config"].path
|
|
|
|
console.print(f"Opening file at [bold cyan]{config_path}")
|
|
if vim:
|
|
if shutil.which("nvim") is not None:
|
|
subprocess.run(["nvim", config_path])
|
|
else:
|
|
subprocess.run(["vim", config_path])
|
|
else:
|
|
click.launch(config_path)
|
|
|
|
|
|
@config.command("reset")
|
|
@click.option("-y", "--yes", help="Don't ask for confirmation.", is_flag=True)
|
|
@click.pass_context
|
|
def config_reset(ctx, yes):
|
|
"""Reset the config file."""
|
|
config_path = ctx.obj["config_path"]
|
|
if not yes:
|
|
if not Confirm.ask(
|
|
f"Are you sure you want to reset the config file at {config_path}?",
|
|
):
|
|
console.print("[green]Reset aborted")
|
|
return
|
|
|
|
set_user_defaults(config_path)
|
|
console.print(f"Reset the config file at [bold cyan]{config_path}!")
|
|
|
|
|
|
@config.command("path")
|
|
@click.pass_context
|
|
def config_path(ctx):
|
|
"""Display the path of the config file."""
|
|
config_path = ctx.obj["config_path"]
|
|
console.print(f"Config path: [bold cyan]'{config_path}'")
|
|
|
|
|
|
@rip.group()
|
|
def database():
|
|
"""View and modify the downloads and failed downloads databases."""
|
|
|
|
|
|
@database.command("browse")
|
|
@click.argument("table")
|
|
@click.pass_context
|
|
def database_browse(ctx, table):
|
|
"""Browse the contents of a table.
|
|
|
|
Available tables:
|
|
|
|
* Downloads
|
|
|
|
* Failed
|
|
"""
|
|
from rich.table import Table
|
|
|
|
cfg: Config = ctx.obj["config"]
|
|
|
|
if table.lower() == "downloads":
|
|
downloads = db.Downloads(cfg.session.database.downloads_path)
|
|
t = Table(title="Downloads database")
|
|
t.add_column("Row")
|
|
t.add_column("ID")
|
|
for i, row in enumerate(downloads.all()):
|
|
t.add_row(f"{i:02}", *row)
|
|
console.print(t)
|
|
|
|
elif table.lower() == "failed":
|
|
failed = db.Failed(cfg.session.database.failed_downloads_path)
|
|
t = Table(title="Failed downloads database")
|
|
t.add_column("Source")
|
|
t.add_column("Media Type")
|
|
t.add_column("ID")
|
|
for i, row in enumerate(failed.all()):
|
|
t.add_row(f"{i:02}", *row)
|
|
console.print(t)
|
|
|
|
else:
|
|
console.print(
|
|
f"[red]Invalid database[/red] [bold]{table}[/bold]. [red]Choose[/red] [bold]downloads "
|
|
"[red]or[/red] failed[/bold].",
|
|
)
|
|
|
|
|
|
@rip.command()
|
|
@click.option(
|
|
"-f",
|
|
"--first",
|
|
help="Automatically download the first search result without showing the menu.",
|
|
is_flag=True,
|
|
)
|
|
@click.option(
|
|
"-o",
|
|
"--output-file",
|
|
help="Write search results to a file instead of showing interactive menu.",
|
|
type=click.Path(writable=True),
|
|
)
|
|
@click.option(
|
|
"-n",
|
|
"--num-results",
|
|
help="Maximum number of search results to show",
|
|
default=100,
|
|
type=click.IntRange(min=1),
|
|
)
|
|
@click.argument("source", required=True)
|
|
@click.argument("media-type", required=True)
|
|
@click.argument("query", required=True)
|
|
@click.pass_context
|
|
@coro
|
|
async def search(ctx, first, output_file, num_results, source, media_type, query):
|
|
"""Search for content using a specific source.
|
|
|
|
Example:
|
|
|
|
rip search qobuz album 'rumours'
|
|
"""
|
|
if first and output_file:
|
|
console.print("Cannot choose --first and --output-file!")
|
|
return
|
|
with ctx.obj["config"] as cfg:
|
|
async with Main(cfg) as main:
|
|
if first:
|
|
await main.search_take_first(source, media_type, query)
|
|
elif output_file:
|
|
await main.search_output_file(
|
|
source, media_type, query, output_file, num_results
|
|
)
|
|
else:
|
|
await main.search_interactive(source, media_type, query)
|
|
await main.resolve()
|
|
await main.rip()
|
|
|
|
|
|
@rip.command()
|
|
@click.option("-s", "--source", help="The source to search tracks on.")
|
|
@click.option(
|
|
"-fs",
|
|
"--fallback-source",
|
|
help="The source to search tracks on if no results were found with the main source.",
|
|
)
|
|
@click.argument("url", required=True)
|
|
@click.pass_context
|
|
@coro
|
|
async def lastfm(ctx, source, fallback_source, url):
|
|
"""Download tracks from a last.fm playlist."""
|
|
config = ctx.obj["config"]
|
|
if source is not None:
|
|
config.session.lastfm.source = source
|
|
if fallback_source is not None:
|
|
config.session.lastfm.fallback_source = fallback_source
|
|
with config as cfg:
|
|
async with Main(cfg) as main:
|
|
await main.resolve_lastfm(url)
|
|
await main.rip()
|
|
|
|
|
|
@rip.command()
|
|
@click.argument("source")
|
|
@click.argument("media-type")
|
|
@click.argument("id")
|
|
@click.pass_context
|
|
@coro
|
|
async def id(ctx, source, media_type, id):
|
|
"""Download an item by ID."""
|
|
with ctx.obj["config"] as cfg:
|
|
async with Main(cfg) as main:
|
|
await main.add_by_id(source, media_type, id)
|
|
await main.resolve()
|
|
await main.rip()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
rip()
|