Clean up codebase

This commit is contained in:
nathom 2021-04-13 19:33:01 -07:00
parent 3967faaa44
commit a854eadeb7
6 changed files with 148 additions and 147 deletions

View file

@ -7,8 +7,6 @@ from abc import ABC, abstractmethod
from typing import Generator, Sequence, Tuple, Union
import click
import requests
from requests.packages import urllib3
from .constants import (
AGENT,
@ -16,6 +14,7 @@ from .constants import (
DEEZER_MAX_Q,
QOBUZ_FEATURED_KEYS,
SOUNDCLOUD_CLIENT_ID,
TIDAL_CLIENT_INFO,
TIDAL_MAX_Q,
)
from .exceptions import (
@ -28,36 +27,20 @@ from .exceptions import (
from .spoofbuz import Spoofer
from .utils import gen_threadsafe_session, get_quality
urllib3.disable_warnings()
requests.adapters.DEFAULT_RETRIES = 5
QOBUZ_BASE = "https://www.qobuz.com/api.json/0.2"
TIDAL_BASE = "https://api.tidalhifi.com/v1"
TIDAL_AUTH_URL = "https://auth.tidal.com/v1/oauth2"
TIDAL_CLIENT_INFO = {
"id": "aR7gUaTK1ihpXOEP",
"secret": "eVWBEkuL2FCjxgjOkR3yK0RYZEbcrMXRc2l8fU3ZCdE=",
}
DEEZER_BASE = "https://api.deezer.com"
DEEZER_DL = "http://dz.loaderapp.info/deezer"
SOUNDCLOUD_BASE = "https://api-v2.soundcloud.com"
logger = logging.getLogger(__name__)
# Qobuz
QOBUZ_BASE = "https://www.qobuz.com/api.json/0.2"
# Deezer
DEEZER_BASE = "https://api.deezer.com"
DEEZER_DL = "http://dz.loaderapp.info/deezer"
# SoundCloud
SOUNDCLOUD_BASE = "https://api-v2.soundcloud.com"
# ----------- Abstract Classes -----------------
class ClientInterface(ABC):
class Client(ABC):
"""Common API for clients of all platforms.
This is an Abstract Base Class. It cannot be instantiated;
@ -102,18 +85,17 @@ class ClientInterface(ABC):
@property
@abstractmethod
def source(self):
"""Source from which the Client retrieves data."""
pass
@property
@abstractmethod
def max_quality(self):
"""The maximum quality that the Client supports."""
pass
# ------------- Clients -----------------
class QobuzClient(ClientInterface):
class QobuzClient(Client):
source = "qobuz"
max_quality = 4
@ -364,7 +346,7 @@ class QobuzClient(ClientInterface):
return False
class DeezerClient(ClientInterface):
class DeezerClient(Client):
source = "deezer"
max_quality = 2
@ -421,10 +403,10 @@ class DeezerClient(ClientInterface):
quality = min(DEEZER_MAX_Q, quality)
url = f"{DEEZER_DL}/{get_quality(quality, 'deezer')}/{DEEZER_BASE}/track/{meta_id}"
logger.debug(f"Download url {url}")
return url
return {"url": url}
class TidalClient(ClientInterface):
class TidalClient(Client):
source = "tidal"
max_quality = 3
@ -670,7 +652,7 @@ class TidalClient(ClientInterface):
self.session.headers.update({"authorization": f"Bearer {self.access_token}"})
class SoundCloudClient(ClientInterface):
class SoundCloudClient(Client):
source = "soundcloud"
max_quality = 0
logged_in = True

View file

@ -158,3 +158,8 @@ MEDIA_TYPES = ("track", "album", "artist", "label", "playlist")
# used to homogenize cover size keys
COVER_SIZES = ("thumbnail", "small", "large", "original")
TIDAL_CLIENT_INFO = {
"id": "aR7gUaTK1ihpXOEP",
"secret": "eVWBEkuL2FCjxgjOkR3yK0RYZEbcrMXRc2l8fU3ZCdE=",
}

View file

@ -210,11 +210,9 @@ class MusicDL(list):
click.secho(f"{item!s} is not available, skipping.", fg="red")
continue
if isinstance(item, Track):
# track.download doesn't automatically tag
item.download(**arguments, tag=True)
else:
item.download(**arguments)
if isinstance(item, Track):
item.tag()
if self.db != [] and hasattr(item, "id"):
self.db.add(item.id)
@ -368,7 +366,7 @@ class MusicDL(list):
def preview_media(self, media):
if isinstance(media, Album):
fmt = (
"{albumartist} - {title}\n"
"{albumartist} - {album}\n"
"Released on {year}\n{tracktotal} tracks\n"
"{bit_depth} bit / {sampling_rate} Hz\n"
"Version: {version}\n"
@ -398,7 +396,7 @@ class MusicDL(list):
results = tuple(self.search(source, query, media_type, limit=50))
def title(res):
return f"{res[0]+1}. {res[1].title}"
return f"{res[0]+1}. {res[1].album}"
def from_title(s):
num = []

View file

@ -8,7 +8,6 @@ import os
import re
import shutil
import subprocess
from pprint import pformat, pprint
from tempfile import gettempdir
from typing import Any, Generator, Iterable, Union
@ -20,10 +19,9 @@ from pathvalidate import sanitize_filename, sanitize_filepath
from requests.packages import urllib3
from . import converter
from .clients import ClientInterface
from .clients import Client
from .constants import (
ALBUM_KEYS,
COVER_SIZES,
FLAC_MAX_BLOCKSIZE,
FOLDER_FORMAT,
TRACK_FORMAT,
@ -38,9 +36,9 @@ from .exceptions import (
from .metadata import TrackMetadata
from .utils import (
clean_format,
decho,
decrypt_mqa_file,
ext,
get_quality_id,
safe_get,
tidal_cover_url,
tqdm_download,
@ -73,7 +71,7 @@ class Track:
>>> t.tag()
"""
def __init__(self, client: ClientInterface, **kwargs):
def __init__(self, client: Client, **kwargs):
"""Create a track object.
The only required parameter is client, but passing at an id is
@ -83,7 +81,7 @@ class Track:
:param track_id: track id returned by Qobuz API
:type track_id: Optional[Union[str, int]]
:param client: qopy client
:type client: ClientInterface
:type client: Client
:param meta: TrackMetadata object
:type meta: Optional[TrackMetadata]
:param kwargs: id, filepath_format, meta, quality, folder
@ -148,13 +146,43 @@ class Track:
raise NotImplementedError(source)
def _prepare_download(self, **kwargs):
# args override attributes
self.quality = min(kwargs["quality"], self.client.max_quality)
self.folder = kwargs["parent_folder"] or self.folder
self.file_format = kwargs.get("track_format", TRACK_FORMAT)
self.folder = sanitize_filepath(self.folder, platform="auto")
self.format_final_path()
os.makedirs(self.folder, exist_ok=True)
if self.id in kwargs.get("database", []):
self.downloaded = True
self.tagged = True
self.path = self.final_path
decho(
f"{self['title']} already logged in database, skipping.",
fg="magenta",
)
return False # because the track was not downloaded
if os.path.isfile(self.final_path): # track already exists
self.downloaded = True
self.tagged = True
self.path = self.final_path
decho(f"Track already exists: {self.final_path}", fg="magenta")
return False
self.download_cover() # only downloads for playlists and singles
self.path = os.path.join(gettempdir(), f"{hash(self.id)}_{self.quality}.tmp")
def download(
self,
quality: int = 3,
parent_folder: str = "StreamripDownloads",
progress_bar: bool = True,
database: MusicDB = None,
tag: bool = False,
**kwargs,
) -> bool:
"""
@ -167,38 +195,9 @@ class Track:
:param progress_bar: turn on/off progress bar
:type progress_bar: bool
"""
# args override attributes
self.quality = min(quality, self.client.max_quality)
self.folder = parent_folder or self.folder
self.file_format = kwargs.get("track_format", TRACK_FORMAT)
self.folder = sanitize_filepath(self.folder, platform="auto")
self.format_final_path()
os.makedirs(self.folder, exist_ok=True)
if isinstance(database, MusicDB) and self.id in database:
self.downloaded = True
self.tagged = True
self.path = self.final_path
click.secho(
f"{self['title']} already logged in database, skipping.",
fg="magenta",
)
return False # because the track was not downloaded
if os.path.isfile(self.final_path): # track already exists
self.downloaded = True
self.tagged = True
self.path = self.final_path
click.secho(f"Track already downloaded: {self.final_path}", fg="magenta")
if not self._prepare_download(quality, parent_folder, progress_bar, **kwargs):
return False
if hasattr(self, "cover_url"): # only for playlists and singles
logger.debug("Downloading cover")
self.download_cover()
if self.client.source == "soundcloud":
# soundcloud client needs whole dict to get file url
url_id = self.resp
@ -211,14 +210,8 @@ class Track:
click.secho(f"Unable to download track. {e}", fg="red")
return False
self.path = os.path.join(gettempdir(), f"{hash(self.id)}_{self.quality}.tmp")
logger.debug("Temporary file path: %s", self.path)
if self.client.source == "qobuz":
if not (dl_info.get("sampling_rate") and dl_info.get("url")) or dl_info.get(
"sample"
):
logger.debug("Track is not downloadable: %s", dl_info)
if not self.__validate_qobuz_dl_info(dl_info):
click.secho("Track is not available for download", fg="red")
return False
@ -226,21 +219,16 @@ class Track:
self.bit_depth = dl_info.get("bit_depth")
# --------- Download Track ----------
if self.client.source in ("qobuz", "tidal"):
if self.client.source in ("qobuz", "tidal", "deezer"):
logger.debug("Downloadable URL found: %s", dl_info.get("url"))
try:
tqdm_download(
dl_info["url"], self.path, desc=self._progress_desc
) # downloads file
elif self.client.source == "deezer": # Deezer
logger.debug(
"Downloadable URL found: %s", dl_info, desc=self._progress_desc
)
try:
tqdm_download(dl_info, self.path) # downloads file
except NonStreamable:
logger.debug("Track is not downloadable %s", dl_info)
click.secho("Track is not available for download", fg="red")
click.secho(
"Track {self!s} is not available for download, skipping.", fg="red"
)
return False
elif self.client.source == "soundcloud":
@ -261,22 +249,26 @@ class Track:
if not kwargs.get("stay_temp", False):
self.move(self.final_path)
if isinstance(database, MusicDB):
try:
database.add(self.id)
logger.debug(f"{self.id} added to database")
except AttributeError:
pass
logger.debug("Downloaded: %s -> %s", self.path, self.final_path)
self.downloaded = True
if tag:
self.tag()
if not kwargs.get("keep_cover", True) and hasattr(self, "cover_path"):
os.remove(self.cover_path)
return True
def __validate_qobuz_dl_info(info: dict) -> bool:
return not all(
(info.get("sampling_rate"), info.get("bit_depth"), not info.get("sample"))
)
def move(self, path: str):
os.makedirs(os.path.dirname(path), exist_ok=True)
shutil.move(self.path, path)
@ -317,7 +309,8 @@ class Track:
def download_cover(self):
"""Downloads the cover art, if cover_url is given."""
assert hasattr(self, "cover_url"), "must set cover_url attribute"
if not hasattr(self, "cover_url"):
return False
self.cover_path = os.path.join(gettempdir(), f"cover{hash(self.cover_url)}.jpg")
logger.debug(f"Downloading cover from {self.cover_url}")
@ -338,7 +331,7 @@ class Track:
the TrackMetadata object.
"""
formatter = self.meta.get_formatter()
logger.debug("Track meta formatter %s", pformat(formatter))
logger.debug("Track meta formatter %s", formatter)
filename = clean_format(self.file_format, formatter)
self.final_path = os.path.join(self.folder, filename)[:250].strip() + ext(
self.quality, self.client.source
@ -349,14 +342,14 @@ class Track:
return self.final_path
@classmethod
def from_album_meta(cls, album: dict, pos: int, client: ClientInterface):
def from_album_meta(cls, album: dict, pos: int, client: Client):
"""Return a new Track object initialized with info from the album dicts
returned by client.get calls.
:param album: album metadata returned by API
:param pos: index of the track
:param client: qopy client object
:type client: ClientInterface
:type client: Client
:raises IndexError
"""
@ -367,7 +360,7 @@ class Track:
return cls(client=client, meta=meta, id=track["id"])
@classmethod
def from_api(cls, item: dict, client: ClientInterface):
def from_api(cls, item: dict, client: Client):
meta = TrackMetadata(track=item, source=client.source)
try:
if client.source == "qobuz":
@ -673,7 +666,7 @@ class Tracklist(list):
track.convert(codec, **kwargs)
@classmethod
def from_api(cls, item: dict, client: ClientInterface):
def from_api(cls, item: dict, client: Client):
"""Create an Album object from the api response of Qobuz, Tidal,
or Deezer.
@ -782,7 +775,7 @@ class Album(Tracklist):
>>> album.download()
"""
def __init__(self, client: ClientInterface, **kwargs):
def __init__(self, client: Client, **kwargs):
"""Create a new Album object.
:param client: a qopy client instance
@ -796,8 +789,8 @@ class Album(Tracklist):
self.bit_depth = None
self.container = None
for k, v in kwargs.items():
setattr(self, k, v)
# usually an unpacked TrackMetadata obj
self.__dict__.update(kwargs)
# to improve from_api method speed
if kwargs.get("load_on_init", False):
@ -823,7 +816,7 @@ class Album(Tracklist):
self.loaded = True
@classmethod
def from_api(cls, resp: dict, client: ClientInterface):
def from_api(cls, resp: dict, client: Client):
if client.source == "soundcloud":
return Playlist.from_api(resp, client)
@ -901,14 +894,16 @@ class Album(Tracklist):
return True
@staticmethod
def _parse_get_resp(resp: dict, client: ClientInterface) -> dict:
def _parse_get_resp(resp: dict, client: Client) -> dict:
"""Parse information from a client.get(query, 'album') call.
:param resp:
:type resp: dict
:rtype: dict
"""
return TrackMetadata(album=resp, source=client.source).asdict()
meta = TrackMetadata(album=resp, source=client.source).asdict()
meta["id"] = resp["id"]
return meta
def _load_tracks(self):
"""Given an album metadata dict returned by the API, append all of its
@ -954,29 +949,11 @@ class Album(Tracklist):
@property
def title(self) -> str:
"""Return the title of the album.
It is formatted so that "version" keys are included.
:rtype: str
"""
album_title = self._title
if hasattr(self, "version") and isinstance(self.version, str):
if self.version.lower() not in album_title.lower():
album_title = f"{album_title} ({self.version})"
if self.get("explicit", False):
album_title = f"{album_title} (Explicit)"
return album_title
return self.album
@title.setter
def title(self, val):
"""Sets the internal _title attribute to the given value.
:param val: title to set
"""
self._title = val
def title(self, val: str):
self.album = val
def __repr__(self) -> str:
"""Return a string representation of this Album object.
@ -1014,7 +991,7 @@ class Playlist(Tracklist):
>>> pl.download()
"""
def __init__(self, client: ClientInterface, **kwargs):
def __init__(self, client: Client, **kwargs):
"""Create a new Playlist object.
:param client: a qopy client instance
@ -1034,14 +1011,14 @@ class Playlist(Tracklist):
self.loaded = False
@classmethod
def from_api(cls, resp: dict, client: ClientInterface):
def from_api(cls, resp: dict, client: Client):
"""Return a Playlist object initialized with information from
a search result returned by the API.
:param resp: a single search result entry of a playlist
:type resp: dict
:param client:
:type client: ClientInterface
:type client: Client
"""
info = cls._parse_get_resp(resp, client)
return cls(client, **info)
@ -1054,7 +1031,7 @@ class Playlist(Tracklist):
:param kwargs:
"""
self.meta = self.client.get(self.id, media_type="playlist")
logger.debug(pformat(self.meta))
logger.debug(self.meta)
self._load_tracks(**kwargs)
self.loaded = True
@ -1170,14 +1147,14 @@ class Playlist(Tracklist):
return self.downloaded
@staticmethod
def _parse_get_resp(item: dict, client: ClientInterface) -> dict:
def _parse_get_resp(item: dict, client: Client) -> dict:
"""Parses information from a search result returned
by a client.search call.
:param item:
:type item: dict
:param client:
:type client: ClientInterface
:type client: Client
"""
if client.source == "qobuz":
return {
@ -1235,7 +1212,7 @@ class Artist(Tracklist):
>>> artist.download()
"""
def __init__(self, client: ClientInterface, **kwargs):
def __init__(self, client: Client, **kwargs):
"""Create a new Artist object.
:param client: a qopy client instance
@ -1341,7 +1318,7 @@ class Artist(Tracklist):
return self.name
@classmethod
def from_api(cls, item: dict, client: ClientInterface, source: str = "qobuz"):
def from_api(cls, item: dict, client: Client, source: str = "qobuz"):
"""Create an Artist object from the api response of Qobuz, Tidal,
or Deezer.
@ -1357,13 +1334,13 @@ class Artist(Tracklist):
return cls(client=client, **info)
@staticmethod
def _parse_get_resp(item: dict, client: ClientInterface) -> dict:
def _parse_get_resp(item: dict, client: Client) -> dict:
"""Parse a result from a client.search call.
:param item: the item to parse
:type item: dict
:param client:
:type client: ClientInterface
:type client: Client
"""
if client.source in ("qobuz", "deezer"):
info = {

View file

@ -2,7 +2,6 @@
import logging
import re
from functools import cache
from typing import Generator, Hashable, Optional, Tuple, Union
from .constants import (
@ -108,9 +107,12 @@ class TrackMetadata:
"""
if self.__source == "qobuz":
# Tags
print(resp.keys())
self.album = resp.get("title")
self.tracktotal = resp.get("tracks_count", 1)
self.genre = resp.get("genres_list", [])
self.genre = resp.get("genres_list") or resp.get("genre")
print("in meta:")
print(self.genre)
self.date = resp.get("release_date_original") or resp.get("release_date")
self.copyright = resp.get("copyright")
self.albumartist = safe_get(resp, "artist", "name")
@ -251,6 +253,23 @@ class TrackMetadata:
logger.debug("Work found: %s", work)
self.title = f"{work}: {self.title}"
@property
def album(self) -> str:
assert hasattr(self, "_album"), "Must set album before accessing"
album = self._album
if self.get("version") and self["version"] not in album:
album = f"{self._album} ({self.version})"
if self.get("work") and self["work"] not in album:
album = f"{self.work}: {album}"
return album
@album.setter
def album(self, val) -> str:
self._album = val
@property
def artist(self) -> Optional[str]:
"""Returns the value to set for the artist tag. Defaults to
@ -276,7 +295,7 @@ class TrackMetadata:
self._artist = val
@property
def genre(self) -> Union[str, None]:
def genre(self) -> Optional[str]:
"""Formats the genre list returned by the Qobuz API.
>>> meta.genre = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé']
>>> meta.genre
@ -287,6 +306,9 @@ class TrackMetadata:
if not self.get("_genres"):
return None
if isinstance(self._genres, dict):
self._genres = self._genres["name"]
if isinstance(self._genres, list):
if self.__source == "qobuz":
genres = re.findall(r"([^\u2192\/]+)", "/".join(self._genres))
@ -446,7 +468,12 @@ class TrackMetadata:
yield (v, text)
def asdict(self) -> dict:
return {k: getattr(self, k) for k in dir(self) if not k.startswith("_")}
ret = {}
for attr in dir(self):
if not attr.startswith("_") and not callable(getattr(self, attr)):
ret[attr] = getattr(self, attr)
return ret
def __setitem__(self, key, val):
"""Dict-like access for tags.

View file

@ -4,6 +4,7 @@ import os
from string import Formatter
from typing import Hashable, Optional, Union
import click
import requests
from Crypto.Cipher import AES
from Crypto.Util import Counter
@ -232,3 +233,14 @@ def gen_threadsafe_session(
session.mount("https://", adapter)
session.headers.update(headers)
return session
def decho(message, fg=None):
"""Debug echo the message.
:param message:
:param fg: ANSI color with which to display the message on the
screen
"""
click.secho(message, fg=fg)
logger.debug(message)