mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2025-05-16 08:04:26 -04:00
add new pydantic-based plugin system
This commit is contained in:
parent
e4176dbf7a
commit
48becde9b4
33 changed files with 2280 additions and 4 deletions
561
archivebox/plugantic/binproviders.py
Normal file
561
archivebox/plugantic/binproviders.py
Normal file
|
@ -0,0 +1,561 @@
|
|||
__package__ = 'archivebox.plugantic'
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import operator
|
||||
|
||||
from typing import Callable, Any, Optional, Type, Dict, Annotated, ClassVar, Literal, cast, TYPE_CHECKING
|
||||
from typing_extensions import Self
|
||||
from abc import ABC, abstractmethod
|
||||
from collections import namedtuple
|
||||
from pathlib import Path
|
||||
from subprocess import run, PIPE
|
||||
|
||||
from pydantic_core import core_schema, ValidationError
|
||||
from pydantic import BaseModel, Field, TypeAdapter, AfterValidator, validate_call, GetCoreSchemaHandler
|
||||
|
||||
|
||||
|
||||
def func_takes_args_or_kwargs(lambda_func: Callable[..., Any]) -> bool:
|
||||
"""returns True if a lambda func takes args/kwargs of any kind, otherwise false if it's pure/argless"""
|
||||
code = lambda_func.__code__
|
||||
has_args = code.co_argcount > 0
|
||||
has_varargs = code.co_flags & 0x04 != 0
|
||||
has_varkw = code.co_flags & 0x08 != 0
|
||||
return has_args or has_varargs or has_varkw
|
||||
|
||||
|
||||
def is_semver_str(semver: Any) -> bool:
|
||||
if isinstance(semver, str):
|
||||
return (semver.count('.') == 2 and semver.replace('.', '').isdigit())
|
||||
return False
|
||||
|
||||
def semver_to_str(semver: tuple[int, int, int] | str) -> str:
|
||||
if isinstance(semver, (list, tuple)):
|
||||
return '.'.join(str(chunk) for chunk in semver)
|
||||
if is_semver_str(semver):
|
||||
return semver
|
||||
raise ValidationError('Tried to convert invalid SemVer: {}'.format(semver))
|
||||
|
||||
|
||||
SemVerTuple = namedtuple('SemVerTuple', ('major', 'minor', 'patch'), defaults=(0, 0, 0))
|
||||
SemVerParsableTypes = str | tuple[str | int, ...] | list[str | int]
|
||||
|
||||
class SemVer(SemVerTuple):
|
||||
major: int
|
||||
minor: int = 0
|
||||
patch: int = 0
|
||||
|
||||
if TYPE_CHECKING:
|
||||
full_text: str | None = ''
|
||||
|
||||
def __new__(cls, *args, full_text=None, **kwargs):
|
||||
# '1.1.1'
|
||||
if len(args) == 1 and is_semver_str(args[0]):
|
||||
result = SemVer.parse(args[0])
|
||||
|
||||
# ('1', '2', '3')
|
||||
elif len(args) == 1 and isinstance(args[0], (tuple, list)):
|
||||
result = SemVer.parse(args[0])
|
||||
|
||||
# (1, '2', None)
|
||||
elif not all(isinstance(arg, (int, type(None))) for arg in args):
|
||||
result = SemVer.parse(args)
|
||||
|
||||
# (None)
|
||||
elif all(chunk in ('', 0, None) for chunk in (*args, *kwargs.values())):
|
||||
result = None
|
||||
|
||||
# 1, 2, 3
|
||||
else:
|
||||
result = SemVerTuple.__new__(cls, *args, **kwargs)
|
||||
|
||||
if result is not None:
|
||||
# add first line as extra hidden metadata so it can be logged without having to re-run version cmd
|
||||
result.full_text = full_text or str(result)
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def parse(cls, version_stdout: SemVerParsableTypes) -> Self | None:
|
||||
"""
|
||||
parses a version tag string formatted like into (major, minor, patch) ints
|
||||
'Google Chrome 124.0.6367.208' -> (124, 0, 6367)
|
||||
'GNU Wget 1.24.5 built on darwin23.2.0.' -> (1, 24, 5)
|
||||
'curl 8.4.0 (x86_64-apple-darwin23.0) ...' -> (8, 4, 0)
|
||||
'2024.04.09' -> (2024, 4, 9)
|
||||
|
||||
"""
|
||||
# print('INITIAL_VALUE', type(version_stdout).__name__, version_stdout)
|
||||
|
||||
if isinstance(version_stdout, (tuple, list)):
|
||||
version_stdout = '.'.join(str(chunk) for chunk in version_stdout)
|
||||
elif isinstance(version_stdout, bytes):
|
||||
version_stdout = version_stdout.decode()
|
||||
elif not isinstance(version_stdout, str):
|
||||
version_stdout = str(version_stdout)
|
||||
|
||||
# no text to work with, return None immediately
|
||||
if not version_stdout.strip():
|
||||
# raise Exception('Tried to parse semver from empty version output (is binary installed and available?)')
|
||||
return None
|
||||
|
||||
just_numbers = lambda col: col.lower().strip('v').split('+')[0].split('-')[0].split('_')[0]
|
||||
contains_semver = lambda col: (
|
||||
col.count('.') in (1, 2, 3)
|
||||
and all(chunk.isdigit() for chunk in col.split('.')[:3]) # first 3 chunks can only be nums
|
||||
)
|
||||
|
||||
full_text = version_stdout.split('\n')[0].strip()
|
||||
first_line_columns = full_text.split()[:4]
|
||||
version_columns = list(filter(contains_semver, map(just_numbers, first_line_columns)))
|
||||
|
||||
# could not find any column of first line that looks like a version number, despite there being some text
|
||||
if not version_columns:
|
||||
# raise Exception('Failed to parse semver from version command output: {}'.format(' '.join(first_line_columns)))
|
||||
return None
|
||||
|
||||
# take first col containing a semver, and truncate it to 3 chunks (e.g. 2024.04.09.91) -> (2024, 04, 09)
|
||||
first_version_tuple = version_columns[0].split('.', 3)[:3]
|
||||
|
||||
# print('FINAL_VALUE', first_version_tuple)
|
||||
|
||||
return cls(*(int(chunk) for chunk in first_version_tuple), full_text=full_text)
|
||||
|
||||
def __str__(self):
|
||||
return '.'.join(str(chunk) for chunk in self)
|
||||
|
||||
# @classmethod
|
||||
# def __get_pydantic_core_schema__(cls, source: Type[Any], handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
|
||||
# default_schema = handler(source)
|
||||
# return core_schema.no_info_after_validator_function(
|
||||
# cls.parse,
|
||||
# default_schema,
|
||||
# serialization=core_schema.plain_serializer_function_ser_schema(
|
||||
# lambda semver: str(semver),
|
||||
# info_arg=False,
|
||||
# return_schema=core_schema.str_schema(),
|
||||
# ),
|
||||
# )
|
||||
|
||||
assert SemVer(None) == None
|
||||
assert SemVer('') == None
|
||||
assert SemVer.parse('') == None
|
||||
assert SemVer(1) == (1, 0, 0)
|
||||
assert SemVer(1, 2) == (1, 2, 0)
|
||||
assert SemVer('1.2+234234') == (1, 2, 0)
|
||||
assert SemVer((1, 2, 3)) == (1, 2, 3)
|
||||
assert getattr(SemVer((1, 2, 3)), 'full_text') == '1.2.3'
|
||||
assert SemVer(('1', '2', '3')) == (1, 2, 3)
|
||||
assert SemVer.parse('5.6.7') == (5, 6, 7)
|
||||
assert SemVer.parse('124.0.6367.208') == (124, 0, 6367)
|
||||
assert SemVer.parse('Google Chrome 124.1+234.234') == (124, 1, 0)
|
||||
assert SemVer.parse('Google Ch1rome 124.0.6367.208') == (124, 0, 6367)
|
||||
assert SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324') == (124, 0, 6367)
|
||||
assert getattr(SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324'), 'full_text') == 'Google Chrome 124.0.6367.208+beta_234. 234.234.123'
|
||||
assert SemVer.parse('Google Chrome') == None
|
||||
|
||||
@validate_call
|
||||
def bin_name(bin_path_or_name: str | Path) -> str:
|
||||
name = Path(bin_path_or_name).name
|
||||
assert len(name) > 1
|
||||
assert name.replace('-', '').replace('_', '').replace('.', '').isalnum(), (
|
||||
f'Binary name can only contain a-Z0-9-_.: {name}')
|
||||
return name
|
||||
|
||||
BinName = Annotated[str, AfterValidator(bin_name)]
|
||||
|
||||
@validate_call
|
||||
def path_is_file(path: Path | str) -> Path:
|
||||
path = Path(path) if isinstance(path, str) else path
|
||||
assert path.is_file(), f'Path is not a file: {path}'
|
||||
return path
|
||||
|
||||
HostExistsPath = Annotated[Path, AfterValidator(path_is_file)]
|
||||
|
||||
@validate_call
|
||||
def path_is_executable(path: HostExistsPath) -> HostExistsPath:
|
||||
assert os.access(path, os.X_OK), f'Path is not executable (fix by running chmod +x {path})'
|
||||
return path
|
||||
|
||||
@validate_call
|
||||
def path_is_script(path: HostExistsPath) -> HostExistsPath:
|
||||
SCRIPT_EXTENSIONS = ('.py', '.js', '.sh')
|
||||
assert path.suffix.lower() in SCRIPT_EXTENSIONS, 'Path is not a script (does not end in {})'.format(', '.join(SCRIPT_EXTENSIONS))
|
||||
return path
|
||||
|
||||
HostExecutablePath = Annotated[HostExistsPath, AfterValidator(path_is_executable)]
|
||||
|
||||
@validate_call
|
||||
def path_is_abspath(path: Path) -> Path:
|
||||
return path.resolve()
|
||||
|
||||
HostAbsPath = Annotated[HostExistsPath, AfterValidator(path_is_abspath)]
|
||||
HostBinPath = Annotated[Path, AfterValidator(path_is_abspath), AfterValidator(path_is_file)]
|
||||
|
||||
|
||||
@validate_call
|
||||
def bin_abspath(bin_path_or_name: BinName | Path) -> HostBinPath | None:
|
||||
assert bin_path_or_name
|
||||
|
||||
if str(bin_path_or_name).startswith('/'):
|
||||
# already a path, get its absolute form
|
||||
abspath = Path(bin_path_or_name).resolve()
|
||||
else:
|
||||
# not a path yet, get path using os.which
|
||||
binpath = shutil.which(bin_path_or_name)
|
||||
if not binpath:
|
||||
return None
|
||||
abspath = Path(binpath).resolve()
|
||||
|
||||
try:
|
||||
return TypeAdapter(HostBinPath).validate_python(abspath)
|
||||
except ValidationError:
|
||||
return None
|
||||
|
||||
|
||||
@validate_call
|
||||
def bin_version(bin_path: HostBinPath, args=('--version',)) -> SemVer | None:
|
||||
return SemVer(run([bin_path, *args], stdout=PIPE).stdout.strip().decode())
|
||||
|
||||
|
||||
class InstalledBin(BaseModel):
|
||||
abspath: HostBinPath
|
||||
version: SemVer
|
||||
|
||||
|
||||
def is_valid_install_string(pkgs_str: str) -> str:
|
||||
"""Make sure a string is a valid install string for a package manager, e.g. 'yt-dlp ffmpeg'"""
|
||||
assert pkgs_str
|
||||
assert all(len(pkg) > 1 for pkg in pkgs_str.split(' '))
|
||||
return pkgs_str
|
||||
|
||||
def is_valid_python_dotted_import(import_str: str) -> str:
|
||||
assert import_str and import_str.replace('.', '').replace('_', '').isalnum()
|
||||
return import_str
|
||||
|
||||
InstallStr = Annotated[str, AfterValidator(is_valid_install_string)]
|
||||
|
||||
LazyImportStr = Annotated[str, AfterValidator(is_valid_python_dotted_import)]
|
||||
|
||||
ProviderHandler = Callable[..., Any] | Callable[[], Any] # must take no args [], or [bin_name: str, **kwargs]
|
||||
#ProviderHandlerStr = Annotated[str, AfterValidator(lambda s: s.startswith('self.'))]
|
||||
ProviderHandlerRef = LazyImportStr | ProviderHandler
|
||||
ProviderLookupDict = Dict[str, LazyImportStr]
|
||||
ProviderType = Literal['abspath', 'version', 'subdeps', 'install']
|
||||
|
||||
|
||||
# class Host(BaseModel):
|
||||
# machine: str
|
||||
# system: str
|
||||
# platform: str
|
||||
# in_docker: bool
|
||||
# in_qemu: bool
|
||||
# python: str
|
||||
|
||||
BinProviderName = Literal['env', 'pip', 'apt', 'brew', 'npm', 'vendor']
|
||||
|
||||
|
||||
class BinProvider(ABC, BaseModel):
|
||||
name: BinProviderName
|
||||
|
||||
abspath_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_abspath'}, exclude=True)
|
||||
version_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_version'}, exclude=True)
|
||||
subdeps_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_subdeps'}, exclude=True)
|
||||
install_provider: ProviderLookupDict = Field(default={'*': 'self.on_install'}, exclude=True)
|
||||
|
||||
_abspath_cache: ClassVar = {}
|
||||
_version_cache: ClassVar = {}
|
||||
_install_cache: ClassVar = {}
|
||||
|
||||
# def provider_version(self) -> SemVer | None:
|
||||
# """Version of the actual underlying package manager (e.g. pip v20.4.1)"""
|
||||
# if self.name in ('env', 'vendor'):
|
||||
# return SemVer('0.0.0')
|
||||
# installer_binpath = Path(shutil.which(self.name)).resolve()
|
||||
# return bin_version(installer_binpath)
|
||||
|
||||
# def provider_host(self) -> Host:
|
||||
# """Information about the host env, archictecture, and OS needed to select & build packages"""
|
||||
# p = platform.uname()
|
||||
# return Host(
|
||||
# machine=p.machine,
|
||||
# system=p.system,
|
||||
# platform=platform.platform(),
|
||||
# python=sys.implementation.name,
|
||||
# in_docker=os.environ.get('IN_DOCKER', '').lower() == 'true',
|
||||
# in_qemu=os.environ.get('IN_QEMU', '').lower() == 'true',
|
||||
# )
|
||||
|
||||
def get_default_providers(self):
|
||||
return self.get_providers_for_bin('*')
|
||||
|
||||
def resolve_provider_func(self, provider_func: ProviderHandlerRef | None) -> ProviderHandler | None:
|
||||
if provider_func is None:
|
||||
return None
|
||||
|
||||
# if provider_func is a dotted path to a function on self, swap it for the actual function
|
||||
if isinstance(provider_func, str) and provider_func.startswith('self.'):
|
||||
provider_func = getattr(self, provider_func.split('self.', 1)[-1])
|
||||
|
||||
# if provider_func is a dot-formatted import string, import the function
|
||||
if isinstance(provider_func, str):
|
||||
from django.utils.module_loading import import_string
|
||||
|
||||
package_name, module_name, classname, path = provider_func.split('.', 3) # -> abc, def, ghi.jkl
|
||||
|
||||
# get .ghi.jkl nested attr present on module abc.def
|
||||
imported_module = import_string(f'{package_name}.{module_name}.{classname}')
|
||||
provider_func = operator.attrgetter(path)(imported_module)
|
||||
|
||||
# # abc.def.ghi.jkl -> 1, 2, 3
|
||||
# for idx in range(1, len(path)):
|
||||
# parent_path = '.'.join(path[:-idx]) # abc.def.ghi
|
||||
# try:
|
||||
# parent_module = import_string(parent_path)
|
||||
# provider_func = getattr(parent_module, path[-idx])
|
||||
# except AttributeError, ImportError:
|
||||
# continue
|
||||
|
||||
assert TypeAdapter(ProviderHandler).validate_python(provider_func), (
|
||||
f'{self.__class__.__name__} provider func for {bin_name} was not a function or dotted-import path: {provider_func}')
|
||||
|
||||
return provider_func
|
||||
|
||||
@validate_call
|
||||
def get_providers_for_bin(self, bin_name: str) -> ProviderLookupDict:
|
||||
providers_for_bin = {
|
||||
'abspath': self.abspath_provider.get(bin_name),
|
||||
'version': self.version_provider.get(bin_name),
|
||||
'subdeps': self.subdeps_provider.get(bin_name),
|
||||
'install': self.install_provider.get(bin_name),
|
||||
}
|
||||
only_set_providers_for_bin = {k: v for k, v in providers_for_bin.items() if v is not None}
|
||||
|
||||
return only_set_providers_for_bin
|
||||
|
||||
@validate_call
|
||||
def get_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None) -> ProviderHandler:
|
||||
"""
|
||||
Get the provider func for a given key + Dict of provider callbacks + fallback default provider.
|
||||
e.g. get_provider_for_action(bin_name='yt-dlp', 'install', default_provider=self.on_install, ...) -> Callable
|
||||
"""
|
||||
|
||||
provider_func_ref = (
|
||||
(overrides or {}).get(provider_type)
|
||||
or self.get_providers_for_bin(bin_name).get(provider_type)
|
||||
or self.get_default_providers().get(provider_type)
|
||||
or default_provider
|
||||
)
|
||||
# print('getting provider for action', bin_name, provider_type, provider_func)
|
||||
|
||||
provider_func = self.resolve_provider_func(provider_func_ref)
|
||||
|
||||
assert provider_func, f'No {self.name} provider func was found for {bin_name} in: {self.__class__.__name__}.'
|
||||
|
||||
return provider_func
|
||||
|
||||
@validate_call
|
||||
def call_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None, **kwargs) -> Any:
|
||||
provider_func: ProviderHandler = self.get_provider_for_action(
|
||||
bin_name=bin_name,
|
||||
provider_type=provider_type,
|
||||
default_provider=default_provider,
|
||||
overrides=overrides,
|
||||
)
|
||||
if not func_takes_args_or_kwargs(provider_func):
|
||||
# if it's a pure argless lambdas, dont pass bin_path and other **kwargs
|
||||
provider_func_without_args = cast(Callable[[], Any], provider_func)
|
||||
return provider_func_without_args()
|
||||
|
||||
provider_func = cast(Callable[..., Any], provider_func)
|
||||
return provider_func(bin_name, **kwargs)
|
||||
|
||||
|
||||
|
||||
def on_get_abspath(self, bin_name: BinName, **_) -> HostBinPath | None:
|
||||
print(f'[*] {self.__class__.__name__}: Getting abspath for {bin_name}...')
|
||||
try:
|
||||
return bin_abspath(bin_name)
|
||||
except ValidationError:
|
||||
return None
|
||||
|
||||
def on_get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, **_) -> SemVer | None:
|
||||
abspath = abspath or self._abspath_cache.get(bin_name) or self.get_abspath(bin_name)
|
||||
if not abspath: return None
|
||||
|
||||
print(f'[*] {self.__class__.__name__}: Getting version for {bin_name}...')
|
||||
try:
|
||||
return bin_version(abspath)
|
||||
except ValidationError:
|
||||
return None
|
||||
|
||||
def on_get_subdeps(self, bin_name: BinName, **_) -> InstallStr:
|
||||
print(f'[*] {self.__class__.__name__}: Getting subdependencies for {bin_name}')
|
||||
# ... subdependency calculation logic here
|
||||
return TypeAdapter(InstallStr).validate_python(bin_name)
|
||||
|
||||
@abstractmethod
|
||||
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
|
||||
subdeps = subdeps or self.get_subdeps(bin_name)
|
||||
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
|
||||
# ... install logic here
|
||||
assert True
|
||||
|
||||
|
||||
@validate_call
|
||||
def get_abspath(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> HostBinPath | None:
|
||||
abspath = self.call_provider_for_action(
|
||||
bin_name=bin_name,
|
||||
provider_type='abspath',
|
||||
default_provider=self.on_get_abspath,
|
||||
overrides=overrides,
|
||||
)
|
||||
if not abspath:
|
||||
return None
|
||||
result = TypeAdapter(HostBinPath).validate_python(abspath)
|
||||
self._abspath_cache[bin_name] = result
|
||||
return result
|
||||
|
||||
@validate_call
|
||||
def get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, overrides: Optional[ProviderLookupDict]=None) -> SemVer | None:
|
||||
version = self.call_provider_for_action(
|
||||
bin_name=bin_name,
|
||||
provider_type='version',
|
||||
default_provider=self.on_get_version,
|
||||
overrides=overrides,
|
||||
abspath=abspath,
|
||||
)
|
||||
if not version:
|
||||
return None
|
||||
result = SemVer(version)
|
||||
self._version_cache[bin_name] = result
|
||||
return result
|
||||
|
||||
@validate_call
|
||||
def get_subdeps(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstallStr:
|
||||
subdeps = self.call_provider_for_action(
|
||||
bin_name=bin_name,
|
||||
provider_type='subdeps',
|
||||
default_provider=self.on_get_subdeps,
|
||||
overrides=overrides,
|
||||
)
|
||||
if not subdeps:
|
||||
subdeps = bin_name
|
||||
result = TypeAdapter(InstallStr).validate_python(subdeps)
|
||||
return result
|
||||
|
||||
@validate_call
|
||||
def install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstalledBin | None:
|
||||
subdeps = self.get_subdeps(bin_name, overrides=overrides)
|
||||
|
||||
self.call_provider_for_action(
|
||||
bin_name=bin_name,
|
||||
provider_type='install',
|
||||
default_provider=self.on_install,
|
||||
overrides=overrides,
|
||||
subdeps=subdeps,
|
||||
)
|
||||
|
||||
installed_abspath = self.get_abspath(bin_name)
|
||||
assert installed_abspath, f'Unable to find {bin_name} abspath after installing with {self.name}'
|
||||
|
||||
installed_version = self.get_version(bin_name, abspath=installed_abspath)
|
||||
assert installed_version, f'Unable to find {bin_name} version after installing with {self.name}'
|
||||
|
||||
result = InstalledBin(abspath=installed_abspath, version=installed_version)
|
||||
self._install_cache[bin_name] = result
|
||||
return result
|
||||
|
||||
@validate_call
|
||||
def load(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=False) -> InstalledBin | None:
|
||||
installed_abspath = None
|
||||
installed_version = None
|
||||
|
||||
if cache:
|
||||
installed_bin = self._install_cache.get(bin_name)
|
||||
if installed_bin:
|
||||
return installed_bin
|
||||
installed_abspath = self._abspath_cache.get(bin_name)
|
||||
installed_version = self._version_cache.get(bin_name)
|
||||
|
||||
|
||||
installed_abspath = installed_abspath or self.get_abspath(bin_name, overrides=overrides)
|
||||
if not installed_abspath:
|
||||
return None
|
||||
|
||||
installed_version = installed_version or self.get_version(bin_name, abspath=installed_abspath, overrides=overrides)
|
||||
if not installed_version:
|
||||
return None
|
||||
|
||||
return InstalledBin(abspath=installed_abspath, version=installed_version)
|
||||
|
||||
@validate_call
|
||||
def load_or_install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=True) -> InstalledBin | None:
|
||||
installed = self.load(bin_name, overrides=overrides, cache=cache)
|
||||
if not installed:
|
||||
installed = self.install(bin_name, overrides=overrides)
|
||||
return installed
|
||||
|
||||
|
||||
class PipProvider(BinProvider):
|
||||
name: BinProviderName = 'pip'
|
||||
|
||||
def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
|
||||
subdeps = subdeps or self.on_get_subdeps(bin_name)
|
||||
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
|
||||
|
||||
proc = run(['pip', 'install', '--upgrade', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
|
||||
|
||||
if proc.returncode != 0:
|
||||
print(proc.stdout.strip().decode())
|
||||
print(proc.stderr.strip().decode())
|
||||
raise Exception(f'{self.__class__.__name__}: install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
|
||||
|
||||
|
||||
class AptProvider(BinProvider):
|
||||
name: BinProviderName = 'apt'
|
||||
|
||||
subdeps_provider: ProviderLookupDict = {
|
||||
'yt-dlp': lambda: 'yt-dlp ffmpeg',
|
||||
}
|
||||
|
||||
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
|
||||
subdeps = subdeps or self.on_get_subdeps(bin_name)
|
||||
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
|
||||
|
||||
run(['apt-get', 'update', '-qq'])
|
||||
proc = run(['apt-get', 'install', '-y', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
|
||||
|
||||
if proc.returncode != 0:
|
||||
print(proc.stdout.strip().decode())
|
||||
print(proc.stderr.strip().decode())
|
||||
raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
|
||||
|
||||
class BrewProvider(BinProvider):
|
||||
name: BinProviderName = 'brew'
|
||||
|
||||
def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
|
||||
subdeps = subdeps or self.on_get_subdeps(bin_name)
|
||||
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
|
||||
|
||||
proc = run(['brew', 'install', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
|
||||
|
||||
if proc.returncode != 0:
|
||||
print(proc.stdout.strip().decode())
|
||||
print(proc.stderr.strip().decode())
|
||||
raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
|
||||
|
||||
|
||||
class EnvProvider(BinProvider):
|
||||
name: BinProviderName = 'env'
|
||||
|
||||
abspath_provider: ProviderLookupDict = {
|
||||
# 'python': lambda: Path('/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/bin/python3.10'),
|
||||
}
|
||||
version_provider: ProviderLookupDict = {
|
||||
# 'python': lambda: '{}.{}.{}'.format(*sys.version_info[:3]),
|
||||
}
|
||||
|
||||
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
|
||||
"""The env provider is ready-only and does not install any packages, so this is a no-op"""
|
||||
pass
|
Loading…
Add table
Add a link
Reference in a new issue