mirror of
https://github.com/nathom/streamrip.git
synced 2025-05-13 22:54:55 -04:00
Misc typing
This commit is contained in:
parent
48b4da80e5
commit
f81176b3dc
8 changed files with 26 additions and 22 deletions
|
@ -1,3 +0,0 @@
|
|||
[settings]
|
||||
multi_line_output=3
|
||||
include_trailing_comma=True
|
12
rip/cli.py
12
rip/cli.py
|
@ -83,15 +83,15 @@ class DownloadCommand(Command):
|
|||
update_check = threading.Thread(target=is_outdated, daemon=True)
|
||||
update_check.start()
|
||||
|
||||
path, codec, quality, no_db, directory, config = clean_options(
|
||||
path, quality, no_db, directory, config = clean_options(
|
||||
self.option("file"),
|
||||
self.option("codec"),
|
||||
self.option("max-quality"),
|
||||
self.option("ignore-db"),
|
||||
self.option("directory"),
|
||||
self.option("config"),
|
||||
)
|
||||
|
||||
assert isinstance(config, str) or config is None
|
||||
config = Config(config)
|
||||
|
||||
if directory is not None:
|
||||
|
@ -109,6 +109,7 @@ class DownloadCommand(Command):
|
|||
urls = self.argument("urls")
|
||||
|
||||
if path is not None:
|
||||
assert isinstance(path, str)
|
||||
if os.path.isfile(path):
|
||||
core.handle_txt(path)
|
||||
else:
|
||||
|
@ -129,7 +130,6 @@ class DownloadCommand(Command):
|
|||
|
||||
if outdated:
|
||||
import re
|
||||
import subprocess
|
||||
|
||||
self.line(
|
||||
f"\n<info>A new version of streamrip <title>v{newest_version}</title>"
|
||||
|
@ -196,6 +196,8 @@ class SearchCommand(Command):
|
|||
def handle(self):
|
||||
query = self.argument("query")
|
||||
source, type = clean_options(self.option("source"), self.option("type"))
|
||||
assert isinstance(source, str)
|
||||
assert isinstance(type, str)
|
||||
|
||||
config = Config()
|
||||
core = RipCore(config)
|
||||
|
@ -411,7 +413,7 @@ class ConfigCommand(Command):
|
|||
{--update : Reset the config file, keeping the credentials}
|
||||
"""
|
||||
|
||||
_config: Optional[Config]
|
||||
_config: Config
|
||||
|
||||
def handle(self):
|
||||
import shutil
|
||||
|
@ -649,7 +651,6 @@ class ConvertCommand(Command):
|
|||
else:
|
||||
self.line(
|
||||
f'<error>Path <path>"{path}"</path> does not exist.</error>',
|
||||
fg="red",
|
||||
)
|
||||
|
||||
|
||||
|
@ -818,6 +819,7 @@ def is_outdated():
|
|||
|
||||
# Compare versions
|
||||
curr_version_parsed = map(int, __version__.split("."))
|
||||
assert isinstance(newest_version, str)
|
||||
newest_version_parsed = map(int, newest_version.split("."))
|
||||
outdated = False
|
||||
for c, n in zip(curr_version_parsed, newest_version_parsed):
|
||||
|
|
|
@ -132,7 +132,7 @@ class RipCore(list):
|
|||
config.file["database"][db_type]["path"] = default_db_path
|
||||
config.save()
|
||||
else:
|
||||
database = db_class(None, dummy=True)
|
||||
database = db_class("", dummy=True)
|
||||
|
||||
return database
|
||||
|
||||
|
@ -249,7 +249,7 @@ class RipCore(list):
|
|||
if max_items is None:
|
||||
max_items = float("inf")
|
||||
|
||||
self.db = db.Downloads(None, dummy=True)
|
||||
self.db = db.Downloads("", dummy=True)
|
||||
if self.failed_db.is_dummy:
|
||||
secho(
|
||||
"Failed downloads database must be enabled in the config file "
|
||||
|
|
13
rip/db.py
13
rip/db.py
|
@ -14,7 +14,7 @@ class Database:
|
|||
structure: dict
|
||||
name: str
|
||||
|
||||
def __init__(self, path, dummy=False):
|
||||
def __init__(self, path: str, dummy: bool = False):
|
||||
"""Create a Database instance.
|
||||
|
||||
:param path: Path to the database file.
|
||||
|
@ -23,13 +23,12 @@ class Database:
|
|||
assert self.structure != []
|
||||
assert self.name
|
||||
|
||||
if dummy or path is None:
|
||||
self.path = None
|
||||
self.is_dummy = True
|
||||
return
|
||||
self.is_dummy = False
|
||||
|
||||
self.path = path
|
||||
self.is_dummy = dummy
|
||||
|
||||
if self.is_dummy:
|
||||
return
|
||||
|
||||
if not os.path.exists(self.path):
|
||||
self.create()
|
||||
|
||||
|
|
|
@ -374,7 +374,7 @@ class QobuzClient(Client):
|
|||
self.label = resp["user"]["credential"]["parameters"]["short_label"]
|
||||
|
||||
def _api_get_file_url(
|
||||
self, track_id: Union[str, int], quality: int = 3, sec: str = None
|
||||
self, track_id: Union[str, int], quality: int = 3, sec: Optional[str] = None
|
||||
) -> dict:
|
||||
"""Get the file url given a track id.
|
||||
|
||||
|
|
|
@ -191,3 +191,5 @@ DEEZER_BASE = "https://api.deezer.com"
|
|||
DEEZER_DL = "http://dz.loaderapp.info/deezer"
|
||||
|
||||
SOUNDCLOUD_BASE = "https://api-v2.soundcloud.com"
|
||||
|
||||
MAX_FILES_OPEN = 128
|
||||
|
|
|
@ -5,7 +5,7 @@ import logging
|
|||
import os
|
||||
import re
|
||||
from tempfile import gettempdir
|
||||
from typing import Callable, Dict, Iterator, List, Optional
|
||||
from typing import Callable, Dict, Iterable, List, Optional
|
||||
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
|
@ -81,10 +81,10 @@ class DownloadStream:
|
|||
except json.JSONDecodeError:
|
||||
raise NonStreamable("File not found.")
|
||||
|
||||
def __iter__(self) -> Iterator:
|
||||
def __iter__(self) -> Iterable:
|
||||
"""Iterate through chunks of the stream.
|
||||
|
||||
:rtype: Iterator
|
||||
:rtype: Iterable
|
||||
"""
|
||||
if self.source == "deezer" and self.is_encrypted.search(self.url) is not None:
|
||||
assert isinstance(self.id, str), self.id
|
||||
|
@ -151,7 +151,7 @@ class DownloadPool:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
urls: Iterator,
|
||||
urls: Iterable,
|
||||
tempdir: str = None,
|
||||
chunk_callback: Optional[Callable] = None,
|
||||
):
|
||||
|
|
|
@ -114,3 +114,7 @@ class PartialFailure(Exception):
|
|||
"""
|
||||
self.failed_items = failed_items
|
||||
super().__init__()
|
||||
|
||||
|
||||
class FfmpegError(Exception):
|
||||
"""Raise if ffmpeg returns nonzero exit code."""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue