Misc fixes; formatting

This commit is contained in:
nathom 2021-04-05 18:40:46 -07:00
parent 47723c45cf
commit 696e844c30
4 changed files with 50 additions and 47 deletions

View file

@ -4,7 +4,6 @@ import json
import logging import logging
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from pprint import pformat, pprint
from typing import Generator, Sequence, Tuple, Union from typing import Generator, Sequence, Tuple, Union
import click import click
@ -491,7 +490,7 @@ class TidalClient(ClientInterface):
} }
resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params) resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params)
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8")) manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
logger.debug(f"{pformat(manifest)}") logger.debug(manifest)
return { return {
"url": manifest["urls"][0], "url": manifest["urls"][0],
"enc_key": manifest.get("keyId"), "enc_key": manifest.get("keyId"),
@ -547,7 +546,6 @@ class TidalClient(ClientInterface):
if "status" in resp and resp["status"] != 200: if "status" in resp and resp["status"] != 200:
raise Exception(f"Device authorization failed {resp}") raise Exception(f"Device authorization failed {resp}")
logger.debug(pformat(resp))
self.device_code = resp["deviceCode"] self.device_code = resp["deviceCode"]
self.user_code = resp["userCode"] self.user_code = resp["userCode"]
self.user_code_expiry = resp["expiresIn"] self.user_code_expiry = resp["expiresIn"]

View file

@ -4,7 +4,6 @@ import re
import sys import sys
from getpass import getpass from getpass import getpass
from hashlib import md5 from hashlib import md5
from pprint import pprint
from string import Formatter from string import Formatter
from typing import Generator, Optional, Tuple, Union from typing import Generator, Optional, Tuple, Union
@ -316,7 +315,7 @@ class MusicDL(list):
def from_title(s): def from_title(s):
num = [] num = []
for char in s: for char in s:
if char != '.': if char != ".":
num.append(char) num.append(char)
else: else:
break break

View file

@ -3,13 +3,11 @@ import os
import re import re
import shutil import shutil
import subprocess import subprocess
import sys
from pprint import pformat, pprint from pprint import pformat, pprint
from tempfile import gettempdir from tempfile import gettempdir
from typing import Any, Callable, Optional, Tuple, Union from typing import Any, Callable, Optional, Tuple, Union
import click import click
import requests
from mutagen.flac import FLAC, Picture from mutagen.flac import FLAC, Picture
from mutagen.id3 import APIC, ID3, ID3NoHeaderError from mutagen.id3 import APIC, ID3, ID3NoHeaderError
from pathvalidate import sanitize_filename, sanitize_filepath from pathvalidate import sanitize_filename, sanitize_filepath
@ -21,7 +19,6 @@ from .constants import (
EXT, EXT,
FLAC_MAX_BLOCKSIZE, FLAC_MAX_BLOCKSIZE,
FOLDER_FORMAT, FOLDER_FORMAT,
SOUNDCLOUD_CLIENT_ID,
TRACK_FORMAT, TRACK_FORMAT,
) )
from .db import MusicDB from .db import MusicDB
@ -99,8 +96,8 @@ class Track:
self.sampling_rate = 44100 self.sampling_rate = 44100
self.bit_depth = 16 self.bit_depth = 16
self._is_downloaded = False self.downloaded = False
self._is_tagged = False self.tagged = False
for attr in ("quality", "folder", "meta"): for attr in ("quality", "folder", "meta"):
setattr(self, attr, None) setattr(self, attr, None)
@ -180,8 +177,8 @@ class Track:
if database is not None: if database is not None:
if self.id in database: if self.id in database:
self._is_downloaded = True self.downloaded = True
self._is_tagged = True self.tagged = True
click.secho( click.secho(
f"{self['title']} already logged in database, skipping.", f"{self['title']} already logged in database, skipping.",
fg="magenta", fg="magenta",
@ -189,8 +186,8 @@ class Track:
return False # because the track was not downloaded return False # because the track was not downloaded
if os.path.isfile(self.format_final_path()): # track already exists if os.path.isfile(self.format_final_path()): # track already exists
self._is_downloaded = True self.downloaded = True
self._is_tagged = True self.tagged = True
click.secho(f"Track already downloaded: {self.final_path}", fg="magenta") click.secho(f"Track already downloaded: {self.final_path}", fg="magenta")
return False return False
@ -235,6 +232,33 @@ class Track:
return False return False
elif self.client.source == "soundcloud": elif self.client.source == "soundcloud":
temp_file = self._soundcloud_download(dl_info, temp_file)
else:
raise InvalidSourceError(self.client.source)
if isinstance(dl_info, dict) and dl_info.get("enc_key"):
decrypt_mqa_file(temp_file, self.final_path, dl_info["enc_key"])
else:
shutil.move(temp_file, self.final_path)
if isinstance(database, MusicDB):
database.add(self.id)
logger.debug(f"{self.id} added to database")
logger.debug("Downloaded: %s -> %s", temp_file, self.final_path)
self.downloaded = True
if tag:
self.tag()
if not kwargs.get("keep_cover", True) and hasattr(self, "cover_path"):
os.remove(self.cover_path)
return True
def _soundcloud_download(self, dl_info: dict, temp_file: str) -> str:
if dl_info["type"] == "mp3": if dl_info["type"] == "mp3":
temp_file += ".mp3" temp_file += ".mp3"
# convert hls stream to mp3 # convert hls stream to mp3
@ -261,29 +285,8 @@ class Track:
self.final_path = self.final_path.replace(".mp3", ".flac") self.final_path = self.final_path.replace(".mp3", ".flac")
self.quality = 2 self.quality = 2
else:
raise InvalidSourceError(self.client.source)
if isinstance(dl_info, dict) and dl_info.get("enc_key"): return temp_file
decrypt_mqa_file(temp_file, self.final_path, dl_info["enc_key"])
else:
shutil.move(temp_file, self.final_path)
if isinstance(database, MusicDB):
database.add(self.id)
logger.debug(f"{self.id} added to database")
logger.debug("Downloaded: %s -> %s", temp_file, self.final_path)
self._is_downloaded = True
if tag:
self.tag()
if not kwargs.get("keep_cover", True) and hasattr(self, "cover_path"):
os.remove(self.cover_path)
return True
def download_cover(self): def download_cover(self):
"""Downloads the cover art, if cover_url is given.""" """Downloads the cover art, if cover_url is given."""
@ -378,13 +381,13 @@ class Track:
:type embed_cover: bool :type embed_cover: bool
""" """
assert isinstance(self.meta, TrackMetadata), "meta must be TrackMetadata" assert isinstance(self.meta, TrackMetadata), "meta must be TrackMetadata"
if not self._is_downloaded: if not self.downloaded:
logger.info( logger.info(
"Track %s not tagged because it was not downloaded", self["title"] "Track %s not tagged because it was not downloaded", self["title"]
) )
return return
if self._is_tagged: if self.tagged:
logger.info( logger.info(
"Track %s not tagged because it is already tagged", self["title"] "Track %s not tagged because it is already tagged", self["title"]
) )
@ -426,7 +429,7 @@ class Track:
else: else:
raise ValueError(f"Unknown container type: {audio}") raise ValueError(f"Unknown container type: {audio}")
self._is_tagged = True self.tagged = True
def convert(self, codec: str = "ALAC", **kwargs): def convert(self, codec: str = "ALAC", **kwargs):
"""Converts the track to another codec. """Converts the track to another codec.
@ -445,7 +448,7 @@ class Track:
:type codec: str :type codec: str
:param kwargs: :param kwargs:
""" """
if not self._is_downloaded: if not self.downloaded:
logger.debug("Track not downloaded, skipping conversion") logger.debug("Track not downloaded, skipping conversion")
click.secho("Track not downloaded, skipping conversion", fg="magenta") click.secho("Track not downloaded, skipping conversion", fg="magenta")
return return
@ -775,12 +778,17 @@ class Album(Tracklist):
"tracktotal": resp.get("numberOfTracks"), "tracktotal": resp.get("numberOfTracks"),
} }
elif client.source == "deezer": elif client.source == "deezer":
if resp.get("release_date", False):
year = resp["release_date"][:4]
else:
year = None
return { return {
"id": resp.get("id"), "id": resp.get("id"),
"title": resp.get("title"), "title": resp.get("title"),
"_artist": safe_get(resp, "artist", "name"), "_artist": safe_get(resp, "artist", "name"),
"albumartist": safe_get(resp, "artist", "name"), "albumartist": safe_get(resp, "artist", "name"),
"year": resp.get("release_date")[:4], "year": year,
# version not given by API # version not given by API
"cover_urls": { "cover_urls": {
sk: resp.get(rk) # size key, resp key sk: resp.get(rk) # size key, resp key

View file

@ -1,8 +1,6 @@
import json import json
import logging import logging
import re import re
import sys
from pprint import pprint
from typing import Generator, Optional, Tuple, Union from typing import Generator, Optional, Tuple, Union
from .constants import ( from .constants import (