Merge branch 'dev'

This commit is contained in:
nathom 2021-07-06 15:15:12 -07:00
commit 7f929b1b07
25 changed files with 1572 additions and 1268 deletions

View file

@ -21,3 +21,6 @@ ignore_missing_imports = True
[mypy-tomlkit.*]
ignore_missing_imports = True
[mypy-PIL.*]
ignore_missing_imports = True

View file

@ -1,6 +1,7 @@
# streamrip
[![Downloads](https://static.pepy.tech/personalized-badge/streamrip?period=total&units=international_system&left_color=black&right_color=green&left_text=Downloads)](https://pepy.tech/project/streamrip)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/python/black)
A scriptable stream downloader for Qobuz, Tidal, Deezer and SoundCloud.

Binary file not shown.

After

Width:  |  Height:  |  Size: 223 KiB

52
poetry.lock generated
View file

@ -71,6 +71,14 @@ python-versions = ">=3.5"
[package.dependencies]
windows-curses = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "pillow"
version = "8.3.0"
description = "Python Imaging Library (Fork)"
category = "main"
optional = false
python-versions = ">=3.6"
[[package]]
name = "requests"
version = "2.25.1"
@ -120,7 +128,7 @@ telegram = ["requests"]
[[package]]
name = "urllib3"
version = "1.26.5"
version = "1.26.6"
description = "HTTP library with thread-safe connection pooling, file post, and more."
category = "main"
optional = false
@ -142,7 +150,7 @@ python-versions = "*"
[metadata]
lock-version = "1.1"
python-versions = "^3.8"
content-hash = "a32e56def426809c20924e370d316663d2369853defcd590ee0c78aaef237ad3"
content-hash = "fe85433e74f85333932d145aa003b13164c786000da6083cb761f0823ebe2729"
[metadata.files]
certifi = [
@ -177,6 +185,42 @@ pick = [
{file = "pick-1.0.0-py2.py3-none-any.whl", hash = "sha256:f32c8bd0fd943490c29e461a8168f4ac267247aaa6a7fc9dd327f97832842b5f"},
{file = "pick-1.0.0.tar.gz", hash = "sha256:03f13d4f5bfe74db4b969fb74c0ef110ec443978419d6c0f1f375a0d49539034"},
]
pillow = [
{file = "Pillow-8.3.0-cp36-cp36m-macosx_10_10_x86_64.whl", hash = "sha256:333313bcc53a8a7359e98d5458dfe37bfa301da2fd0e0dc41f585ae0cede9181"},
{file = "Pillow-8.3.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bccd0d604d814e9494f3bf3f077a23835580ed1743c5175581882e7dd1f178c3"},
{file = "Pillow-8.3.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:a7beda44f177ee602aa27e0a297da1657d9572679522c8fb8b336b734653516e"},
{file = "Pillow-8.3.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:94db5ea640330de0945b41dc77fb4847b4ab6e87149126c71b36b112e8400898"},
{file = "Pillow-8.3.0-cp36-cp36m-win32.whl", hash = "sha256:856fcbc3201a6cabf0478daa0c0a1a8a175af7e5173e2084ddb91cc707a09dd1"},
{file = "Pillow-8.3.0-cp36-cp36m-win_amd64.whl", hash = "sha256:34ce3d993cb4ca840b1e31165b38cb19c64f64f822a8bc5565bde084baff3bdb"},
{file = "Pillow-8.3.0-cp37-cp37m-macosx_10_10_x86_64.whl", hash = "sha256:778a819c2d194e08d39d67ddb15ef0d32eba17bf7d0c2773e97bd221b2613a3e"},
{file = "Pillow-8.3.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b42ea77f4e7374a67e1f27aaa9c62627dff681f67890e5b8f0c1e21b1500d9d2"},
{file = "Pillow-8.3.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:53f6e4b73b3899015ac4aa95d99da0f48ea18a6d7c8db672e8bead3fb9570ef5"},
{file = "Pillow-8.3.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:fb91deb5121b6dde88599bcb3db3fdad9cf33ff3d4ccc5329ee1fe9655a2f7ff"},
{file = "Pillow-8.3.0-cp37-cp37m-win32.whl", hash = "sha256:8f65d2a98f198e904dbe89ecb10862d5f0511367d823689039e17c4d011de11e"},
{file = "Pillow-8.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:25f6564df21d15bcac142b4ed92b6c02e53557539f535f31c1f3bcc985484753"},
{file = "Pillow-8.3.0-cp38-cp38-macosx_10_10_x86_64.whl", hash = "sha256:c2d78c8230bda5fc9f6b1d457c7f8f3432f4fe85bed86f80ba3ed73d59775a88"},
{file = "Pillow-8.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:950e873ceefbd283cbe7bc5b648b832d1dcf89eeded6726ebec42bc7d67966c0"},
{file = "Pillow-8.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1037288a22cc8ec9d2918a24ded733a1cc4342fd7f21d15d37e6bbe5fb4a7306"},
{file = "Pillow-8.3.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:063d17a02a0170c2f880fbd373b2738b089c6adcbd1f7418667bc9e97524c11b"},
{file = "Pillow-8.3.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:561339ed7c324bbcb29b5e4f4705c97df950785394b3ac181f5bf6a08088a672"},
{file = "Pillow-8.3.0-cp38-cp38-win32.whl", hash = "sha256:331f8321418682386e4f0d0e6369f732053f95abddd2af4e1b1ef74a9537ef37"},
{file = "Pillow-8.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:eccaefbd646022b5313ca4b0c5f1ae6e0d3a52ef66de64970ecf3f9b2a1be751"},
{file = "Pillow-8.3.0-cp39-cp39-macosx_10_10_x86_64.whl", hash = "sha256:6f7517a220aca8b822e25b08b0df9546701a606a328da5bc057e5f32a3f9b07c"},
{file = "Pillow-8.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:cc8e926d6ffa65d0dddb871b7afe117f17bc045951e66afde60eb0eba923db9e"},
{file = "Pillow-8.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:519b3b24dedc81876d893475bade1b92c4ce7c24b9b82224f0bd8daae682e039"},
{file = "Pillow-8.3.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:72858a27dd7bd1c40f91c4f85db3b9f121c8412fd66573121febb00d074d0530"},
{file = "Pillow-8.3.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:3251557c53c1ed0c345559afc02d2b0a0aa5788042e161366ed90b27bc322d3d"},
{file = "Pillow-8.3.0-cp39-cp39-win32.whl", hash = "sha256:ce90aad0a3dc0f13a9ff0ab1f43bcbea436089b83c3fadbe37c6f1733b938bf1"},
{file = "Pillow-8.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:490c9236ef4762733b6c2e1f1fcb37793cb9c57d860aa84d6994c990461882e5"},
{file = "Pillow-8.3.0-pp36-pypy36_pp73-macosx_10_10_x86_64.whl", hash = "sha256:aef0838f28328523e9e5f2c1852dd96fb85768deb0eb8f908c54dad0f44d2f6f"},
{file = "Pillow-8.3.0-pp36-pypy36_pp73-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:713b762892efa8cd5d8dac24d16ac2d2dbf981963ed1b3297e79755f03f8cbb8"},
{file = "Pillow-8.3.0-pp36-pypy36_pp73-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:cec702974f162026bf8de47f6f4b7ce9584a63c50002b38f195ee797165fea77"},
{file = "Pillow-8.3.0-pp37-pypy37_pp73-macosx_10_10_x86_64.whl", hash = "sha256:d9ef8119ce44f90d2f8ac7c58f7da480ada5151f217dc8da03681b73fc91dec3"},
{file = "Pillow-8.3.0-pp37-pypy37_pp73-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:fc25d59ecf23ea19571065306806a29c43c67f830f0e8a121303916ba257f484"},
{file = "Pillow-8.3.0-pp37-pypy37_pp73-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:28f184c0a65be098af412f78b0b6f3bbafd1614e1dc896e810d8357342a794b7"},
{file = "Pillow-8.3.0-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:c3529fb98a40f89269175442c5ff4ef81d22e91b2bdcbd33833a350709b5130c"},
{file = "Pillow-8.3.0.tar.gz", hash = "sha256:803606e206f3e366eea46b1e7ab4dac74cfac770d04de9c35319814e11e47c46"},
]
requests = [
{file = "requests-2.25.1-py2.py3-none-any.whl", hash = "sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e"},
{file = "requests-2.25.1.tar.gz", hash = "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804"},
@ -194,8 +238,8 @@ tqdm = [
{file = "tqdm-4.61.1.tar.gz", hash = "sha256:24be966933e942be5f074c29755a95b315c69a91f839a29139bf26ffffe2d3fd"},
]
urllib3 = [
{file = "urllib3-1.26.5-py2.py3-none-any.whl", hash = "sha256:753a0374df26658f99d826cfe40394a686d05985786d946fbe4165b5148f5a7c"},
{file = "urllib3-1.26.5.tar.gz", hash = "sha256:a7acd0977125325f516bda9735fa7142b909a8d01e8b2e4c8108d0984e6e0098"},
{file = "urllib3-1.26.6-py2.py3-none-any.whl", hash = "sha256:39fb8672126159acb139a7718dd10806104dec1e2f0f6c88aab05d17df10c8d4"},
{file = "urllib3-1.26.6.tar.gz", hash = "sha256:f57b4c16c62fa2760b7e3d97c35b255512fb6b59a259730f36ba32ce9f8e342f"},
]
windows-curses = [
{file = "windows_curses-2.2.0-cp36-cp36m-win32.whl", hash = "sha256:1452d771ec6f9b3fef037da2b169196a9a12be4e86a6c27dd579adac70c42028"},

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "streamrip"
version = "0.6.7"
version = "0.7"
description = "A fast, all-in-one music ripper for Qobuz, Deezer, Tidal, and SoundCloud"
authors = ["nathom <nathanthomas707@gmail.com>"]
license = "GPL-3.0-only"
@ -8,6 +8,10 @@ readme = "README.md"
homepage = "https://github.com/nathom/streamrip"
repository = "https://github.com/nathom/streamrip"
include = ["streamrip/config.toml"]
packages = [
{ include = "streamrip" },
{ include = "rip" },
]
keywords = ["hi-res", "free", "music", "download", "tqdm"]
classifiers = [
"License :: OSI Approved :: GNU General Public License (GPL)",
@ -15,7 +19,7 @@ classifiers = [
]
[tool.poetry.scripts]
rip = "streamrip.cli:main"
rip = "rip.cli:main"
[tool.poetry.dependencies]
python = "^3.8"
@ -28,6 +32,7 @@ pathvalidate = "^2.4.1"
simple-term-menu = {version = "^1.2.1", platform = 'linux or darwin'}
pick = {version = "^1.0.0", platform = 'win32 or cygwin'}
windows-curses = {version = "^2.2.0", platform = 'win32 or cygwin'}
Pillow = "^8.3.0"
[tool.poetry.urls]
"Bug Reports" = "https://github.com/nathom/streamrip/issues"

View file

@ -1,6 +0,0 @@
click
pathvalidate
requests
mutagen>=1.45.1
tqdm
tomlkit

0
rip/__init__.py Normal file
View file

3
rip/__main__.py Normal file
View file

@ -0,0 +1,3 @@
from .cli import main
main()

View file

@ -1,28 +1,11 @@
"""The streamrip command line interface."""
import logging
import os
import shutil
from getpass import getpass
from hashlib import md5
import click
import requests
from . import __version__
from .clients import TidalClient
from .config import Config
from .constants import CACHE_DIR, CONFIG_DIR, CONFIG_PATH, QOBUZ_FEATURED_KEYS
from .core import MusicDL
import logging
from streamrip import __version__
logging.basicConfig(level="WARNING")
logger = logging.getLogger("streamrip")
if not os.path.isdir(CONFIG_DIR):
os.makedirs(CONFIG_DIR, exist_ok=True)
if not os.path.isdir(CACHE_DIR):
os.makedirs(CONFIG_DIR, exist_ok=True)
@click.group(invoke_without_command=True)
@click.option("-c", "--convert", metavar="CODEC", help="alac, mp3, flac, or ogg")
@ -39,10 +22,10 @@ if not os.path.isdir(CACHE_DIR):
metavar="INT",
help="0: < 320kbps, 1: 320 kbps, 2: 16 bit/44.1 kHz, 3: 24 bit/<=96 kHz, 4: 24 bit/<=192 kHz",
)
@click.option("-t", "--text", metavar="PATH")
@click.option("-nd", "--no-db", is_flag=True)
@click.option("--debug", is_flag=True)
@click.version_option(prog_name="streamrip")
@click.option("-t", "--text", metavar="PATH", help="Download urls from a text file.")
@click.option("-nd", "--no-db", is_flag=True, help="Ignore the database.")
@click.option("--debug", is_flag=True, help="Show debugging logs.")
@click.version_option(prog_name="rip", version=__version__)
@click.pass_context
def cli(ctx, **kwargs):
"""Streamrip: The all-in-one Qobuz, Tidal, SoundCloud, and Deezer music downloader.
@ -56,6 +39,20 @@ def cli(ctx, **kwargs):
$ rip config --open
"""
import os
import requests
from .config import Config
from .constants import CONFIG_DIR
from .core import MusicDL
logging.basicConfig(level="WARNING")
logger = logging.getLogger("streamrip")
if not os.path.isdir(CONFIG_DIR):
os.makedirs(CONFIG_DIR, exist_ok=True)
global config
global core
@ -63,7 +60,14 @@ def cli(ctx, **kwargs):
logger.setLevel("DEBUG")
logger.debug("Starting debug log")
if ctx.invoked_subcommand not in {None, "lastfm", "search", "disover", "config"}:
if ctx.invoked_subcommand not in {
None,
"lastfm",
"search",
"discover",
"config",
"repair",
}:
return
config = Config()
@ -225,6 +229,8 @@ def discover(ctx, **kwargs):
* universal-chanson
"""
from streamrip.constants import QOBUZ_FEATURED_KEYS
assert (
kwargs["list"] in QOBUZ_FEATURED_KEYS
), f"Invalid featured key {kwargs['list']}"
@ -284,6 +290,13 @@ def lastfm(ctx, source, url):
@click.pass_context
def config(ctx, **kwargs):
"""Manage the streamrip configuration file."""
from streamrip.clients import TidalClient
from .constants import CONFIG_PATH
from hashlib import md5
from getpass import getpass
import shutil
import os
global config
if kwargs["reset"]:
config.reset()
@ -343,9 +356,23 @@ def config(ctx, **kwargs):
@click.argument("PATH")
@click.pass_context
def convert(ctx, **kwargs):
from . import converter
"""Batch convert audio files.
This is a tool that is included with the `rip` program that assists with
converting audio files. This is essentially a wrapper over ffmpeg
that is designed to be easy to use with sensible default options.
Examples (assuming /my/music is filled with FLAC files):
$ rip convert MP3 /my/music
$ rip convert ALAC --sampling-rate 48000 /my/music
"""
from streamrip import converter
import concurrent.futures
from tqdm import tqdm
import os
codec_map = {
"FLAC": converter.FLAC,
@ -405,6 +432,23 @@ def convert(ctx, **kwargs):
click.secho(f"File {kwargs['path']} does not exist.", fg="red")
@cli.command()
@click.option(
"-n", "--num-items", help="The number of items to atttempt downloads for."
)
@click.pass_context
def repair(ctx, **kwargs):
"""Retry failed downloads.
If the failed downloads database is enabled in the config file (it is by default),
when an item is not available for download, it is logged in the database.
When this command is called, it tries to download those items again. This is useful
for times when a temporary server error may miss a few tracks in an album.
"""
core.repair(max_items=kwargs.get("num_items"))
def none_chosen():
"""Print message if nothing was chosen."""
click.secho("No items chosen, exiting.", fg="bright_red")

View file

@ -11,7 +11,7 @@ import click
import tomlkit
from .constants import CONFIG_DIR, CONFIG_PATH, DOWNLOADS_DIR
from .exceptions import InvalidSourceError
from streamrip.exceptions import InvalidSourceError
logger = logging.getLogger("streamrip")
@ -86,6 +86,9 @@ class Config:
os.makedirs(CONFIG_DIR, exist_ok=True)
shutil.copy(self.default_config_path, self._path)
self.load()
self.file["downloads"]["folder"] = DOWNLOADS_DIR
self.save()
def load(self):
"""Load infomation from the config files, making a deepcopy."""

View file

@ -56,7 +56,13 @@ download_videos = false
video_downloads_folder = ""
# This stores a list of item IDs so that repeats are not downloaded.
[database]
[database.downloads]
enabled = true
path = ""
# If a download fails, the item ID is stored here. Then, `rip repair` can be
# called to retry the downloads
[database.failed_downloads]
enabled = true
path = ""
@ -94,6 +100,10 @@ embed = true
# "original" images can be up to 30MB, and may fail embedding.
# Using "large" is recommended.
size = "large"
# Both of these options limit the size of the embedded artwork. If their values
# are larger than the actual dimensions of the image, they will be ignored.
max_width = 999999
max_height = 999999
# Save the cover image at the highest quality as a seperate jpg file
keep_hires_cover = true
@ -128,4 +138,4 @@ fallback_source = "deezer"
check_for_updates = true
# Metadata to identify this config file. Do not change.
version = "0.6.1"
version = "0.7"

28
rip/constants.py Normal file
View file

@ -0,0 +1,28 @@
import click
import re
import os
from pathlib import Path
APPNAME = "streamrip"
APP_DIR = click.get_app_dir(APPNAME)
HOME = Path.home()
LOG_DIR = CACHE_DIR = CONFIG_DIR = APP_DIR
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.toml")
DB_PATH = os.path.join(LOG_DIR, "downloads.db")
FAILED_DB_PATH = os.path.join(LOG_DIR, "failed_downloads.db")
DOWNLOADS_DIR = os.path.join(HOME, "StreamripDownloads")
URL_REGEX = re.compile(
r"https?://(?:www|open|play|listen)?\.?(qobuz|tidal|deezer)\.com(?:(?:/"
r"(album|artist|track|playlist|video|label))|(?:\/[-\w]+?))+\/([-\w]+)"
)
SOUNDCLOUD_URL_REGEX = re.compile(r"https://soundcloud.com/[-\w:/]+")
LASTFM_URL_REGEX = re.compile(r"https://www.last.fm/user/\w+/playlists/\w+")
QOBUZ_INTERPRETER_URL_REGEX = re.compile(
r"https?://www\.qobuz\.com/\w\w-\w\w/interpreter/[-\w]+/[-\w]+"
)
DEEZER_DYNAMIC_LINK_REGEX = re.compile(r"https://deezer\.page\.link/\w+")
YOUTUBE_URL_REGEX = re.compile(r"https://www\.youtube\.com/watch\?v=[-\w]+")

View file

@ -5,6 +5,7 @@ import html
import logging
import os
import re
from getpass import getpass
from hashlib import md5
from string import Formatter
@ -14,8 +15,17 @@ import click
import requests
from tqdm import tqdm
from .bases import Track, Video, YoutubeVideo
from .clients import (
from streamrip.media import (
Track,
Video,
YoutubeVideo,
Album,
Artist,
Label,
Playlist,
Tracklist,
)
from streamrip.clients import (
Client,
DeezerClient,
QobuzClient,
@ -23,31 +33,33 @@ from .clients import (
TidalClient,
)
from .config import Config
from streamrip.constants import MEDIA_TYPES
from .constants import (
URL_REGEX,
SOUNDCLOUD_URL_REGEX,
LASTFM_URL_REGEX,
QOBUZ_INTERPRETER_URL_REGEX,
YOUTUBE_URL_REGEX,
DEEZER_DYNAMIC_LINK_REGEX,
CONFIG_PATH,
DB_PATH,
DEEZER_DYNAMIC_LINK_REGEX,
LASTFM_URL_REGEX,
MEDIA_TYPES,
QOBUZ_INTERPRETER_URL_REGEX,
SOUNDCLOUD_URL_REGEX,
URL_REGEX,
YOUTUBE_URL_REGEX,
FAILED_DB_PATH,
)
from .db import MusicDB
from .exceptions import (
from . import db
from streamrip.exceptions import (
AuthenticationError,
PartialFailure,
ItemExists,
MissingCredentials,
NonStreamable,
NoResultsFound,
ParsingError,
)
from .tracklists import Album, Artist, Label, Playlist, Tracklist
from .utils import extract_deezer_dynamic_link, extract_interpreter_url
logger = logging.getLogger("streamrip")
# ---------------- Constants ------------------ #
Media = Union[
Type[Album],
Type[Playlist],
@ -65,6 +77,9 @@ MEDIA_CLASS: Dict[str, Media] = {
"video": Video,
}
DB_PATH_MAP = {"downloads": DB_PATH, "failed_downloads": FAILED_DB_PATH}
# ---------------------------------------------- #
class MusicDL(list):
"""MusicDL."""
@ -78,13 +93,6 @@ class MusicDL(list):
:param config:
:type config: Optional[Config]
"""
self.url_parse = re.compile(URL_REGEX)
self.soundcloud_url_parse = re.compile(SOUNDCLOUD_URL_REGEX)
self.lastfm_url_parse = re.compile(LASTFM_URL_REGEX)
self.interpreter_url_parse = re.compile(QOBUZ_INTERPRETER_URL_REGEX)
self.youtube_url_parse = re.compile(YOUTUBE_URL_REGEX)
self.deezer_dynamic_url_parse = re.compile(DEEZER_DYNAMIC_LINK_REGEX)
self.config: Config
if config is None:
self.config = Config(CONFIG_PATH)
@ -98,18 +106,28 @@ class MusicDL(list):
"soundcloud": SoundCloudClient(),
}
self.db: Union[MusicDB, list]
db_settings = self.config.session["database"]
if db_settings["enabled"]:
path = db_settings["path"]
if path:
self.db = MusicDB(path)
else:
self.db = MusicDB(DB_PATH)
self.config.file["database"]["path"] = DB_PATH
self.config.save()
else:
self.db = []
def get_db(db_type: str) -> db.Database:
db_settings = self.config.session["database"]
db_class = db.CLASS_MAP[db_type]
database = db_class(None, dummy=True)
default_db_path = DB_PATH_MAP[db_type]
if db_settings[db_type]["enabled"]:
path = db_settings[db_type]["path"]
if path:
database = db_class(path)
else:
database = db_class(default_db_path)
assert config is not None
config.file["database"][db_type]["path"] = default_db_path
config.save()
return database
self.db = get_db("downloads")
self.failed_db = get_db("failed_downloads")
def handle_urls(self, urls):
"""Download a url.
@ -128,7 +146,7 @@ class MusicDL(list):
# youtube is handled by youtube-dl, so much of the
# processing is not necessary
youtube_urls = self.youtube_url_parse.findall(url)
youtube_urls = YOUTUBE_URL_REGEX.findall(url)
if youtube_urls != []:
self.extend(YoutubeVideo(u) for u in youtube_urls)
@ -145,7 +163,7 @@ class MusicDL(list):
raise ParsingError(message)
for source, url_type, item_id in parsed:
if item_id in self.db:
if {"id": item_id} in self.db:
logger.info(
f"ID {item_id} already downloaded, use --no-db to override."
)
@ -191,7 +209,6 @@ class MusicDL(list):
session[key] for key in ("artwork", "conversion", "filepaths")
)
return {
"database": self.db,
"parent_folder": session["downloads"]["folder"],
"folder_format": filepaths["folder_format"],
"track_format": filepaths["track_format"],
@ -210,8 +227,30 @@ class MusicDL(list):
"video_downloads_folder"
],
"add_singles_to_folder": filepaths["add_singles_to_folder"],
"max_artwork_width": int(artwork["max_width"]),
"max_artwork_height": int(artwork["max_height"]),
}
def repair(self, max_items=None):
if max_items is None:
max_items = float("inf")
if self.failed_db.is_dummy:
click.secho(
"Failed downloads database must be enabled in the config file "
"to repair!",
fg="red",
)
raise click.Abort
for counter, (source, media_type, item_id) in enumerate(self.failed_db):
if counter >= max_items:
break
self.handle_item(source, media_type, item_id)
self.download()
def download(self):
"""Download all the items in self."""
try:
@ -251,18 +290,32 @@ class MusicDL(list):
try:
item.load_meta(**arguments)
except NonStreamable:
self.failed_db.add((item.client.source, item.type, item.id))
click.secho(f"{item!s} is not available, skipping.", fg="red")
continue
item.download(**arguments)
try:
item.download(**arguments)
except NonStreamable as e:
e.print(item)
self.failed_db.add((item.client.source, item.type, item.id))
continue
except PartialFailure as e:
for failed_item in e.failed_items:
self.failed_db.add(failed_item)
continue
except ItemExists as e:
click.secho(f'"{e!s}" already exists. Skipping.', fg="yellow")
continue
if hasattr(item, "id"):
self.db.add([item.id])
if isinstance(item, Track):
item.tag()
if arguments["conversion"]["enabled"]:
item.convert(**arguments["conversion"])
if self.db != [] and hasattr(item, "id"):
self.db.add(item.id)
def get_client(self, source: str) -> Client:
"""Get a client given the source and log in.
@ -325,7 +378,7 @@ class MusicDL(list):
"""
parsed: List[Tuple[str, str, str]] = []
interpreter_urls = self.interpreter_url_parse.findall(url)
interpreter_urls = QOBUZ_INTERPRETER_URL_REGEX.findall(url)
if interpreter_urls:
click.secho(
"Extracting IDs from Qobuz interpreter urls. Use urls "
@ -336,9 +389,9 @@ class MusicDL(list):
("qobuz", "artist", extract_interpreter_url(u))
for u in interpreter_urls
)
url = self.interpreter_url_parse.sub("", url)
url = QOBUZ_INTERPRETER_URL_REGEX.sub("", url)
dynamic_urls = self.deezer_dynamic_url_parse.findall(url)
dynamic_urls = DEEZER_DYNAMIC_LINK_REGEX.findall(url)
if dynamic_urls:
click.secho(
"Extracting IDs from Deezer dynamic link. Use urls "
@ -350,8 +403,8 @@ class MusicDL(list):
("deezer", *extract_deezer_dynamic_link(url)) for url in dynamic_urls
)
parsed.extend(self.url_parse.findall(url)) # Qobuz, Tidal, Dezer
soundcloud_urls = self.soundcloud_url_parse.findall(url)
parsed.extend(URL_REGEX.findall(url)) # Qobuz, Tidal, Dezer
soundcloud_urls = SOUNDCLOUD_URL_REGEX.findall(url)
soundcloud_items = [self.clients["soundcloud"].get(u) for u in soundcloud_urls]
parsed.extend(
@ -384,7 +437,7 @@ class MusicDL(list):
# For testing:
# https://www.last.fm/user/nathan3895/playlists/12058911
user_regex = re.compile(r"https://www\.last\.fm/user/([^/]+)/playlists/\d+")
lastfm_urls = self.lastfm_url_parse.findall(urls)
lastfm_urls = LASTFM_URL_REGEX.findall(urls)
try:
lastfm_source = self.config.session["lastfm"]["source"]
lastfm_fallback_source = self.config.session["lastfm"]["fallback_source"]
@ -554,7 +607,7 @@ class MusicDL(list):
ret = fmt.format(**{k: media.get(k, default="Unknown") for k in fields})
return ret
def interactive_search( # noqa
def interactive_search(
self, query: str, source: str = "qobuz", media_type: str = "album"
):
"""Show an interactive menu that contains search results.

145
rip/db.py Normal file
View file

@ -0,0 +1,145 @@
"""Wrapper over a database that stores item IDs."""
import logging
import os
import sqlite3
from typing import List
logger = logging.getLogger("streamrip")
class Database:
"""A wrapper for an sqlite database."""
structure: dict
name: str
def __init__(self, path, dummy=False):
assert self.structure != []
assert self.name
if dummy or path is None:
self.path = None
self.is_dummy = True
return
self.is_dummy = False
self.path = path
if not os.path.exists(self.path):
self.create()
def create(self):
"""Create a database."""
if self.is_dummy:
return
with sqlite3.connect(self.path) as conn:
params = ", ".join(
f"{key} {' '.join(map(str.upper, props))} NOT NULL"
for key, props in self.structure.items()
)
command = f"CREATE TABLE {self.name} ({params})"
logger.debug(f"executing {command}")
conn.execute(command)
def keys(self):
"""Get the column names of the table."""
return self.structure.keys()
def contains(self, **items) -> bool:
"""Check whether items matches an entry in the table.
:param items: a dict of column-name + expected value
:rtype: bool
"""
if self.is_dummy:
return False
allowed_keys = set(self.structure.keys())
assert all(
key in allowed_keys for key in items.keys()
), f"Invalid key. Valid keys: {allowed_keys}"
items = {k: str(v) for k, v in items.items()}
with sqlite3.connect(self.path) as conn:
conditions = " AND ".join(f"{key}=?" for key in items.keys())
command = f"SELECT EXISTS(SELECT 1 FROM {self.name} WHERE {conditions})"
logger.debug(f"executing {command}")
return bool(conn.execute(command, tuple(items.values())).fetchone()[0])
def __contains__(self, keys: dict) -> bool:
return self.contains(**keys)
def add(self, items: List[str]):
"""Add a row to the table.
:param items: Column-name + value. Values must be provided for all cols.
:type items: List[str]
"""
if self.is_dummy:
return
assert len(items) == len(self.structure)
params = ", ".join(self.structure.keys())
question_marks = ", ".join("?" for _ in items)
command = f"INSERT INTO {self.name} ({params}) VALUES ({question_marks})"
logger.debug(f"executing {command}")
with sqlite3.connect(self.path) as conn:
try:
conn.execute(command, tuple(items))
except sqlite3.IntegrityError as e:
# tried to insert an item that was already there
logger.debug(e)
def remove(self, **items):
# not in use currently
if self.is_dummy:
return
conditions = " AND ".join(f"{key}=?" for key in items.keys())
command = f"DELETE FROM {self.name} WHERE {conditions}"
with sqlite3.connect(self.path) as conn:
logger.debug(command)
print(command)
conn.execute(command, tuple(items.values()))
def __iter__(self):
if self.is_dummy:
return ()
with sqlite3.connect(self.path) as conn:
return conn.execute(f"SELECT * FROM {self.name}")
def reset(self):
try:
os.remove(self.path)
except FileNotFoundError:
pass
class Downloads(Database):
name = "downloads"
structure = {
"id": ["text", "unique"],
}
class FailedDownloads(Database):
name = "failed_downloads"
structure = {
"source": ["text"],
"media_type": ["text"],
"id": ["text", "unique"],
}
CLASS_MAP = {db.name: db for db in (Downloads, FailedDownloads)}

46
rip/utils.py Normal file
View file

@ -0,0 +1,46 @@
from streamrip.utils import gen_threadsafe_session
from streamrip.constants import AGENT
from typing import Tuple
import re
interpreter_artist_regex = re.compile(r"getSimilarArtist\(\s*'(\w+)'")
def extract_interpreter_url(url: str) -> str:
"""Extract artist ID from a Qobuz interpreter url.
:param url: Urls of the form "https://www.qobuz.com/us-en/interpreter/{artist}/download-streaming-albums"
:type url: str
:rtype: str
"""
session = gen_threadsafe_session({"User-Agent": AGENT})
r = session.get(url)
match = interpreter_artist_regex.search(r.text)
if match:
return match.group(1)
raise Exception(
"Unable to extract artist id from interpreter url. Use a "
"url that contains an artist id."
)
deezer_id_link_regex = re.compile(
r"https://www\.deezer\.com/[a-z]{2}/(album|artist|playlist|track)/(\d+)"
)
def extract_deezer_dynamic_link(url: str) -> Tuple[str, str]:
"""Extract a deezer url that includes an ID from a deezer.page.link url.
:param url:
:type url: str
:rtype: Tuple[str, str]
"""
session = gen_threadsafe_session({"User-Agent": AGENT})
r = session.get(url)
match = deezer_id_link_regex.search(r.text)
if match:
return match.group(1), match.group(2)
raise Exception("Unable to extract Deezer dynamic link.")

View file

@ -1,3 +1,5 @@
"""streamrip: the all in one music downloader."""
__version__ = "0.6.7"
__version__ = "0.7"
from . import clients, converter, media, constants

View file

@ -268,12 +268,14 @@ class QobuzClient(Client):
extras = {
"artist": "albums",
"playlist": "tracks",
"label": "albums", # not tested
"label": "albums",
}
if media_type in extras:
params.update({"extra": extras[media_type]})
logger.debug("request params: %s", params)
epoint = f"{media_type}/get"
response, status_code = self._api_request(epoint, params)
@ -660,15 +662,14 @@ class TidalClient(Client):
# pending
time.sleep(4)
continue
elif status == 1:
# error checking
raise Exception
elif status == 0:
# successful
break
else:
raise Exception
self._update_authorization()
def _get_device_code(self):
"""Get the device code that will be used to log in on the browser."""
data = {

View file

@ -1,25 +1,12 @@
"""Constants that are kept in one place."""
import os
from pathlib import Path
import click
import mutagen.id3 as id3
APPNAME = "streamrip"
CACHE_DIR = click.get_app_dir(APPNAME)
CONFIG_DIR = click.get_app_dir(APPNAME)
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.toml")
LOG_DIR = click.get_app_dir(APPNAME)
DB_PATH = os.path.join(LOG_DIR, "downloads.db")
HOME = Path.home()
DOWNLOADS_DIR = os.path.join(HOME, "StreamripDownloads")
import re
AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0"
TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
SOUNDCLOUD_CLIENT_ID = re.compile("a3e059563d7fd3372b49b37f00a00bcf")
QUALITY_DESC = {
@ -147,20 +134,6 @@ FOLDER_FORMAT = (
TRACK_FORMAT = "{tracknumber}. {artist} - {title}"
# ------------------ Regexes ------------------- #
URL_REGEX = (
r"https?://(?:www|open|play|listen)?\.?(qobuz|tidal|deezer)\.com(?:(?:/"
r"(album|artist|track|playlist|video|label))|(?:\/[-\w]+?))+\/([-\w]+)"
)
SOUNDCLOUD_URL_REGEX = r"https://soundcloud.com/[-\w:/]+"
SOUNDCLOUD_CLIENT_ID = "a3e059563d7fd3372b49b37f00a00bcf"
LASTFM_URL_REGEX = r"https://www.last.fm/user/\w+/playlists/\w+"
QOBUZ_INTERPRETER_URL_REGEX = (
r"https?://www\.qobuz\.com/\w\w-\w\w/interpreter/[-\w]+/[-\w]+"
)
DEEZER_DYNAMIC_LINK_REGEX = r"https://deezer\.page\.link/\w+"
YOUTUBE_URL_REGEX = r"https://www\.youtube\.com/watch\?v=[-\w]+"
TIDAL_MAX_Q = 7
TIDAL_Q_MAP = {

View file

@ -11,7 +11,7 @@ from .exceptions import ConversionError
logger = logging.getLogger("streamrip")
SAMPLING_RATES = (44100, 48000, 88200, 96000, 176400, 192000)
SAMPLING_RATES = {44100, 48000, 88200, 96000, 176400, 192000}
class Converter:

View file

@ -1,67 +0,0 @@
"""Wrapper over a database that stores item IDs."""
import logging
import os
import sqlite3
from typing import Union
logger = logging.getLogger("streamrip")
class MusicDB:
"""Simple interface for the downloaded track database."""
def __init__(self, db_path: Union[str, os.PathLike]):
"""Create a MusicDB object.
:param db_path: filepath of the database
:type db_path: Union[str, os.PathLike]
"""
self.path = db_path
if not os.path.exists(self.path):
self.create()
def create(self):
"""Create a database at `self.path`."""
with sqlite3.connect(self.path) as conn:
try:
conn.execute("CREATE TABLE downloads (id TEXT UNIQUE NOT NULL);")
logger.debug("Download-IDs database created: %s", self.path)
except sqlite3.OperationalError:
pass
return self.path
def __contains__(self, item_id: Union[str, int]) -> bool:
"""Check whether the database contains an id.
:param item_id: the id to check
:type item_id: str
:rtype: bool
"""
logger.debug("Checking database for ID %s", item_id)
with sqlite3.connect(self.path) as conn:
return (
conn.execute(
"SELECT id FROM downloads where id=?", (item_id,)
).fetchone()
is not None
)
def add(self, item_id: str):
"""Add an id to the database.
:param item_id:
:type item_id: str
"""
logger.debug("Adding ID %s", item_id)
with sqlite3.connect(self.path) as conn:
try:
conn.execute(
"INSERT INTO downloads (id) VALUES (?)",
(item_id,),
)
conn.commit()
except sqlite3.Error as err:
if "UNIQUE" not in str(err):
raise

View file

@ -1,3 +1,7 @@
from typing import List
import click
class AuthenticationError(Exception):
pass
@ -23,7 +27,21 @@ class InvalidQuality(Exception):
class NonStreamable(Exception):
pass
def __init__(self, message=None):
self.message = message
super().__init__(self.message)
def print(self, item):
print(self.print_msg(item))
def print_msg(self, item) -> str:
base_msg = click.style(f"Unable to stream {item!s}.", fg="yellow")
if self.message:
base_msg += click.style(" Message: ", fg="yellow") + click.style(
self.message, fg="red"
)
return base_msg
class InvalidContainerError(Exception):
@ -52,3 +70,13 @@ class ConversionError(Exception):
class NoResultsFound(Exception):
pass
class ItemExists(Exception):
pass
class PartialFailure(Exception):
def __init__(self, failed_items: List):
self.failed_items = failed_items
super().__init__()

File diff suppressed because it is too large Load diff

View file

@ -1,898 +0,0 @@
"""These classes parse information from Clients into a universal, downloadable form."""
from __future__ import annotations
import logging
import os
import re
from tempfile import gettempdir
from typing import Dict, Generator, Iterable, Optional, Union
import click
from pathvalidate import sanitize_filename
from .bases import Booklet, Track, Tracklist, Video
from .clients import Client
from .constants import ALBUM_KEYS, FLAC_MAX_BLOCKSIZE, FOLDER_FORMAT
from .db import MusicDB
from .exceptions import InvalidSourceError, NonStreamable
from .metadata import TrackMetadata
from .utils import (
clean_format,
get_cover_urls,
get_container,
get_stats_from_quality,
safe_get,
tidal_cover_url,
tqdm_download,
)
logger = logging.getLogger("streamrip")
class Album(Tracklist):
"""Represents a downloadable album.
Usage:
>>> resp = client.get('fleetwood mac rumours', 'album')
>>> album = Album.from_api(resp['items'][0], client)
>>> album.load_meta()
>>> album.download()
"""
def __init__(self, client: Client, **kwargs):
"""Create a new Album object.
:param client: a qopy client instance
:param album_id: album id returned by qobuz api
:type album_id: Union[str, int]
:param kwargs:
"""
self.client = client
self.sampling_rate = None
self.bit_depth = None
self.container: Optional[str] = None
self.disctotal: int
self.tracktotal: int
self.albumartist: str
# usually an unpacked TrackMetadata.asdict()
self.__dict__.update(kwargs)
# to improve from_api method speed
if kwargs.get("load_on_init", False):
self.load_meta()
self.loaded = False
self.downloaded = False
def load_meta(self, **kwargs):
"""Load detailed metadata from API using the id."""
assert hasattr(self, "id"), "id must be set to load metadata"
resp = self.client.get(self.id, media_type="album")
# update attributes based on response
self.meta = self._parse_get_resp(resp, self.client)
self.__dict__.update(self.meta.asdict()) # used for identification
if not self.get("streamable", False):
raise NonStreamable(f"This album is not streamable ({self.id} ID)")
self._load_tracks(resp)
self.loaded = True
@classmethod
def from_api(cls, resp: dict, client: Client):
"""Create an Album object from an API response.
:param resp:
:type resp: dict
:param client:
:type client: Client
"""
if client.source == "soundcloud":
return Playlist.from_api(resp, client)
info = cls._parse_get_resp(resp, client)
return cls(client, **info.asdict())
def _prepare_download(self, **kwargs):
"""Prepare the download of the album.
:param kwargs:
"""
# Generate the folder name
self.folder_format = kwargs.get("folder_format", FOLDER_FORMAT)
self.quality = min(kwargs.get("quality", 3), self.client.max_quality)
self.folder = self._get_formatted_folder(
kwargs.get("parent_folder", "StreamripDownloads"), self.quality
)
os.makedirs(self.folder, exist_ok=True)
self.download_message()
# choose optimal cover size and download it
click.secho("Downloading cover art", fg="magenta")
cover_path = os.path.join(gettempdir(), f"cover_{hash(self)}.jpg")
embed_cover_size = kwargs.get("embed_cover_size", "large")
assert (
embed_cover_size in self.cover_urls
), f"Invalid cover size. Must be in {self.cover_urls.keys()}"
embed_cover_url = self.cover_urls[embed_cover_size]
if embed_cover_url is not None:
tqdm_download(embed_cover_url, cover_path)
else: # sometimes happens with Deezer
cover_url = [u for u in self.cover_urls.values() if u][0]
tqdm_download(cover_url, cover_path)
hires_cov_path = os.path.join(self.folder, "cover.jpg")
if kwargs.get("keep_hires_cover", True) and not os.path.exists(hires_cov_path):
tqdm_download(self.cover_urls["original"], hires_cov_path)
cover_size = os.path.getsize(cover_path)
if cover_size > FLAC_MAX_BLOCKSIZE: # 16.77 MB
click.secho(
"Downgrading embedded cover size, too large ({cover_size}).",
fg="bright_yellow",
)
# large is about 600x600px which is guaranteed < 16.7 MB
tqdm_download(self.cover_urls["large"], cover_path)
embed_cover = kwargs.get("embed_cover", True) # embed by default
if self.client.source != "deezer" and embed_cover:
# container generated when formatting folder name
self.cover_obj = self.get_cover_obj(
cover_path, self.container, self.client.source
)
else:
self.cover_obj = None
# Download the booklet if applicable
if (
self.get("booklets")
and kwargs.get("download_booklets", True)
and not any(f.endswith(".pdf") for f in os.listdir(self.folder))
):
click.secho("\nDownloading booklets", fg="blue")
for item in self.booklets:
Booklet(item).download(parent_folder=self.folder)
def _download_item( # type: ignore
self,
track: Union[Track, Video],
quality: int = 3,
database: MusicDB = None,
**kwargs,
) -> bool:
"""Download an item.
:param track: The item.
:type track: Union[Track, Video]
:param quality:
:type quality: int
:param database:
:type database: MusicDB
:param kwargs:
:rtype: bool
"""
logger.debug("Downloading track to %s", self.folder)
if self.disctotal > 1 and isinstance(track, Track):
disc_folder = os.path.join(self.folder, f"Disc {track.meta.discnumber}")
kwargs["parent_folder"] = disc_folder
else:
kwargs["parent_folder"] = self.folder
if not track.download(
quality=min(self.quality, quality), database=database, **kwargs
):
return False
logger.debug("tagging tracks")
# deezer tracks come tagged
if kwargs.get("tag_tracks", True) and self.client.source != "deezer":
track.tag(
cover=self.cover_obj,
embed_cover=kwargs.get("embed_cover", True),
)
return True
@staticmethod
def _parse_get_resp(resp: dict, client: Client) -> TrackMetadata:
"""Parse information from a client.get(query, 'album') call.
:param resp:
:type resp: dict
:rtype: dict
"""
meta = TrackMetadata(album=resp, source=client.source)
meta.id = resp["id"]
return meta
def _load_tracks(self, resp):
"""Load the tracks into self from an API response.
This uses a classmethod to convert an item into a Track object, which
stores the metadata inside a TrackMetadata object.
"""
logging.debug("Loading %d tracks to album", self.tracktotal)
for track in _get_tracklist(resp, self.client.source):
if track.get("type") == "Music Video":
self.append(Video.from_album_meta(track, self.client))
else:
self.append(
Track.from_album_meta(
album=self.meta, track=track, client=self.client
)
)
def _get_formatter(self) -> dict:
"""Get a formatter that is used for naming folders and previews.
:rtype: dict
"""
fmt = {key: self.get(key) for key in ALBUM_KEYS}
stats = tuple(
min(bd, sr)
for bd, sr in zip(
(self.meta.bit_depth, self.meta.sampling_rate),
get_stats_from_quality(self.quality),
)
)
# The quality chosen is not the maximum available quality
if stats != (fmt.get("sampling_rate"), fmt.get("bit_depth")):
fmt["bit_depth"] = stats[0]
fmt["sampling_rate"] = stats[1]
if sr := fmt.get("sampling_rate"):
if sr % 1000 == 0:
# truncate the decimal .0 when converting to str
fmt["sampling_rate"] = int(sr / 1000)
else:
fmt["sampling_rate"] = sr / 1000
return fmt
def _get_formatted_folder(self, parent_folder: str, quality: int) -> str:
"""Generate the folder name for this album.
:param parent_folder:
:type parent_folder: str
:param quality:
:type quality: int
:rtype: str
"""
# necessary to format the folder
self.container = get_container(quality, self.client.source)
if self.container in ("AAC", "MP3"):
# lossy codecs don't have these metrics
self.bit_depth = self.sampling_rate = None
formatted_folder = clean_format(self.folder_format, self._get_formatter())
return os.path.join(parent_folder, formatted_folder)
@property
def title(self) -> str:
"""Get the title of the album.
:rtype: str
"""
return self.album
@title.setter
def title(self, val: str):
"""Set the title of the Album.
:param val:
:type val: str
"""
self.album = val
def __repr__(self) -> str:
"""Return a string representation of this Album object.
:rtype: str
"""
# Avoid AttributeError if load_on_init key is not set
if hasattr(self, "albumartist"):
return f"<Album: {self.albumartist} - {self.title}>"
return f"<Album: V/A - {self.title}>"
def __str__(self) -> str:
"""Return a readable string representation of this album.
:rtype: str
"""
return f"{self['albumartist']} - {self['title']}"
def __len__(self) -> int:
"""Get the length of the album.
:rtype: int
"""
return self.tracktotal
def __hash__(self):
"""Hash the album."""
return hash(self.id)
class Playlist(Tracklist):
"""Represents a downloadable playlist.
Usage:
>>> resp = client.search('hip hop', 'playlist')
>>> pl = Playlist.from_api(resp['items'][0], client)
>>> pl.load_meta()
>>> pl.download()
"""
def __init__(self, client: Client, **kwargs):
"""Create a new Playlist object.
:param client: a qopy client instance
:param album_id: playlist id returned by qobuz api
:type album_id: Union[str, int]
:param kwargs:
"""
self.name: str
self.client = client
for k, v in kwargs.items():
setattr(self, k, v)
# to improve from_api method speed
if kwargs.get("load_on_init"):
self.load_meta()
self.loaded = False
@classmethod
def from_api(cls, resp: dict, client: Client):
"""Return a Playlist object from an API response.
:param resp: a single search result entry of a playlist
:type resp: dict
:param client:
:type client: Client
"""
info = cls._parse_get_resp(resp, client)
return cls(client, **info)
def load_meta(self, **kwargs):
"""Send a request to fetch the tracklist from the api.
:param new_tracknumbers: replace the tracknumber with playlist position
:type new_tracknumbers: bool
:param kwargs:
"""
self.meta = self.client.get(self.id, media_type="playlist")
logger.debug(self.meta)
self._load_tracks(**kwargs)
self.loaded = True
def _load_tracks(self, new_tracknumbers: bool = True, **kwargs):
"""Parse the tracklist returned by the API.
:param new_tracknumbers: replace tracknumber tag with playlist position
:type new_tracknumbers: bool
"""
if self.client.source == "qobuz":
self.name = self.meta["name"]
self.image = self.meta["images"]
self.creator = safe_get(self.meta, "owner", "name", default="Qobuz")
tracklist = self.meta["tracks"]["items"]
def meta_args(track):
return {"track": track, "album": track["album"]}
elif self.client.source == "tidal":
self.name = self.meta["title"]
self.image = tidal_cover_url(self.meta["image"], 640)
self.creator = safe_get(self.meta, "creator", "name", default="TIDAL")
tracklist = self.meta["tracks"]
def meta_args(track):
return {
"track": track,
"source": self.client.source,
}
elif self.client.source == "deezer":
self.name = self.meta["title"]
self.image = self.meta["picture_big"]
self.creator = safe_get(self.meta, "creator", "name", default="Deezer")
tracklist = self.meta["tracks"]
elif self.client.source == "soundcloud":
self.name = self.meta["title"]
# self.image = self.meta.get("artwork_url").replace("large", "t500x500")
self.creator = self.meta["user"]["username"]
tracklist = self.meta["tracks"]
else:
raise NotImplementedError
self.tracktotal = len(tracklist)
if self.client.source == "soundcloud":
# No meta is included in soundcloud playlist
# response, so it is loaded at download time
for track in tracklist:
self.append(Track(self.client, id=track["id"]))
else:
for track in tracklist:
# TODO: This should be managed with .m3u files and alike. Arbitrary
# tracknumber tags might cause conflicts if the playlist files are
# inside of a library folder
meta = TrackMetadata(track=track, source=self.client.source)
cover_url = get_cover_urls(track["album"], self.client.source)[
kwargs.get("embed_cover_size", "large")
]
self.append(
Track(
self.client,
id=track.get("id"),
meta=meta,
cover_url=cover_url,
part_of_tracklist=True,
)
)
logger.debug(f"Loaded {len(self)} tracks from playlist {self.name}")
def _prepare_download(self, parent_folder: str = "StreamripDownloads", **kwargs):
fname = sanitize_filename(self.name)
self.folder = os.path.join(parent_folder, fname)
# Used for safe concurrency with tracknumbers instead of an object
# level that stores an index
self.__indices = iter(range(1, len(self) + 1))
self.download_message()
def _download_item(self, item: Track, **kwargs) -> bool: # type: ignore
kwargs["parent_folder"] = self.folder
if self.client.source == "soundcloud":
item.load_meta()
click.secho(f"Downloading {item!s}", fg="blue")
if playlist_to_album := kwargs.get("set_playlist_to_album", False):
item["album"] = self.name
item["albumartist"] = self.creator
if kwargs.get("new_tracknumbers", True):
item["tracknumber"] = next(self.__indices)
item["discnumber"] = 1
self.downloaded = item.download(**kwargs)
if self.downloaded and self.client.source != "deezer":
item.tag(embed_cover=kwargs.get("embed_cover", True))
if self.downloaded and playlist_to_album and self.client.source == "deezer":
# Because Deezer tracks come pre-tagged, the `set_playlist_to_album`
# option is never set. Here, we manually do this
from mutagen.flac import FLAC
audio = FLAC(item.path)
audio["ALBUM"] = self.name
audio["ALBUMARTIST"] = self.creator
audio["TRACKNUMBER"] = f"{item['tracknumber']:02}"
audio.save()
return self.downloaded
@staticmethod
def _parse_get_resp(item: dict, client: Client) -> dict:
"""Parse information from a search result returned by a client.search call.
:param item:
:type item: dict
:param client:
:type client: Client
"""
if client.source == "qobuz":
return {
"name": item["name"],
"id": item["id"],
}
elif client.source == "tidal":
return {
"name": item["title"],
"id": item["uuid"],
}
elif client.source == "deezer":
return {
"name": item["title"],
"id": item["id"],
}
elif client.source == "soundcloud":
return {
"name": item["title"],
"id": item["permalink_url"],
"description": item["description"],
"popularity": f"{item['likes_count']} likes",
"tracktotal": len(item["tracks"]),
}
raise InvalidSourceError(client.source)
@property
def title(self) -> str:
"""Get the title.
:rtype: str
"""
return self.name
def __repr__(self) -> str:
"""Return a string representation of this Playlist object.
:rtype: str
"""
return f"<Playlist: {self.name}>"
def __str__(self) -> str:
"""Return a readable string representation of this track.
:rtype: str
"""
return f"{self.name} ({len(self)} tracks)"
class Artist(Tracklist):
"""Represents a downloadable artist.
Usage:
>>> resp = client.get('fleetwood mac', 'artist')
>>> artist = Artist.from_api(resp['items'][0], client)
>>> artist.load_meta()
>>> artist.download()
"""
def __init__(self, client: Client, **kwargs):
"""Create a new Artist object.
:param client: a qopy client instance
:param album_id: artist id returned by qobuz api
:type album_id: Union[str, int]
:param kwargs:
"""
self.client = client
for k, v in kwargs.items():
setattr(self, k, v)
# to improve from_api method speed
if kwargs.get("load_on_init"):
self.load_meta()
self.loaded = False
def load_meta(self, **kwargs):
"""Send an API call to get album info based on id."""
self.meta = self.client.get(self.id, media_type="artist")
self._load_albums()
self.loaded = True
# override
def download(self, **kwargs):
"""Download all items in self.
:param kwargs:
"""
iterator = self._prepare_download(**kwargs)
for item in iterator:
self._download_item(item, **kwargs)
def _load_albums(self):
"""Load Album objects to self.
This parses the response of client.get(query, 'artist') responses.
"""
if self.client.source == "qobuz":
self.name = self.meta["name"]
albums = self.meta["albums"]["items"]
elif self.client.source == "tidal":
self.name = self.meta["name"]
albums = self.meta["albums"]
elif self.client.source == "deezer":
self.name = self.meta["name"]
albums = self.meta["albums"]
else:
raise InvalidSourceError(self.client.source)
for album in albums:
logger.debug("Appending album: %s", album.get("title"))
self.append(Album.from_api(album, self.client))
def _prepare_download(
self,
parent_folder: str = "StreamripDownloads",
filters: tuple = (),
**kwargs,
) -> Iterable:
"""Prepare the download.
:param parent_folder:
:type parent_folder: str
:param filters:
:type filters: tuple
:param kwargs:
:rtype: Iterable
"""
folder = sanitize_filename(self.name)
self.folder = os.path.join(parent_folder, folder)
logger.debug("Artist folder: %s", folder)
logger.debug(f"Length of tracklist {len(self)}")
logger.debug(f"Filters: {filters}")
final: Iterable
if "repeats" in filters:
final = self._remove_repeats(bit_depth=max, sampling_rate=min)
filters = tuple(f for f in filters if f != "repeats")
else:
final = self
if isinstance(filters, tuple) and self.client.source == "qobuz":
filter_funcs = (getattr(self, f"_{filter_}") for filter_ in filters)
for func in filter_funcs:
final = filter(func, final)
self.download_message()
return final
def _download_item( # type: ignore
self,
item,
parent_folder: str = "StreamripDownloads",
quality: int = 3,
database: MusicDB = None,
**kwargs,
) -> bool:
"""Download an item.
:param item:
:param parent_folder:
:type parent_folder: str
:param quality:
:type quality: int
:param database:
:type database: MusicDB
:param kwargs:
:rtype: bool
"""
try:
item.load_meta()
except NonStreamable:
logger.info("Skipping album, not available to stream.")
return False
# always an Album
status = item.download(
parent_folder=self.folder,
quality=quality,
database=database,
**kwargs,
)
return status
@property
def title(self) -> str:
"""Get the artist name.
Implemented for consistency.
:rtype: str
"""
return self.name
@classmethod
def from_api(cls, item: dict, client: Client, source: str = "qobuz"):
"""Create an Artist object from the api response of Qobuz, Tidal, or Deezer.
:param resp: response dict
:type resp: dict
:param source: in ('qobuz', 'deezer', 'tidal')
:type source: str
"""
logging.debug("Loading item from API")
info = cls._parse_get_resp(item, client)
# equivalent to Artist(client=client, **info)
return cls(client=client, **info)
@staticmethod
def _parse_get_resp(item: dict, client: Client) -> dict:
"""Parse a result from a client.search call.
:param item: the item to parse
:type item: dict
:param client:
:type client: Client
"""
if client.source in ("qobuz", "deezer"):
info = {
"name": item.get("name"),
"id": item.get("id"),
}
elif client.source == "tidal":
info = {
"name": item["name"],
"id": item["id"],
}
else:
raise InvalidSourceError(client.source)
return info
# ----------- Filters --------------
TYPE_REGEXES = {
"remaster": re.compile(r"(?i)(re)?master(ed)?"),
"extra": re.compile(
r"(?i)(anniversary|deluxe|live|collector|demo|expanded|remix)"
),
}
def _remove_repeats(self, bit_depth=max, sampling_rate=max) -> Generator:
"""Remove the repeated albums from self.
May remove different versions of the same album.
:param bit_depth: either max or min functions
:param sampling_rate: either max or min functions
"""
groups: Dict[str, list] = {}
for album in self:
if (t := self.essence(album.title)) not in groups:
groups[t] = []
groups[t].append(album)
for group in groups.values():
assert bit_depth in (min, max) and sampling_rate in (min, max)
best_bd = bit_depth(a["bit_depth"] for a in group)
best_sr = sampling_rate(a["sampling_rate"] for a in group)
for album in group:
if album["bit_depth"] == best_bd and album["sampling_rate"] == best_sr:
yield album
break
def _non_studio_albums(self, album: Album) -> bool:
"""Filter non-studio-albums.
:param artist: usually self
:param album: the album to check
:type album: Album
:rtype: bool
"""
return (
album["albumartist"] != "Various Artists"
and self.TYPE_REGEXES["extra"].search(album.title) is None
)
def _features(self, album: Album) -> bool:
"""Filter features.
This will download only albums where the requested
artist is the album artist.
:param artist: usually self
:param album: the album to check
:type album: Album
:rtype: bool
"""
return self["name"] == album["albumartist"]
def _extras(self, album: Album) -> bool:
"""Filter extras.
:param artist: usually self
:param album: the album to check
:type album: Album
:rtype: bool
"""
return self.TYPE_REGEXES["extra"].search(album.title) is None
def _non_remasters(self, album: Album) -> bool:
"""Filter non remasters.
:param artist: usually self
:param album: the album to check
:type album: Album
:rtype: bool
"""
return self.TYPE_REGEXES["remaster"].search(album.title) is not None
def _non_albums(self, album: Album) -> bool:
"""Filter releases that are not albums.
:param artist: usually self
:param album: the album to check
:type album: Album
:rtype: bool
"""
return len(album) > 1
# --------- Magic Methods --------
def __repr__(self) -> str:
"""Return a string representation of this Artist object.
:rtype: str
"""
return f"<Artist: {self.name}>"
def __str__(self) -> str:
"""Return a readable string representation of this Artist.
:rtype: str
"""
return self.name
def __hash__(self):
"""Hash self."""
return hash(self.id)
class Label(Artist):
"""Represents a downloadable Label."""
def load_meta(self, **kwargs):
"""Load metadata given an id."""
assert self.client.source == "qobuz", "Label source must be qobuz"
resp = self.client.get(self.id, "label")
self.name = resp["name"]
for album in resp["albums"]["items"]:
self.append(Album.from_api(album, client=self.client))
self.loaded = True
def __repr__(self):
"""Return a string representation of the Label."""
return f"<Label - {self.name}>"
def __str__(self) -> str:
"""Return the name of the Label.
:rtype: str
"""
return self.name
# ---------- misc utility functions -----------
def _get_tracklist(resp: dict, source: str) -> list:
"""Return the tracklist from an API response.
:param resp:
:type resp: dict
:param source:
:type source: str
:rtype: list
"""
if source == "qobuz":
return resp["tracks"]["items"]
if source in ("tidal", "deezer"):
return resp["tracks"]
raise NotImplementedError(source)

View file

@ -148,7 +148,7 @@ def tqdm_download(url: str, filepath: str, params: dict = None, desc: str = None
total = int(r.headers.get("content-length", 0))
logger.debug(f"File size = {total}")
if total < 1000 and not url.endswith("jpg") and not url.endswith("png"):
raise NonStreamable(url)
raise NonStreamable("Resource not found.")
try:
with open(filepath, "wb") as file, tqdm(
@ -322,49 +322,6 @@ def decho(message, fg=None):
logger.debug(message)
interpreter_artist_regex = re.compile(r"getSimilarArtist\(\s*'(\w+)'")
def extract_interpreter_url(url: str) -> str:
"""Extract artist ID from a Qobuz interpreter url.
:param url: Urls of the form "https://www.qobuz.com/us-en/interpreter/{artist}/download-streaming-albums"
:type url: str
:rtype: str
"""
session = gen_threadsafe_session({"User-Agent": AGENT})
r = session.get(url)
match = interpreter_artist_regex.search(r.text)
if match:
return match.group(1)
raise Exception(
"Unable to extract artist id from interpreter url. Use a "
"url that contains an artist id."
)
deezer_id_link_regex = re.compile(
r"https://www\.deezer\.com/[a-z]{2}/(album|artist|playlist|track)/(\d+)"
)
def extract_deezer_dynamic_link(url: str) -> Tuple[str, str]:
"""Extract a deezer url that includes an ID from a deezer.page.link url.
:param url:
:type url: str
:rtype: Tuple[str, str]
"""
session = gen_threadsafe_session({"User-Agent": AGENT})
r = session.get(url)
match = deezer_id_link_regex.search(r.text)
if match:
return match.group(1), match.group(2)
raise Exception("Unable to extract Deezer dynamic link.")
def get_container(quality: int, source: str) -> str:
"""Get the file container given the quality.
@ -416,3 +373,26 @@ def get_cover_urls(resp: dict, source: str) -> dict:
return cover_urls
raise InvalidSourceError(source)
def downsize_image(filepath: str, width: int, height: int):
"""Downsize an image. If either the width or the height is greater
than the image's width or height, that dimension will not be changed.
:param filepath:
:type filepath: str
:param width:
:type width: int
:param height:
:type height: int
:raises: ValueError
"""
from PIL import Image
image = Image.open(filepath)
width = min(width, image.width)
height = min(height, image.height)
resized_image = image.resize((width, height))
resized_image.save(filepath)

View file

@ -44,7 +44,7 @@ def download_albums():
procs.append(subprocess.run([*rip_url, url]))
for p in procs:
print(p)
click.echo(p)
def check_album_dl_success(folder, correct):