mirror of
https://github.com/nathom/streamrip.git
synced 2025-05-09 14:11:55 -04:00
Added support for SoundCloud downloads
Move soundcloud album parsing to Playlist stash Soundcloud downloads working
This commit is contained in:
parent
7f413c8290
commit
9d0a735cf5
11 changed files with 259 additions and 86 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
@ -7,3 +7,6 @@ test.py
|
|||
/urls.txt
|
||||
*.flac
|
||||
/Downloads
|
||||
*.mp3
|
||||
StreamripDownloads
|
||||
*.wav
|
||||
|
|
|
@ -206,7 +206,7 @@ def config(ctx, **kwargs):
|
|||
config.reset()
|
||||
|
||||
if kwargs["open"]:
|
||||
click.secho(f"Opening {CONFIG_PATH}", fg='green')
|
||||
click.secho(f"Opening {CONFIG_PATH}", fg="green")
|
||||
click.launch(CONFIG_PATH)
|
||||
|
||||
if kwargs["qobuz"]:
|
||||
|
|
|
@ -4,7 +4,7 @@ import json
|
|||
import logging
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from pprint import pformat # , pprint
|
||||
from pprint import pformat, pprint
|
||||
from typing import Generator, Sequence, Tuple, Union
|
||||
|
||||
import click
|
||||
|
@ -16,6 +16,7 @@ from .constants import (
|
|||
AVAILABLE_QUALITY_IDS,
|
||||
DEEZER_MAX_Q,
|
||||
QOBUZ_FEATURED_KEYS,
|
||||
SOUNDCLOUD_CLIENT_ID,
|
||||
TIDAL_MAX_Q,
|
||||
)
|
||||
from .exceptions import (
|
||||
|
@ -50,6 +51,9 @@ QOBUZ_BASE = "https://www.qobuz.com/api.json/0.2"
|
|||
DEEZER_BASE = "https://api.deezer.com"
|
||||
DEEZER_DL = "http://dz.loaderapp.info/deezer"
|
||||
|
||||
# SoundCloud
|
||||
SOUNDCLOUD_BASE = "https://api-v2.soundcloud.com"
|
||||
|
||||
|
||||
# ----------- Abstract Classes -----------------
|
||||
|
||||
|
@ -101,12 +105,18 @@ class ClientInterface(ABC):
|
|||
def source(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def max_quality(self):
|
||||
pass
|
||||
|
||||
|
||||
# ------------- Clients -----------------
|
||||
|
||||
|
||||
class QobuzClient(ClientInterface):
|
||||
source = "qobuz"
|
||||
max_quality = 4
|
||||
|
||||
# ------- Public Methods -------------
|
||||
def __init__(self):
|
||||
|
@ -361,6 +371,7 @@ class QobuzClient(ClientInterface):
|
|||
|
||||
class DeezerClient(ClientInterface):
|
||||
source = "deezer"
|
||||
max_quality = 2
|
||||
|
||||
def __init__(self):
|
||||
self.session = requests.Session()
|
||||
|
@ -421,6 +432,7 @@ class DeezerClient(ClientInterface):
|
|||
|
||||
class TidalClient(ClientInterface):
|
||||
source = "tidal"
|
||||
max_quality = 3
|
||||
|
||||
def __init__(self):
|
||||
self.logged_in = False
|
||||
|
@ -639,3 +651,66 @@ class TidalClient(ClientInterface):
|
|||
def _api_post(self, url, data, auth=None):
|
||||
r = requests.post(url, data=data, auth=auth, verify=False).json()
|
||||
return r
|
||||
|
||||
|
||||
class SoundCloudClient(ClientInterface):
|
||||
source = "soundcloud"
|
||||
max_quality = 0
|
||||
logged_in = True
|
||||
|
||||
def login(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def get(self, id, media_type="track"):
|
||||
assert media_type in ("track", "playlist"), f"{media_type} not supported"
|
||||
|
||||
if "http" in str(id):
|
||||
resp, _ = self._get(f"resolve?url={id}")
|
||||
elif media_type == "track":
|
||||
resp, _ = self._get(f"{media_type}s/{id}")
|
||||
else:
|
||||
raise Exception(id)
|
||||
|
||||
return resp
|
||||
|
||||
def get_file_url(self, track: dict, quality) -> dict:
|
||||
if not track["streamable"] or track["policy"] == "BLOCK":
|
||||
raise Exception
|
||||
|
||||
if track["downloadable"] and track["has_downloads_left"]:
|
||||
r = self._get(f"tracks/{track['id']}/download", resp_obj=True)
|
||||
return {"url": r.json()["redirectUri"], "type": "original"}
|
||||
|
||||
else:
|
||||
url = None
|
||||
for tc in track["media"]["transcodings"]:
|
||||
fmt = tc["format"]
|
||||
if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg":
|
||||
url = tc["url"]
|
||||
break
|
||||
|
||||
assert url is not None
|
||||
|
||||
resp, _ = self._get(url, no_base=True)
|
||||
return {"url": resp["url"], "type": "mp3"}
|
||||
|
||||
def search(self, query: str, media_type="album"):
|
||||
params = {"q": query}
|
||||
resp, _ = self._get(f"search/{media_type}s", params=params)
|
||||
return resp
|
||||
|
||||
def _get(self, path, params=None, no_base=False, resp_obj=False):
|
||||
if params is None:
|
||||
params = {}
|
||||
params["client_id"] = SOUNDCLOUD_CLIENT_ID
|
||||
if no_base:
|
||||
url = path
|
||||
else:
|
||||
url = f"{SOUNDCLOUD_BASE}/{path}"
|
||||
|
||||
logger.debug(f"Fetching url {url}")
|
||||
r = requests.get(url, params=params)
|
||||
if resp_obj:
|
||||
return r
|
||||
|
||||
return r.json(), r.status_code
|
||||
|
|
|
@ -54,6 +54,9 @@ class Config:
|
|||
"deezer": {
|
||||
"quality": 2,
|
||||
},
|
||||
"soundcloud": {
|
||||
"quality": 0,
|
||||
},
|
||||
"database": {"enabled": True, "path": None},
|
||||
"conversion": {
|
||||
"enabled": False,
|
||||
|
|
|
@ -19,6 +19,7 @@ AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firef
|
|||
TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
|
||||
|
||||
EXT = {
|
||||
0: ".mp3",
|
||||
1: ".mp3",
|
||||
2: ".flac",
|
||||
3: ".flac",
|
||||
|
@ -134,11 +135,14 @@ FOLDER_FORMAT = (
|
|||
TRACK_FORMAT = "{tracknumber}. {artist} - {title}"
|
||||
|
||||
URL_REGEX = (
|
||||
r"https:\/\/(?:www|open|play|listen)?\.?(\w+)\.com(?:(?:\/(track|playlist|album|"
|
||||
r"https:\/\/(?:www|open|play|listen)?\.?(qobuz|tidal|deezer)\.com(?:(?:\/(track|playlist|album|"
|
||||
r"artist|label))|(?:\/[-\w]+?))+\/([-\w]+)"
|
||||
)
|
||||
SOUNDCLOUD_URL_REGEX = r"https://soundcloud.com/[-\w:/]+"
|
||||
SOUNDCLOUD_CLIENT_ID = "a3e059563d7fd3372b49b37f00a00bcf"
|
||||
|
||||
|
||||
TIDAL_MAX_Q = 7
|
||||
DEEZER_MAX_Q = 6
|
||||
AVAILABLE_QUALITY_IDS = (0, 1, 2, 3, 4)
|
||||
MEDIA_TYPES = ("track", "album", "artist", "label", "playlist")
|
||||
|
|
|
@ -97,7 +97,7 @@ class Converter:
|
|||
"-i",
|
||||
self.filename,
|
||||
"-loglevel",
|
||||
"warning",
|
||||
"panic",
|
||||
"-c:a",
|
||||
self.codec_lib,
|
||||
]
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import logging
|
||||
from pprint import pprint
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
@ -9,9 +10,9 @@ from typing import Generator, Optional, Tuple, Union
|
|||
|
||||
import click
|
||||
|
||||
from .clients import DeezerClient, QobuzClient, TidalClient
|
||||
from .clients import DeezerClient, QobuzClient, SoundCloudClient, TidalClient
|
||||
from .config import Config
|
||||
from .constants import CONFIG_PATH, DB_PATH, URL_REGEX
|
||||
from .constants import (CONFIG_PATH, DB_PATH, SOUNDCLOUD_URL_REGEX, URL_REGEX, MEDIA_TYPES)
|
||||
from .db import MusicDB
|
||||
from .downloader import Album, Artist, Label, Playlist, Track
|
||||
from .exceptions import AuthenticationError, ParsingError
|
||||
|
@ -27,7 +28,6 @@ MEDIA_CLASS = {
|
|||
"track": Track,
|
||||
"label": Label,
|
||||
}
|
||||
CLIENTS = {"qobuz": QobuzClient, "tidal": TidalClient, "deezer": DeezerClient}
|
||||
Media = Union[Album, Playlist, Artist, Track]
|
||||
|
||||
|
||||
|
@ -38,6 +38,7 @@ class MusicDL(list):
|
|||
):
|
||||
|
||||
self.url_parse = re.compile(URL_REGEX)
|
||||
self.soundcloud_url_parse = re.compile(SOUNDCLOUD_URL_REGEX)
|
||||
self.config = config
|
||||
if self.config is None:
|
||||
self.config = Config(CONFIG_PATH)
|
||||
|
@ -46,6 +47,7 @@ class MusicDL(list):
|
|||
"qobuz": QobuzClient(),
|
||||
"tidal": TidalClient(),
|
||||
"deezer": DeezerClient(),
|
||||
"soundcloud": SoundCloudClient(),
|
||||
}
|
||||
|
||||
if config.session["database"]["enabled"]:
|
||||
|
@ -71,9 +73,9 @@ class MusicDL(list):
|
|||
f"Enter {capitalize(source)} password (will not show on screen):",
|
||||
fg="green",
|
||||
)
|
||||
self.config.file[source]["password"] = md5(getpass(
|
||||
prompt=""
|
||||
).encode('utf-8')).hexdigest()
|
||||
self.config.file[source]["password"] = md5(
|
||||
getpass(prompt="").encode("utf-8")
|
||||
).hexdigest()
|
||||
|
||||
self.config.save()
|
||||
click.secho(f'Credentials saved to config file at "{self.config._path}"')
|
||||
|
@ -81,11 +83,19 @@ class MusicDL(list):
|
|||
raise Exception
|
||||
|
||||
def assert_creds(self, source: str):
|
||||
assert source in ("qobuz", "tidal", "deezer"), f"Invalid source {source}"
|
||||
assert source in (
|
||||
"qobuz",
|
||||
"tidal",
|
||||
"deezer",
|
||||
"soundcloud",
|
||||
), f"Invalid source {source}"
|
||||
if source == "deezer":
|
||||
# no login for deezer
|
||||
return
|
||||
|
||||
if source == "soundcloud":
|
||||
return
|
||||
|
||||
if source == "qobuz" and (
|
||||
self.config.file[source]["email"] is None
|
||||
or self.config.file[source]["password"] is None
|
||||
|
@ -118,6 +128,11 @@ class MusicDL(list):
|
|||
|
||||
client = self.get_client(source)
|
||||
|
||||
if media_type not in MEDIA_TYPES:
|
||||
if 'playlist' in media_type: # for SoundCloud
|
||||
media_type = 'playlist'
|
||||
|
||||
assert media_type in MEDIA_TYPES, media_type
|
||||
item = MEDIA_CLASS[media_type](client=client, id=item_id)
|
||||
self.append(item)
|
||||
|
||||
|
@ -200,7 +215,15 @@ class MusicDL(list):
|
|||
|
||||
:raises exceptions.ParsingError
|
||||
"""
|
||||
parsed = self.url_parse.findall(url)
|
||||
parsed = self.url_parse.findall(url) # Qobuz, Tidal, Dezer
|
||||
soundcloud_urls = self.soundcloud_url_parse.findall(url)
|
||||
soundcloud_items = [self.clients["soundcloud"].get(u) for u in soundcloud_urls]
|
||||
|
||||
parsed.extend(
|
||||
("soundcloud", item["kind"], url)
|
||||
for item, url in zip(soundcloud_items, soundcloud_urls)
|
||||
)
|
||||
|
||||
logger.debug(f"Parsed urls: {parsed}")
|
||||
|
||||
if parsed != []:
|
||||
|
|
|
@ -61,5 +61,5 @@ class MusicDB:
|
|||
)
|
||||
conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
if 'UNIQUE' not in str(e):
|
||||
if "UNIQUE" not in str(e):
|
||||
raise
|
||||
|
|
|
@ -2,11 +2,14 @@ import logging
|
|||
import os
|
||||
import re
|
||||
import shutil
|
||||
from pprint import pformat
|
||||
import subprocess
|
||||
import sys
|
||||
from pprint import pformat, pprint
|
||||
from tempfile import gettempdir
|
||||
from typing import Any, Callable, Optional, Tuple, Union
|
||||
|
||||
import click
|
||||
import requests
|
||||
from mutagen.flac import FLAC, Picture
|
||||
from mutagen.id3 import APIC, ID3, ID3NoHeaderError
|
||||
from pathvalidate import sanitize_filename, sanitize_filepath
|
||||
|
@ -18,6 +21,7 @@ from .constants import (
|
|||
EXT,
|
||||
FLAC_MAX_BLOCKSIZE,
|
||||
FOLDER_FORMAT,
|
||||
SOUNDCLOUD_CLIENT_ID,
|
||||
TRACK_FORMAT,
|
||||
)
|
||||
from .db import MusicDB
|
||||
|
@ -116,17 +120,19 @@ class Track:
|
|||
|
||||
assert hasattr(self, "id"), "id must be set before loading metadata"
|
||||
|
||||
track_meta = self.client.get(self.id, media_type="track")
|
||||
self.resp = self.client.get(self.id, media_type="track")
|
||||
self.meta = TrackMetadata(
|
||||
track=track_meta, source=self.client.source
|
||||
track=self.resp, source=self.client.source
|
||||
) # meta dict -> TrackMetadata object
|
||||
try:
|
||||
if self.client.source == "qobuz":
|
||||
self.cover_url = track_meta["album"]["image"]["small"]
|
||||
self.cover_url = self.resp["album"]["image"]["small"]
|
||||
elif self.client.source == "tidal":
|
||||
self.cover_url = tidal_cover_url(track_meta["album"]["cover"], 320)
|
||||
self.cover_url = tidal_cover_url(self.resp["album"]["cover"], 320)
|
||||
elif self.client.source == "deezer":
|
||||
self.cover_url = track_meta["album"]["cover_medium"]
|
||||
self.cover_url = self.resp["album"]["cover_medium"]
|
||||
elif self.client.source == "soundcloud":
|
||||
self.cover_url = (self.resp["artwork_url"] or self.resp['user'].get("avatar_url")).replace("large", "t500x500")
|
||||
else:
|
||||
raise InvalidSourceError(self.client.source)
|
||||
except KeyError:
|
||||
|
@ -144,7 +150,7 @@ class Track:
|
|||
|
||||
def download(
|
||||
self,
|
||||
quality: int = 7,
|
||||
quality: int = 3,
|
||||
parent_folder: str = "StreamripDownloads",
|
||||
progress_bar: bool = True,
|
||||
database: MusicDB = None,
|
||||
|
@ -162,10 +168,8 @@ class Track:
|
|||
:type progress_bar: bool
|
||||
"""
|
||||
# args override attributes
|
||||
self.quality, self.folder = (
|
||||
quality or self.quality,
|
||||
parent_folder or self.folder,
|
||||
)
|
||||
self.quality = min(quality, self.client.max_quality)
|
||||
self.folder = parent_folder or self.folder
|
||||
|
||||
self.file_format = kwargs.get("track_format", TRACK_FORMAT)
|
||||
self.folder = sanitize_filepath(self.folder, platform="auto")
|
||||
|
@ -189,11 +193,17 @@ class Track:
|
|||
return False
|
||||
|
||||
if hasattr(self, "cover_url"): # only for playlists and singles
|
||||
logger.debug("Downloading cover")
|
||||
self.download_cover()
|
||||
|
||||
dl_info = self.client.get_file_url(self.id, quality)
|
||||
if self.client.source == "soundcloud":
|
||||
url_id = self.resp
|
||||
else:
|
||||
url_id = self.id
|
||||
|
||||
temp_file = os.path.join(gettempdir(), f"~{self.id}_{quality}.tmp")
|
||||
dl_info = self.client.get_file_url(url_id, self.quality)
|
||||
|
||||
temp_file = os.path.join(gettempdir(), f"~{hash(self.id)}_{quality}.tmp")
|
||||
logger.debug("Temporary file path: %s", temp_file)
|
||||
|
||||
if self.client.source == "qobuz":
|
||||
|
@ -212,7 +222,8 @@ class Track:
|
|||
if self.client.source in ("qobuz", "tidal"):
|
||||
logger.debug("Downloadable URL found: %s", dl_info.get("url"))
|
||||
tqdm_download(dl_info["url"], temp_file) # downloads file
|
||||
elif isinstance(dl_info, str): # Deezer
|
||||
|
||||
elif self.client.source == "deezer": # Deezer
|
||||
logger.debug("Downloadable URL found: %s", dl_info)
|
||||
try:
|
||||
tqdm_download(dl_info, temp_file) # downloads file
|
||||
|
@ -220,6 +231,34 @@ class Track:
|
|||
logger.debug(f"Track is not downloadable {dl_info}")
|
||||
click.secho("Track is not available for download", fg="red")
|
||||
return False
|
||||
|
||||
elif self.client.source == "soundcloud":
|
||||
if dl_info["type"] == "mp3":
|
||||
temp_file += ".mp3"
|
||||
# convert hls stream to mp3
|
||||
subprocess.call(
|
||||
[
|
||||
"ffmpeg",
|
||||
"-i",
|
||||
dl_info['url'],
|
||||
"-c",
|
||||
"copy",
|
||||
"-y",
|
||||
temp_file,
|
||||
"-loglevel",
|
||||
"fatal",
|
||||
]
|
||||
)
|
||||
elif dl_info["type"] == "original":
|
||||
tqdm_download(dl_info["url"], temp_file)
|
||||
|
||||
# if a wav is returned, convert to flac
|
||||
engine = converter.FLAC(temp_file)
|
||||
temp_file = f"{temp_file}.flac"
|
||||
engine.convert(custom_fn=temp_file)
|
||||
|
||||
self.final_path = self.final_path.replace(".mp3", ".flac")
|
||||
self.quality = 2
|
||||
else:
|
||||
raise InvalidSourceError(self.client.source)
|
||||
|
||||
|
@ -249,18 +288,15 @@ class Track:
|
|||
|
||||
assert hasattr(self, "cover_url"), "must set cover_url attribute"
|
||||
|
||||
self.cover_path = os.path.join(self.folder, f"cover{hash(self.meta.album)}.jpg")
|
||||
self.cover_path = os.path.join(self.folder, f"cover{hash(self.cover_url)}.jpg")
|
||||
logger.debug(f"Downloading cover from {self.cover_url}")
|
||||
click.secho(f"\nDownloading cover art for {self!s}", fg='blue')
|
||||
click.secho(f"\nDownloading cover art for {self!s}", fg="blue")
|
||||
|
||||
if not os.path.exists(self.cover_path):
|
||||
tqdm_download(self.cover_url, self.cover_path)
|
||||
else:
|
||||
logger.debug("Cover already exists, skipping download")
|
||||
|
||||
self.cover = Tracklist.get_cover_obj(self.cover_path, self.quality)
|
||||
logger.debug(f"Cover obj: {self.cover}")
|
||||
|
||||
def format_final_path(self) -> str:
|
||||
"""Return the final filepath of the downloaded file.
|
||||
|
||||
|
@ -359,16 +395,13 @@ class Track:
|
|||
self.container = "FLAC"
|
||||
logger.debug("Tagging file with %s container", self.container)
|
||||
audio = FLAC(self.final_path)
|
||||
elif self.quality == 1:
|
||||
elif self.quality <= 1:
|
||||
self.container = "MP3"
|
||||
logger.debug("Tagging file with %s container", self.container)
|
||||
try:
|
||||
audio = ID3(self.final_path)
|
||||
except ID3NoHeaderError:
|
||||
audio = ID3()
|
||||
elif self.quality == 0: # tidal and deezer
|
||||
# TODO: add compatibility with MP4 container
|
||||
raise NotImplementedError("Qualities < 320kbps not implemented")
|
||||
else:
|
||||
raise InvalidQuality(f'Invalid quality: "{self.quality}"')
|
||||
|
||||
|
@ -377,9 +410,9 @@ class Track:
|
|||
audio[k] = v
|
||||
|
||||
if embed_cover and cover is None:
|
||||
assert hasattr(self, "cover")
|
||||
cover = self.cover
|
||||
assert hasattr(self, "cover_path")
|
||||
|
||||
cover = Tracklist.get_cover_obj(self.cover_path, self.quality)
|
||||
if isinstance(audio, FLAC):
|
||||
if embed_cover:
|
||||
audio.add_picture(cover)
|
||||
|
@ -573,7 +606,7 @@ class Tracklist(list):
|
|||
:type quality: int
|
||||
:rtype: Union[Picture, APIC]
|
||||
"""
|
||||
cover_type = {1: APIC, 2: Picture, 3: Picture, 4: Picture}
|
||||
cover_type = {0: APIC, 1: APIC, 2: Picture, 3: Picture, 4: Picture}
|
||||
|
||||
cover = cover_type.get(quality)
|
||||
if cover is Picture:
|
||||
|
@ -731,7 +764,6 @@ class Album(Tracklist):
|
|||
"tracktotal": resp.get("numberOfTracks"),
|
||||
}
|
||||
elif client.source == "deezer":
|
||||
logger.debug(pformat(resp))
|
||||
return {
|
||||
"id": resp.get("id"),
|
||||
"title": resp.get("title"),
|
||||
|
@ -794,7 +826,7 @@ class Album(Tracklist):
|
|||
|
||||
def download(
|
||||
self,
|
||||
quality: int = 7,
|
||||
quality: int = 3,
|
||||
parent_folder: Union[str, os.PathLike] = "StreamripDownloads",
|
||||
database: MusicDB = None,
|
||||
**kwargs,
|
||||
|
@ -829,7 +861,7 @@ class Album(Tracklist):
|
|||
logger.debug("Cover already downloaded: %s. Skipping", cover_path)
|
||||
else:
|
||||
click.secho("Downloading cover art", fg="magenta")
|
||||
if kwargs.get("large_cover", False):
|
||||
if kwargs.get("large_cover", True):
|
||||
cover_url = self.cover_urls.get("large")
|
||||
if self.client.source == "qobuz":
|
||||
tqdm_download(cover_url.replace("600", "org"), cover_path)
|
||||
|
@ -847,7 +879,7 @@ class Album(Tracklist):
|
|||
else:
|
||||
tqdm_download(self.cover_urls["small"], cover_path)
|
||||
|
||||
embed_cover = kwargs.get('embed_cover', True) # embed by default
|
||||
embed_cover = kwargs.get("embed_cover", True) # embed by default
|
||||
if self.client.source != "deezer" and embed_cover:
|
||||
cover = self.get_cover_obj(cover_path, quality)
|
||||
|
||||
|
@ -881,17 +913,18 @@ class Album(Tracklist):
|
|||
else:
|
||||
fmt[key] = None
|
||||
|
||||
fmt["sampling_rate"] /= 1000
|
||||
# 48.0kHz -> 48kHz, 44.1kHz -> 44.1kHz
|
||||
if fmt["sampling_rate"] % 1 == 0.0:
|
||||
fmt["sampling_rate"] = int(fmt["sampling_rate"])
|
||||
if fmt.get("sampling_rate", False):
|
||||
fmt["sampling_rate"] /= 1000
|
||||
# 48.0kHz -> 48kHz, 44.1kHz -> 44.1kHz
|
||||
if fmt["sampling_rate"] % 1 == 0.0:
|
||||
fmt["sampling_rate"] = int(fmt["sampling_rate"])
|
||||
|
||||
return fmt
|
||||
|
||||
def _get_formatted_folder(self, parent_folder: str) -> str:
|
||||
if self.bit_depth is not None and self.sampling_rate is not None:
|
||||
self.container = "FLAC"
|
||||
elif self.client.source in ("qobuz", "deezer"):
|
||||
elif self.client.source in ("qobuz", "deezer", "soundcloud"):
|
||||
self.container = "MP3"
|
||||
elif self.client.source == "tidal":
|
||||
self.container = "AAC"
|
||||
|
@ -930,7 +963,7 @@ class Playlist(Tracklist):
|
|||
"""Represents a downloadable playlist.
|
||||
|
||||
Usage:
|
||||
>>> resp = client.get('hip hop', 'playlist')
|
||||
>>> resp = client.search('hip hop', 'playlist')
|
||||
>>> pl = Playlist.from_api(resp['items'][0], client)
|
||||
>>> pl.load_meta()
|
||||
>>> pl.download()
|
||||
|
@ -973,7 +1006,7 @@ class Playlist(Tracklist):
|
|||
:type new_tracknumbers: bool
|
||||
:param kwargs:
|
||||
"""
|
||||
self.meta = self.client.get(self.id, "playlist")
|
||||
self.meta = self.client.get(id=self.id, media_type="playlist")
|
||||
self._load_tracks(**kwargs)
|
||||
|
||||
def _load_tracks(self, new_tracknumbers: bool = True):
|
||||
|
@ -983,17 +1016,17 @@ class Playlist(Tracklist):
|
|||
:type new_tracknumbers: bool
|
||||
"""
|
||||
if self.client.source == "qobuz":
|
||||
self.name = self.meta['name']
|
||||
self.name = self.meta["name"]
|
||||
tracklist = self.meta["tracks"]["items"]
|
||||
|
||||
def gen_cover(track): # ?
|
||||
def gen_cover(track):
|
||||
return track["album"]["image"]["small"]
|
||||
|
||||
def meta_args(track):
|
||||
return {"track": track, "album": track["album"]}
|
||||
|
||||
elif self.client.source == "tidal":
|
||||
self.name = self.meta['title']
|
||||
self.name = self.meta["title"]
|
||||
tracklist = self.meta["tracks"]
|
||||
|
||||
def gen_cover(track):
|
||||
|
@ -1007,41 +1040,49 @@ class Playlist(Tracklist):
|
|||
}
|
||||
|
||||
elif self.client.source == "deezer":
|
||||
self.name = self.meta['title']
|
||||
self.name = self.meta["title"]
|
||||
tracklist = self.meta["tracks"]
|
||||
|
||||
def gen_cover(track):
|
||||
return track["album"]["cover_medium"]
|
||||
|
||||
def meta_args(track):
|
||||
return {"track": track, "source": self.client.source}
|
||||
elif self.client.source == "soundcloud":
|
||||
self.name = self.meta["title"]
|
||||
tracklist = self.meta["tracks"]
|
||||
|
||||
def gen_cover(track):
|
||||
return track["artwork_url"].replace("large", "t500x500")
|
||||
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
for i, track in enumerate(tracklist):
|
||||
# TODO: This should be managed with .m3u files and alike. Arbitrary
|
||||
# tracknumber tags might cause conflicts if the playlist files are
|
||||
# inside of a library folder
|
||||
meta = TrackMetadata(**meta_args(track))
|
||||
if new_tracknumbers:
|
||||
meta["tracknumber"] = str(i + 1)
|
||||
if self.client.source == "soundcloud":
|
||||
# No meta is included in soundcloud playlist
|
||||
# response, so it is loaded at download time
|
||||
for track in tracklist:
|
||||
self.append(Track(self.client, id=track["id"]))
|
||||
else:
|
||||
for track in tracklist:
|
||||
# TODO: This should be managed with .m3u files and alike. Arbitrary
|
||||
# tracknumber tags might cause conflicts if the playlist files are
|
||||
# inside of a library folder
|
||||
meta = TrackMetadata(track=track, source=self.client.source)
|
||||
|
||||
self.append(
|
||||
Track(
|
||||
self.client,
|
||||
id=track.get("id"),
|
||||
meta=meta,
|
||||
cover_url=gen_cover(track),
|
||||
self.append(
|
||||
Track(
|
||||
self.client,
|
||||
id=track.get("id"),
|
||||
meta=meta,
|
||||
cover_url=gen_cover(track),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(f"Loaded {len(self)} tracks from playlist {self.name}")
|
||||
|
||||
def download(
|
||||
self,
|
||||
parent_folder: str = "Downloads",
|
||||
quality: int = 6,
|
||||
parent_folder: str = "StreamripDownloads",
|
||||
quality: int = 3,
|
||||
filters: Callable = None,
|
||||
database: MusicDB = None,
|
||||
**kwargs,
|
||||
|
@ -1060,10 +1101,19 @@ class Playlist(Tracklist):
|
|||
logger.debug(f"Parent folder {folder}")
|
||||
|
||||
self.download_message()
|
||||
for track in self:
|
||||
track.download(parent_folder=folder, quality=quality, database=database)
|
||||
if self.client.source != "deezer":
|
||||
track.tag(embed_cover=kwargs.get('embed_cover', True))
|
||||
for i, track in enumerate(self):
|
||||
if self.client.source == "soundcloud":
|
||||
track.load_meta()
|
||||
|
||||
if kwargs.get("new_tracknumbers", True):
|
||||
track.meta["tracknumber"] = str(i + 1)
|
||||
|
||||
if (
|
||||
track.download(parent_folder=folder, quality=quality, database=database)
|
||||
and self.client.source != "deezer"
|
||||
):
|
||||
|
||||
track.tag(embed_cover=kwargs.get("embed_cover", True))
|
||||
|
||||
@staticmethod
|
||||
def _parse_get_resp(item: dict, client: ClientInterface):
|
||||
|
@ -1075,11 +1125,10 @@ class Playlist(Tracklist):
|
|||
:param client:
|
||||
:type client: ClientInterface
|
||||
"""
|
||||
print(item.keys())
|
||||
if client.source == "qobuz":
|
||||
return {
|
||||
"name": item["name"],
|
||||
"id": item['id'],
|
||||
"id": item["id"],
|
||||
}
|
||||
elif client.source == "tidal":
|
||||
return {
|
||||
|
@ -1172,7 +1221,7 @@ class Artist(Tracklist):
|
|||
|
||||
def download(
|
||||
self,
|
||||
parent_folder: str = "Downloads",
|
||||
parent_folder: str = "StreamripDownloads",
|
||||
filters: Optional[Tuple] = None,
|
||||
no_repeats: bool = False,
|
||||
quality: int = 6,
|
||||
|
|
|
@ -2,6 +2,7 @@ import json
|
|||
import logging
|
||||
import re
|
||||
import sys
|
||||
from pprint import pprint
|
||||
from typing import Generator, Optional, Tuple, Union
|
||||
|
||||
from .constants import (
|
||||
|
@ -113,9 +114,10 @@ class TrackMetadata:
|
|||
self.date = resp.get("release_date")
|
||||
self.albumartist = resp.get("artist", {}).get("name")
|
||||
self.label = resp.get("label")
|
||||
|
||||
elif self.__source == "soundcloud":
|
||||
raise Exception
|
||||
else:
|
||||
raise ValueError
|
||||
raise ValueError(self.__source)
|
||||
|
||||
def add_track_meta(self, track: dict):
|
||||
"""Parse the metadata from a track dict returned by the
|
||||
|
@ -150,8 +152,19 @@ class TrackMetadata:
|
|||
self.discnumber = track.get("disk_number")
|
||||
self.artist = track.get("artist", {}).get("name")
|
||||
|
||||
elif self.__source == "soundcloud":
|
||||
self.title = track["title"].strip()
|
||||
self.genre = track["genre"]
|
||||
self.artist = track["user"]["username"]
|
||||
self.albumartist = self.artist
|
||||
self.year = track["created_at"][:4]
|
||||
self.label = track["label_name"]
|
||||
self.description = track["description"]
|
||||
self.tracknumber = 0
|
||||
self.tracktotal = 0
|
||||
|
||||
else:
|
||||
raise ValueError
|
||||
raise ValueError(self.__source)
|
||||
|
||||
if track.get("album"):
|
||||
self.add_album_meta(track["album"])
|
||||
|
|
|
@ -96,7 +96,7 @@ def get_quality_id(bit_depth: Optional[int], sampling_rate: Optional[int]):
|
|||
return 4
|
||||
|
||||
|
||||
def tqdm_download(url: str, filepath: str):
|
||||
def tqdm_download(url: str, filepath: str, params: dict = None):
|
||||
"""Downloads a file with a progress bar.
|
||||
|
||||
:param url: url to direct download
|
||||
|
@ -104,11 +104,14 @@ def tqdm_download(url: str, filepath: str):
|
|||
:type url: str
|
||||
:type filepath: str
|
||||
"""
|
||||
logger.debug(f"Downloading {url} to {filepath}")
|
||||
r = requests.get(url, allow_redirects=True, stream=True)
|
||||
logger.debug(f"Downloading {url} to {filepath} with params {params}")
|
||||
if params is None:
|
||||
params = {}
|
||||
|
||||
r = requests.get(url, allow_redirects=True, stream=True, params=params)
|
||||
total = int(r.headers.get("content-length", 0))
|
||||
logger.debug(f"File size = {total}")
|
||||
if total < 1000:
|
||||
if total < 1000 and not url.endswith("jpg"):
|
||||
raise NonStreamable
|
||||
|
||||
try:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue