This commit is contained in:
nathom 2021-04-05 17:53:19 -07:00
commit 07b6e402b1
11 changed files with 259 additions and 88 deletions

View file

@ -3,11 +3,14 @@ from pprint import pprint
import os
import re
import shutil
from pprint import pformat
import subprocess
import sys
from pprint import pformat, pprint
from tempfile import gettempdir
from typing import Any, Callable, Optional, Tuple, Union
import click
import requests
from mutagen.flac import FLAC, Picture
from mutagen.id3 import APIC, ID3, ID3NoHeaderError
from pathvalidate import sanitize_filename, sanitize_filepath
@ -19,6 +22,7 @@ from .constants import (
EXT,
FLAC_MAX_BLOCKSIZE,
FOLDER_FORMAT,
SOUNDCLOUD_CLIENT_ID,
TRACK_FORMAT,
)
from .db import MusicDB
@ -117,17 +121,19 @@ class Track:
assert hasattr(self, "id"), "id must be set before loading metadata"
track_meta = self.client.get(self.id, media_type="track")
self.resp = self.client.get(self.id, media_type="track")
self.meta = TrackMetadata(
track=track_meta, source=self.client.source
track=self.resp, source=self.client.source
) # meta dict -> TrackMetadata object
try:
if self.client.source == "qobuz":
self.cover_url = track_meta["album"]["image"]["small"]
self.cover_url = self.resp["album"]["image"]["small"]
elif self.client.source == "tidal":
self.cover_url = tidal_cover_url(track_meta["album"]["cover"], 320)
self.cover_url = tidal_cover_url(self.resp["album"]["cover"], 320)
elif self.client.source == "deezer":
self.cover_url = track_meta["album"]["cover_medium"]
self.cover_url = self.resp["album"]["cover_medium"]
elif self.client.source == "soundcloud":
self.cover_url = (self.resp["artwork_url"] or self.resp['user'].get("avatar_url")).replace("large", "t500x500")
else:
raise InvalidSourceError(self.client.source)
except KeyError:
@ -145,7 +151,7 @@ class Track:
def download(
self,
quality: int = 7,
quality: int = 3,
parent_folder: str = "StreamripDownloads",
progress_bar: bool = True,
database: MusicDB = None,
@ -163,10 +169,8 @@ class Track:
:type progress_bar: bool
"""
# args override attributes
self.quality, self.folder = (
quality or self.quality,
parent_folder or self.folder,
)
self.quality = min(quality, self.client.max_quality)
self.folder = parent_folder or self.folder
self.file_format = kwargs.get("track_format", TRACK_FORMAT)
self.folder = sanitize_filepath(self.folder, platform="auto")
@ -190,11 +194,17 @@ class Track:
return False
if hasattr(self, "cover_url"): # only for playlists and singles
logger.debug("Downloading cover")
self.download_cover()
dl_info = self.client.get_file_url(self.id, quality)
if self.client.source == "soundcloud":
url_id = self.resp
else:
url_id = self.id
temp_file = os.path.join(gettempdir(), f"~{self.id}_{quality}.tmp")
dl_info = self.client.get_file_url(url_id, self.quality)
temp_file = os.path.join(gettempdir(), f"~{hash(self.id)}_{quality}.tmp")
logger.debug("Temporary file path: %s", temp_file)
if self.client.source == "qobuz":
@ -213,7 +223,8 @@ class Track:
if self.client.source in ("qobuz", "tidal"):
logger.debug("Downloadable URL found: %s", dl_info.get("url"))
tqdm_download(dl_info["url"], temp_file) # downloads file
elif isinstance(dl_info, str): # Deezer
elif self.client.source == "deezer": # Deezer
logger.debug("Downloadable URL found: %s", dl_info)
try:
tqdm_download(dl_info, temp_file) # downloads file
@ -221,6 +232,34 @@ class Track:
logger.debug(f"Track is not downloadable {dl_info}")
click.secho("Track is not available for download", fg="red")
return False
elif self.client.source == "soundcloud":
if dl_info["type"] == "mp3":
temp_file += ".mp3"
# convert hls stream to mp3
subprocess.call(
[
"ffmpeg",
"-i",
dl_info['url'],
"-c",
"copy",
"-y",
temp_file,
"-loglevel",
"fatal",
]
)
elif dl_info["type"] == "original":
tqdm_download(dl_info["url"], temp_file)
# if a wav is returned, convert to flac
engine = converter.FLAC(temp_file)
temp_file = f"{temp_file}.flac"
engine.convert(custom_fn=temp_file)
self.final_path = self.final_path.replace(".mp3", ".flac")
self.quality = 2
else:
raise InvalidSourceError(self.client.source)
@ -250,18 +289,15 @@ class Track:
assert hasattr(self, "cover_url"), "must set cover_url attribute"
self.cover_path = os.path.join(self.folder, f"cover{hash(self.meta.album)}.jpg")
self.cover_path = os.path.join(self.folder, f"cover{hash(self.cover_url)}.jpg")
logger.debug(f"Downloading cover from {self.cover_url}")
click.secho(f"\nDownloading cover art for {self!s}", fg='blue')
click.secho(f"\nDownloading cover art for {self!s}", fg="blue")
if not os.path.exists(self.cover_path):
tqdm_download(self.cover_url, self.cover_path)
else:
logger.debug("Cover already exists, skipping download")
self.cover = Tracklist.get_cover_obj(self.cover_path, self.quality)
logger.debug(f"Cover obj: {self.cover}")
def format_final_path(self) -> str:
"""Return the final filepath of the downloaded file.
@ -360,16 +396,13 @@ class Track:
self.container = "FLAC"
logger.debug("Tagging file with %s container", self.container)
audio = FLAC(self.final_path)
elif self.quality == 1:
elif self.quality <= 1:
self.container = "MP3"
logger.debug("Tagging file with %s container", self.container)
try:
audio = ID3(self.final_path)
except ID3NoHeaderError:
audio = ID3()
elif self.quality == 0: # tidal and deezer
# TODO: add compatibility with MP4 container
raise NotImplementedError("Qualities < 320kbps not implemented")
else:
raise InvalidQuality(f'Invalid quality: "{self.quality}"')
@ -378,9 +411,9 @@ class Track:
audio[k] = v
if embed_cover and cover is None:
assert hasattr(self, "cover")
cover = self.cover
assert hasattr(self, "cover_path")
cover = Tracklist.get_cover_obj(self.cover_path, self.quality)
if isinstance(audio, FLAC):
if embed_cover:
audio.add_picture(cover)
@ -574,7 +607,7 @@ class Tracklist(list):
:type quality: int
:rtype: Union[Picture, APIC]
"""
cover_type = {1: APIC, 2: Picture, 3: Picture, 4: Picture}
cover_type = {0: APIC, 1: APIC, 2: Picture, 3: Picture, 4: Picture}
cover = cover_type.get(quality)
if cover is Picture:
@ -738,7 +771,6 @@ class Album(Tracklist):
"tracktotal": resp.get("numberOfTracks"),
}
elif client.source == "deezer":
logger.debug(pformat(resp))
return {
"id": resp.get("id"),
"title": resp.get("title"),
@ -801,7 +833,7 @@ class Album(Tracklist):
def download(
self,
quality: int = 7,
quality: int = 3,
parent_folder: Union[str, os.PathLike] = "StreamripDownloads",
database: MusicDB = None,
**kwargs,
@ -836,7 +868,7 @@ class Album(Tracklist):
logger.debug("Cover already downloaded: %s. Skipping", cover_path)
else:
click.secho("Downloading cover art", fg="magenta")
if kwargs.get("large_cover", False):
if kwargs.get("large_cover", True):
cover_url = self.cover_urls.get("large")
if self.client.source == "qobuz":
tqdm_download(cover_url.replace("600", "org"), cover_path)
@ -854,7 +886,7 @@ class Album(Tracklist):
else:
tqdm_download(self.cover_urls["small"], cover_path)
embed_cover = kwargs.get('embed_cover', True) # embed by default
embed_cover = kwargs.get("embed_cover", True) # embed by default
if self.client.source != "deezer" and embed_cover:
cover = self.get_cover_obj(cover_path, quality)
@ -888,17 +920,18 @@ class Album(Tracklist):
else:
fmt[key] = None
fmt["sampling_rate"] /= 1000
# 48.0kHz -> 48kHz, 44.1kHz -> 44.1kHz
if fmt["sampling_rate"] % 1 == 0.0:
fmt["sampling_rate"] = int(fmt["sampling_rate"])
if fmt.get("sampling_rate", False):
fmt["sampling_rate"] /= 1000
# 48.0kHz -> 48kHz, 44.1kHz -> 44.1kHz
if fmt["sampling_rate"] % 1 == 0.0:
fmt["sampling_rate"] = int(fmt["sampling_rate"])
return fmt
def _get_formatted_folder(self, parent_folder: str) -> str:
if self.bit_depth is not None and self.sampling_rate is not None:
self.container = "FLAC"
elif self.client.source in ("qobuz", "deezer"):
elif self.client.source in ("qobuz", "deezer", "soundcloud"):
self.container = "MP3"
elif self.client.source == "tidal":
self.container = "AAC"
@ -937,7 +970,7 @@ class Playlist(Tracklist):
"""Represents a downloadable playlist.
Usage:
>>> resp = client.get('hip hop', 'playlist')
>>> resp = client.search('hip hop', 'playlist')
>>> pl = Playlist.from_api(resp['items'][0], client)
>>> pl.load_meta()
>>> pl.download()
@ -980,7 +1013,7 @@ class Playlist(Tracklist):
:type new_tracknumbers: bool
:param kwargs:
"""
self.meta = self.client.get(self.id, "playlist")
self.meta = self.client.get(id=self.id, media_type="playlist")
self._load_tracks(**kwargs)
def _load_tracks(self, new_tracknumbers: bool = True):
@ -990,17 +1023,17 @@ class Playlist(Tracklist):
:type new_tracknumbers: bool
"""
if self.client.source == "qobuz":
self.name = self.meta['name']
self.name = self.meta["name"]
tracklist = self.meta["tracks"]["items"]
def gen_cover(track): # ?
def gen_cover(track):
return track["album"]["image"]["small"]
def meta_args(track):
return {"track": track, "album": track["album"]}
elif self.client.source == "tidal":
self.name = self.meta['title']
self.name = self.meta["title"]
tracklist = self.meta["tracks"]
def gen_cover(track):
@ -1014,41 +1047,49 @@ class Playlist(Tracklist):
}
elif self.client.source == "deezer":
self.name = self.meta['title']
self.name = self.meta["title"]
tracklist = self.meta["tracks"]
def gen_cover(track):
return track["album"]["cover_medium"]
def meta_args(track):
return {"track": track, "source": self.client.source}
elif self.client.source == "soundcloud":
self.name = self.meta["title"]
tracklist = self.meta["tracks"]
def gen_cover(track):
return track["artwork_url"].replace("large", "t500x500")
else:
raise NotImplementedError
for i, track in enumerate(tracklist):
# TODO: This should be managed with .m3u files and alike. Arbitrary
# tracknumber tags might cause conflicts if the playlist files are
# inside of a library folder
meta = TrackMetadata(**meta_args(track))
if new_tracknumbers:
meta["tracknumber"] = str(i + 1)
if self.client.source == "soundcloud":
# No meta is included in soundcloud playlist
# response, so it is loaded at download time
for track in tracklist:
self.append(Track(self.client, id=track["id"]))
else:
for track in tracklist:
# TODO: This should be managed with .m3u files and alike. Arbitrary
# tracknumber tags might cause conflicts if the playlist files are
# inside of a library folder
meta = TrackMetadata(track=track, source=self.client.source)
self.append(
Track(
self.client,
id=track.get("id"),
meta=meta,
cover_url=gen_cover(track),
self.append(
Track(
self.client,
id=track.get("id"),
meta=meta,
cover_url=gen_cover(track),
)
)
)
logger.debug(f"Loaded {len(self)} tracks from playlist {self.name}")
def download(
self,
parent_folder: str = "Downloads",
quality: int = 6,
parent_folder: str = "StreamripDownloads",
quality: int = 3,
filters: Callable = None,
database: MusicDB = None,
**kwargs,
@ -1067,10 +1108,19 @@ class Playlist(Tracklist):
logger.debug(f"Parent folder {folder}")
self.download_message()
for track in self:
track.download(parent_folder=folder, quality=quality, database=database)
if self.client.source != "deezer":
track.tag(embed_cover=kwargs.get('embed_cover', True))
for i, track in enumerate(self):
if self.client.source == "soundcloud":
track.load_meta()
if kwargs.get("new_tracknumbers", True):
track.meta["tracknumber"] = str(i + 1)
if (
track.download(parent_folder=folder, quality=quality, database=database)
and self.client.source != "deezer"
):
track.tag(embed_cover=kwargs.get("embed_cover", True))
@staticmethod
def _parse_get_resp(item: dict, client: ClientInterface):
@ -1082,11 +1132,10 @@ class Playlist(Tracklist):
:param client:
:type client: ClientInterface
"""
print(item.keys())
if client.source == "qobuz":
return {
"name": item["name"],
"id": item['id'],
"id": item["id"],
}
elif client.source == "tidal":
return {
@ -1179,7 +1228,7 @@ class Artist(Tracklist):
def download(
self,
parent_folder: str = "Downloads",
parent_folder: str = "StreamripDownloads",
filters: Optional[Tuple] = None,
no_repeats: bool = False,
quality: int = 6,