Mirror of https://github.com/nathom/streamrip.git, synced 2025-05-14 07:04:51 -04:00
last.fm working
parent b2f75cda5c
commit 0d2ca55be5
7 changed files with 97 additions and 45 deletions
@@ -1,7 +1,6 @@
click
ruamel.yaml
packaging
bs4
pathvalidate
requests
mutagen

@@ -216,6 +216,33 @@ def discover(ctx, **kwargs):
    none_chosen()


@cli.command()
@click.option(
    "-s", "--source", help="Qobuz, Tidal, Deezer, or SoundCloud. Default: Qobuz."
)
@click.argument("URL")
@click.pass_context
def lastfm(ctx, source, url):
    """Searches for tracks from a last.fm playlist on a given source.

    Examples:

        $ rip lastfm https://www.last.fm/user/nathan3895/playlists/12059037

        Download a playlist using Qobuz as the source

        $ rip lastfm -s tidal https://www.last.fm/user/nathan3895/playlists/12059037

        Download a playlist using Tidal as the source
    """

    if source is not None:
        config.session["lastfm"]["source"] = source

    core.handle_lastfm_urls(url)
    core.download()


@cli.command()
@click.option("-o", "--open", is_flag=True, help="Open the config file")
@click.option("-q", "--qobuz", is_flag=True, help="Set Qobuz credentials")

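The command body above only stores the optional source override and delegates to the core object; a minimal hedged sketch of driving the same flow from Python instead of the CLI, assuming core is an already-initialized MusicDL and config the loaded Config object (both names are taken from the command body above):

    # Sketch only: mirrors the body of the `lastfm` command above.
    config.session["lastfm"]["source"] = "tidal"  # optional, same effect as the -s flag
    core.handle_lastfm_urls("https://www.last.fm/user/nathan3895/playlists/12059037")
    core.download()
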
@@ -82,7 +82,7 @@ class Config:
        },
        "path_format": {"folder": FOLDER_FORMAT, "track": TRACK_FORMAT},
        "check_for_updates": True,
        "lastfm": {"source": "qobuz"}
        "lastfm": {"source": "qobuz"},
    }

    def __init__(self, path: str = None):

@@ -1,8 +1,8 @@
import logging
import time
import os
import re
import sys
import time
from getpass import getpass
from hashlib import md5
from string import Formatter

@@ -10,21 +10,21 @@ from typing import Generator, Optional, Tuple, Union

import click
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm

from .clients import DeezerClient, QobuzClient, SoundCloudClient, TidalClient
from .config import Config
from .constants import (
    CONFIG_PATH,
    DB_PATH,
    LASTFM_URL_REGEX,
    MEDIA_TYPES,
    SOUNDCLOUD_URL_REGEX,
    LASTFM_URL_REGEX,
    URL_REGEX,
)
from .db import MusicDB
from .downloader import Album, Artist, Label, Playlist, Track, Tracklist
from .exceptions import AuthenticationError, ParsingError
from .exceptions import AuthenticationError, NoResultsFound, ParsingError
from .utils import capitalize

logger = logging.getLogger(__name__)

@@ -114,18 +114,15 @@ class MusicDL(list):
            self.prompt_creds(source)

    def handle_urls(self, url: str):
        """Download an url
        """Download a url

        :param url:
        :type url: str
        :raises InvalidSourceError
        :raises ParsingError
        """
        parsed_info = self.parse_urls(url)
        if parsed_info is None:
            return

        for source, url_type, item_id in parsed_info:
        for source, url_type, item_id in self.parse_urls(url):
            if item_id in self.db:
                logger.info(
                    f"ID {item_id} already downloaded, use --no-db to override."

@@ -152,7 +149,6 @@ class MusicDL(list):
            self.append(item)

    def download(self):

        arguments = {
            "database": self.db,
            "parent_folder": self.config.session["downloads"]["folder"],

@@ -192,7 +188,7 @@ class MusicDL(list):
            else:
                item.download(**arguments)

            if self.db != [] and hasattr(item, 'id'):
            if self.db != [] and hasattr(item, "id"):
                self.db.add(item.id)

            if self.config.session["conversion"]["enabled"]:

@@ -246,9 +242,6 @@ class MusicDL(list):
        parsed = self.url_parse.findall(url)  # Qobuz, Tidal, Dezer
        soundcloud_urls = self.soundcloud_url_parse.findall(url)
        soundcloud_items = [self.clients["soundcloud"].get(u) for u in soundcloud_urls]
        lastfm_urls = self.lastfm_url_parse.findall(url)
        if lastfm_urls:
            self.handle_lastfm_urls(lastfm_urls)

        parsed.extend(
            ("soundcloud", item["kind"], url)

@@ -260,22 +253,30 @@ class MusicDL(list):
        if parsed != []:
            return parsed

        if not lastfm_urls:
            raise ParsingError(f"Error parsing URL: `{url}`")
        raise ParsingError(f"Error parsing URL: `{url}`")

    def handle_lastfm_urls(self, lastfm_urls):
        lastfm_source = self.config.session['lastfm']['source']
    def handle_lastfm_urls(self, urls):
        lastfm_urls = self.lastfm_url_parse.findall(urls)
        lastfm_source = self.config.session["lastfm"]["source"]
        for purl in lastfm_urls:
            click.secho(f"Fetching playlist at {purl}", fg="blue")
            title, queries = self.get_lastfm_playlist(purl)

            pl = Playlist(client=self.clients[lastfm_source], name=title)
            for query in queries:
                click.secho(f'Searching for "{query}"', fg='cyan')
                track = next(self.search(lastfm_source, query, media_type='track'))
            tracks_not_found = 0
            for title, artist in tqdm(queries, unit="tracks", desc="Searching"):
                query = f"{title} {artist}"

                try:
                    track = next(self.search(lastfm_source, query, media_type="track"))
                except NoResultsFound:
                    tracks_not_found += 1
                    continue

                pl.append(track)
            pl.loaded = True
            time.sleep(0.2)  # max 5 requests/s

            click.secho(f"{tracks_not_found} tracks not found.", fg='yellow')
            self.append(pl)

    def handle_txt(self, filepath: Union[str, os.PathLike]):

@@ -312,9 +313,13 @@ class MusicDL(list):
            if i > limit:
                return
        else:
            for item in (
            items = (
                results.get("data") or results.get("items") or results.get("collection")
            ):
            )
            if items is None:
                raise NoResultsFound(query)

            for item in items:
                yield MEDIA_CLASS[media_type].from_api(item, client)
                i += 1
                if i > limit:

@@ -424,22 +429,34 @@ class MusicDL(list):
        return True

    def get_lastfm_playlist(self, url: str) -> Tuple[str, list]:
        # code from qobuz-dl
        try:
            r = requests.get(url, timeout=10)
        except requests.exceptions.RequestException:
            click.secho("Unable to fetch playlist", fg="red")
            return
        info = []
        words = re.compile(r"[\w\s]+")
        title_tags = re.compile('title="([^"]+)"')

        soup = BeautifulSoup(r.content, "html.parser")
        artists = (artist.text for artist in soup.select("td.chartlist-artist > a"))
        titles = (title.text for title in soup.select("td.chartlist-name > a"))
        def essence(s):
            s = re.sub(r"&#\d+;", "", s)  # remove HTML entities
            return "".join(words.findall(s))

        queries = [f"{artist} {title}" for artist, title in zip(artists, titles)]
        def get_titles(s):
            titles = title_tags.findall(s)[2:]
            for i in range(0, len(titles) - 1, 2):
                info.append((essence(titles[i]), essence(titles[i + 1])))

        if not queries:
            click.secho("No tracks found", fg="red")
            return
        r = requests.get(url)
        get_titles(r.text)
        remaining_tracks = (
            int(re.search(r'data-playlisting-entry-count="(\d+)"', r.text).group(1))
            - 50
        )
        playlist_title = re.search(
            r'<h1 class="playlisting-playlist-header-title">([^<]+)</h1>', r.text
        ).group(1)

        title = soup.select_one("h1").text
        return title, queries
        page = 1
        while remaining_tracks > 0:
            page += 1
            r = requests.get(f"{url}?page={page}")
            get_titles(r.text)
            remaining_tracks -= 50

        return playlist_title, info

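For readers skimming the diff, a condensed, self-contained sketch of the scraping approach get_lastfm_playlist now uses: the title="..." regex, the h1 playlist-title pattern, the data-playlisting-entry-count total, and the 50-tracks-per-page step all come from the hunk above, while the standalone function name fetch_lastfm_playlist is illustrative only.

    import re
    import requests

    def fetch_lastfm_playlist(url: str):
        # Track entries appear as alternating title="..." attributes in the page HTML;
        # the first two matches are skipped, mirroring the method above.
        title_tags = re.compile('title="([^"]+)"')
        words = re.compile(r"[\w\s]+")
        info = []

        def essence(s):
            s = re.sub(r"&#\d+;", "", s)  # drop numeric HTML entities
            return "".join(words.findall(s))

        def get_titles(html):
            titles = title_tags.findall(html)[2:]
            for i in range(0, len(titles) - 1, 2):
                info.append((essence(titles[i]), essence(titles[i + 1])))

        r = requests.get(url)
        get_titles(r.text)
        playlist_title = re.search(
            r'<h1 class="playlisting-playlist-header-title">([^<]+)</h1>', r.text
        ).group(1)

        # The first page reports the total entry count; each page holds 50 tracks,
        # so keep requesting ?page=N until the remainder is exhausted.
        remaining = (
            int(re.search(r'data-playlisting-entry-count="(\d+)"', r.text).group(1)) - 50
        )
        page = 1
        while remaining > 0:
            page += 1
            get_titles(requests.get(f"{url}?page={page}").text)
            remaining -= 50

        return playlist_title, info
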
@@ -912,7 +912,8 @@ class Album(Tracklist):
            tqdm_download(self.cover_urls[embed_cover_size], cover_path)
        if (
            self.cover_urls.get(download_cover_size, embed_cover_size)
            != embed_cover_size or os.path.size(cover_path) > FLAC_MAX_BLOCKSIZE
            != embed_cover_size
            or os.path.size(cover_path) > FLAC_MAX_BLOCKSIZE
        ):
            # download cover at another resolution but don't use for embed
            embed_cover_path = cover_path.replace(".jpg", "_embed.jpg")

@@ -44,3 +44,7 @@ class BadEncoderOption(Exception):

class ConversionError(Exception):
    pass


class NoResultsFound(Exception):
    pass

@@ -100,9 +100,13 @@ class TrackMetadata:
            self.albumartist = safe_get(resp, "artist", "name")
            self.label = resp.get("label")
            self.description = resp.get("description")
            self.disctotal = max(
                track.get("media_number", 1) for track in safe_get(resp, 'tracks', 'items', default=[{}])
            ) or 1
            self.disctotal = (
                max(
                    track.get("media_number", 1)
                    for track in safe_get(resp, "tracks", "items", default=[{}])
                )
                or 1
            )
            self.explicit = resp.get("parental_warning", False)

            if isinstance(self.label, dict):