__package__ = 'abx'
__id__ = 'abx'
__label__ = 'ABX'
__author__ = 'Nick Sweeting'
__homepage__ = 'https://github.com/ArchiveBox'
__order__ = 0

import inspect
import importlib
import itertools
from pathlib import Path
from typing import Dict, Callable, List, Set, Tuple, Iterable, Any, TypeVar, TypedDict, Type, cast, Generic, Mapping, overload, Final, ParamSpec, Literal, Protocol
from types import ModuleType
from typing_extensions import Annotated
from functools import cache

from benedict import benedict
from pydantic import AfterValidator

from pluggy import HookimplMarker, PluginManager, HookimplOpts, HookspecOpts, HookCaller


ParamsT = ParamSpec("ParamsT")
ReturnT = TypeVar('ReturnT')


class HookSpecDecoratorThatReturnsFirstResult(Protocol):
    """Decorator type produced by @hookspec(firstresult=True): the return type is left unchanged."""

    def __call__(self, func: Callable[ParamsT, ReturnT]) -> Callable[ParamsT, ReturnT]: ...


class HookSpecDecoratorThatReturnsListResults(Protocol):
    """Decorator type produced by @hookspec(firstresult=False): the return type is wrapped in a List."""

    def __call__(self, func: Callable[ParamsT, ReturnT]) -> Callable[ParamsT, List[ReturnT]]: ...


class TypedHookspecMarker:
    """
    Improved version of pluggy.HookspecMarker that supports type inference of hookspecs with firstresult=True|False correctly
    https://github.com/pytest-dev/pluggy/issues/191
    """

    __slots__ = ('project_name',)

    def __init__(self, project_name: str) -> None:
        self.project_name: Final[str] = project_name

    # handle @hookspec(firstresult=False) -> List[ReturnT] (test_firstresult_False_hookspec)
    @overload
    def __call__(
        self,
        function: None = ...,
        firstresult: Literal[False] = ...,
        historic: bool = ...,
        warn_on_impl: Warning | None = ...,
        warn_on_impl_args: Mapping[str, Warning] | None = ...,
    ) -> HookSpecDecoratorThatReturnsListResults: ...

    # handle @hookspec(firstresult=True) -> ReturnT (test_firstresult_True_hookspec)
    @overload
    def __call__(
        self,
        function: None = ...,
        firstresult: Literal[True] = ...,
        historic: bool = ...,
        warn_on_impl: Warning | None = ...,
        warn_on_impl_args: Mapping[str, Warning] | None = ...,
    ) -> HookSpecDecoratorThatReturnsFirstResult: ...

    # handle @hookspec -> List[ReturnT] (test_normal_hookspec)
    # order matters!!! this one has to come last
    @overload
    def __call__(
        self,
        function: Callable[ParamsT, ReturnT] = ...,
        firstresult: Literal[False] = ...,
        historic: bool = ...,
        warn_on_impl: None = ...,
        warn_on_impl_args: None = ...,
    ) -> Callable[ParamsT, List[ReturnT]]: ...

    def __call__(
        self,
        function: Callable[ParamsT, ReturnT] | None = None,
        firstresult: bool = False,
        historic: bool = False,
        warn_on_impl: Warning | None = None,
        warn_on_impl_args: Mapping[str, Warning] | None = None,
    ) -> Callable[ParamsT, List[ReturnT]] | HookSpecDecoratorThatReturnsListResults | HookSpecDecoratorThatReturnsFirstResult:
        """
        Mark a function as a hookspec by attaching the spec options to it.

        Works both as a bare decorator (@hookspec) and as a decorator factory
        (@hookspec(firstresult=True)). The options are stored on the function
        under the attribute "<project_name>_spec".

        Raises ValueError (when the decorator is applied) if both historic and
        firstresult are set.
        """

        def setattr_hookspec_opts(func) -> Callable:
            if historic and firstresult:
                raise ValueError("cannot have a historic firstresult hook")
            opts: HookspecOpts = {
                "firstresult": firstresult,
                "historic": historic,
                "warn_on_impl": warn_on_impl,
                "warn_on_impl_args": warn_on_impl_args,
            }
            setattr(func, self.project_name + "_spec", opts)
            return func

        if function is not None:
            return setattr_hookspec_opts(function)
        else:
            return setattr_hookspec_opts


spec = hookspec = TypedHookspecMarker("abx")
impl = hookimpl = HookimplMarker("abx")


def is_valid_attr_name(x: str) -> str:
    """Validator: x must be a python identifier that does not start with an underscore."""
    # assert is intentional here: this function is used as a pydantic AfterValidator
    assert x.isidentifier() and not x.startswith('_')
    return x


def is_valid_module_name(x: str) -> str:
    """Validator: x must be a lowercase python identifier that does not start with an underscore."""
    # assert is intentional here: this function is used as a pydantic AfterValidator
    assert x.isidentifier() and not x.startswith('_') and x.islower()
    return x


AttrName = Annotated[str, AfterValidator(is_valid_attr_name)]
PluginId = Annotated[str, AfterValidator(is_valid_module_name)]


class PluginInfo(TypedDict, total=True):
    """Metadata describing a single plugin, as assembled by get_plugin()."""
    # NOTE: get_plugin() additionally stores 'order' and 'plugin' keys in the returned mapping
    id: PluginId
    package: AttrName
    label: str
    version: str
    author: str
    homepage: str
    dependencies: List[str]

    source_code: str
    hooks: Dict[AttrName, Callable]
    module: ModuleType


PluginSpec = TypeVar("PluginSpec")


class ABXPluginManager(PluginManager, Generic[PluginSpec]):
    """
    Patch to fix pluggy's PluginManager to work with pydantic models.
    See: https://github.com/pytest-dev/pluggy/pull/536
    """

    # enable static type checking of pm.hook.call() calls
    # https://stackoverflow.com/a/62871889/2156113
    # https://github.com/pytest-dev/pluggy/issues/191
    hook: PluginSpec

    def create_typed_hookcaller(self, name: str, module_or_class: Type[PluginSpec], spec_opts: HookspecOpts) -> HookCaller:
        """
        create a new HookCaller subclass with a modified __signature__
        so that the return type is correct and args are converted to kwargs
        """
        TypedHookCaller = type('TypedHookCaller', (HookCaller,), {})

        hookspec_signature = inspect.signature(getattr(module_or_class, name))
        hookspec_return_type = hookspec_signature.return_annotation

        # replace return type with list if firstresult=False
        hookcall_return_type = hookspec_return_type if spec_opts['firstresult'] else List[hookspec_return_type]

        # replace each arg with kwarg equivalent (pm.hook.call() only accepts kwargs)
        args_as_kwargs = [
            param.replace(kind=inspect.Parameter.KEYWORD_ONLY) if param.name != 'self' else param
            for param in hookspec_signature.parameters.values()
        ]
        TypedHookCaller.__signature__ = hookspec_signature.replace(parameters=args_as_kwargs, return_annotation=hookcall_return_type)
        TypedHookCaller.__name__ = f'{name}_HookCaller'

        return TypedHookCaller(name, self._hookexec, module_or_class, spec_opts)

    def add_hookspecs(self, module_or_class: Type[PluginSpec]) -> None:
        """Add HookSpecs from the given class, (generic type allows us to enforce types of pm.hook.call() statically)"""
        names = []
        for name in dir(module_or_class):
            spec_opts = self.parse_hookspec_opts(module_or_class, name)
            if spec_opts is not None:
                hc: HookCaller | None = getattr(self.hook, name, None)
                if hc is None:
                    hc = self.create_typed_hookcaller(name, module_or_class, spec_opts)
                    setattr(self.hook, name, hc)
                else:
                    # Plugins registered this hook without knowing the spec.
                    hc.set_specification(module_or_class, spec_opts)
                    for hookfunction in hc.get_hookimpls():
                        self._verify_hook(hc, hookfunction)
                names.append(name)

        if not names:
            raise ValueError(
                f"did not find any {self.project_name!r} hooks in {module_or_class!r}"
            )

    def parse_hookimpl_opts(self, plugin, name: str) -> HookimplOpts | None:
        """
        Return the hookimpl options for plugin.<name>, or None if it is not a hookimpl.

        Properties are skipped up front, and an AttributeError raised while
        inspecting the attribute is treated as "not a hookimpl".
        """
        # IMPORTANT: @property methods can have side effects, and are never hookimpl
        # if attr is a property, skip it in advance.
        # getattr_static looks the attribute up without invoking descriptors, so a
        # property is detected on instances too, without running its getter
        # (a plain getattr() on an instance would execute the property instead).
        if isinstance(inspect.getattr_static(plugin, name, None), property):
            return None

        try:
            return super().parse_hookimpl_opts(plugin, name)
        except AttributeError:
            return None


pm = ABXPluginManager("abx")


def get_plugin_order(plugin: PluginId | Path | ModuleType | Type) -> Tuple[int, Path]:
    """
    Get the load order of a plugin as a sortable (order, plugin_dir) tuple.

    The order is read from a `.plugin_order` file inside the plugin dir if one
    exists, otherwise from the `__order__` attribute of the module/class,
    otherwise it defaults to 999.

    Raises ValueError for unsupported plugin types, and AssertionError if the
    plugin directory cannot be determined (e.g. when only a bare plugin id is given).
    """
    assert plugin
    plugin_module = None
    plugin_dir = None

    if isinstance(plugin, (str, Path)):
        plugin_str = str(plugin)
        if plugin_str.endswith('.py'):
            plugin_dir = Path(plugin).parent
        elif '/' in plugin_str:
            # assume it's a path to a plugin directory
            plugin_dir = Path(plugin)
        # otherwise (e.g. a bare plugin id) there is no way to find the dir from here
    elif inspect.ismodule(plugin):
        plugin_module = plugin
        plugin_dir = Path(str(plugin_module.__file__)).parent
    elif inspect.isclass(plugin):
        plugin_module = plugin
        plugin_dir = Path(inspect.getfile(plugin)).parent
    else:
        raise ValueError(f'Invalid plugin, cannot get order: {plugin}')

    if plugin_dir:
        try:
            # if .plugin_order file exists, use it to set the load priority
            order = int((plugin_dir / '.plugin_order').read_text())
            assert -1000000 < order < 100000000
            return (order, plugin_dir)
        except FileNotFoundError:
            pass

    if plugin_module:
        order = getattr(plugin_module, '__order__', 999)
    else:
        order = 999

    assert order is not None
    assert plugin_dir, f'Could not determine plugin directory for plugin: {plugin}'

    return (order, plugin_dir)


# @cache
def get_plugin(plugin: PluginId | ModuleType | Type) -> PluginInfo:
    """
    Collect the metadata for a plugin given its id (package name), module, or class.

    Info is merged from the module's dunder attrs (__label__, __version__, ...),
    then the [project] table of a nearby pyproject.toml (if one can be loaded),
    then values computed here (package, source_code, order, hooks, module, plugin).

    Raises ValueError if plugin is not a str, module, or class.
    """
    assert plugin

    # import the plugin module by its name
    if isinstance(plugin, str):
        module = importlib.import_module(plugin)
        print('IMPORTED PLUGIN:', plugin)
        plugin = getattr(module, 'PLUGIN_SPEC', getattr(module, 'PLUGIN', module))
    elif inspect.ismodule(plugin):
        module = plugin
        plugin = getattr(module, 'PLUGIN_SPEC', getattr(module, 'PLUGIN', module))
    elif inspect.isclass(plugin):
        module = inspect.getmodule(plugin)
    else:
        raise ValueError(f'Invalid plugin, must be a module, class, or plugin ID (package name): {plugin}')

    assert module

    plugin_file = Path(inspect.getfile(module))
    plugin_package = module.__package__ or module.__name__
    plugin_id = plugin_package.replace('.', '_')

    # load the plugin info from the plugin/__init__.py __attr__s if they exist
    plugin_module_attrs = {
        'label': getattr(module, '__label__', plugin_id),
        'version': getattr(module, '__version__', '0.0.1'),
        'author': getattr(module, '__author__', 'ArchiveBox'),
        'homepage': getattr(module, '__homepage__', 'https://github.com/ArchiveBox'),
        'dependencies': getattr(module, '__dependencies__', []),
    }

    # load the plugin info from the plugin/pyproject.toml file if it has one
    # (best-effort: any failure to read/parse just means no toml info is merged)
    plugin_toml_info = {}
    try:
        # try loading ./pyproject.toml first in case the plugin is a bare python file not inside a package dir
        plugin_toml_info = benedict.from_toml((plugin_file.parent / 'pyproject.toml').read_text()).project
    except Exception:
        try:
            # try loading ../pyproject.toml next in case the plugin is in a package dir
            plugin_toml_info = benedict.from_toml((plugin_file.parent.parent / 'pyproject.toml').read_text()).project
        except Exception:
            # print('WARNING: could not detect pyproject.toml for PLUGIN:', plugin_id, plugin_file.parent, 'ERROR:', e)
            pass

    assert plugin_id
    assert plugin_package
    assert module.__file__

    # merge the plugin info from all sources + add dynamically calculated info
    return cast(PluginInfo, benedict(PluginInfo(**{
        'id': plugin_id,
        **plugin_module_attrs,
        **plugin_toml_info,
        'package': plugin_package,
        'source_code': module.__file__,
        'order': get_plugin_order(plugin),
        'hooks': get_plugin_hooks(plugin),
        'module': module,
        'plugin': plugin,
    })))


def get_all_plugins() -> Dict[PluginId, PluginInfo]:
    """Get the metadata for all the plugins registered with Pluggy."""
    plugins = {}
    for plugin_module in pm.get_plugins():
        plugin_info = get_plugin(plugin=plugin_module)
        assert 'id' in plugin_info
        plugins[plugin_info['id']] = plugin_info
    return benedict(plugins)


def get_all_hook_names() -> Set[str]:
    """Get a set of all hook names across all plugins"""
    return {
        hook_name
        for plugin_module in pm.get_plugins()
        for hook_name in get_plugin_hooks(plugin_module)
    }


def _make_hook_method(hook_name: str) -> Callable[..., Any]:
    """Build a function that forwards its arguments to pm.hook.<hook_name>."""
    # A factory is used (instead of a lambda inside the loop) so each returned
    # function is bound to its own hook_name rather than the loop's last value.
    def call_hook(*args, **kwargs):
        return getattr(pm.hook, hook_name)(*args, **kwargs)
    return call_hook


def get_all_hook_specs() -> Dict[str, Dict[str, Any]]:
    """
    Get a dict of all hookspec methods defined in all plugins (useful for type checking if a pm.hook.call() is valid)

    Each value describes one hook: its name, a callable 'method' that dispatches
    to pm.hook.<name>, the call signature (return type wrapped in List unless
    firstresult is set), the hookspec opts, the original hookspec signature, and
    the package of the plugin that declares the spec.
    """
    hook_specs = {}

    for hook_name in get_all_hook_names():
        for plugin_module in pm.get_plugins():
            if not hasattr(plugin_module, hook_name):
                continue
            hookspecopts = pm.parse_hookspec_opts(plugin_module, hook_name)
            if not hookspecopts:
                continue

            signature = inspect.signature(getattr(plugin_module, hook_name))
            return_type = (
                signature.return_annotation
                if signature.return_annotation is not inspect.Signature.empty
                else None
            )
            if not hookspecopts.get('firstresult'):
                # if not firstresult, return_type is a sequence
                return_type = List[return_type]

            call_signature = signature.replace(return_annotation=return_type)
            method = _make_hook_method(hook_name)
            method.__signature__ = call_signature
            method.__name__ = hook_name
            method.__package__ = plugin_module.__package__

            hook_specs[hook_name] = {
                'name': hook_name,
                'method': method,
                'signature': call_signature,
                'hookspec_opts': hookspecopts,
                'hookspec_signature': signature,
                'hookspec_plugin': plugin_module.__package__,
            }
    return hook_specs


###### PLUGIN DISCOVERY AND LOADING ########################################################


def find_plugins_in_dir(plugins_dir: Path) -> Dict[PluginId, Path]:
    """
    Find all the plugins in a given directory. Just looks for an __init__.py file.

    Returns {dir_name: dir_path} ordered by plugin load order, skipping any
    directories named 'abx' or 'core'.
    """
    python_dirs = plugins_dir.glob("*/__init__.py")
    # get_plugin_order always returns an (order, dir) tuple (or raises), so it is a complete sort key
    sorted_python_dirs = sorted(python_dirs, key=get_plugin_order)

    return {
        plugin_entrypoint.parent.name: plugin_entrypoint.parent
        for plugin_entrypoint in sorted_python_dirs
        if plugin_entrypoint.parent.name not in ('abx', 'core')
    }


def get_pip_installed_plugins(group: PluginId='abx') -> Dict[PluginId, Path]:
    """replaces pm.load_setuptools_entrypoints("abx"), finds plugins that registered entrypoints via pip"""
    import importlib.metadata

    DETECTED_PLUGINS = {}   # module_name: module_dir_path
    for dist in importlib.metadata.distributions():
        for entrypoint in dist.entry_points:
            if entrypoint.group != group or pm.is_blocked(entrypoint.name):
                continue
            # NOTE: entrypoint.load() imports the plugin's entrypoint object
            DETECTED_PLUGINS[entrypoint.name] = Path(entrypoint.load().__file__).parent
            # pm.register(plugin, name=ep.name)
            # pm._plugin_distinfo.append((plugin, DistFacade(dist)))
    return DETECTED_PLUGINS


# Load all plugins from pip packages, archivebox built-ins, and user plugins
def load_plugins(plugins: Iterable[PluginId | ModuleType | Type] | Dict[PluginId, Path]):
    """
    Load all the plugins from a dictionary of module names and directory paths.

    Plugins whose module is already registered with the plugin manager are
    returned as-is; the rest are registered in ascending 'order'.
    Returns a benedict of {plugin_id: plugin_info} for every plugin passed in.
    """
    PLUGINS_TO_LOAD = []
    LOADED_PLUGINS = {}

    for plugin in plugins:
        plugin_info = get_plugin(plugin)
        assert plugin_info, f'No plugin metadata found for {plugin}'
        assert 'id' in plugin_info and 'module' in plugin_info
        if plugin_info['module'] in pm.get_plugins():
            # already registered, nothing to do besides reporting it
            LOADED_PLUGINS[plugin_info['id']] = plugin_info
        else:
            PLUGINS_TO_LOAD.append(plugin_info)

    PLUGINS_TO_LOAD = sorted(PLUGINS_TO_LOAD, key=lambda x: x['order'])

    for plugin_info in PLUGINS_TO_LOAD:
        pm.register(plugin_info['module'])
        LOADED_PLUGINS[plugin_info['id']] = plugin_info
        # print(f'    √ Loaded plugin: {plugin_id}')
    return benedict(LOADED_PLUGINS)


@cache
def get_plugin_hooks(plugin: PluginId | ModuleType | Type | None) -> Dict[AttrName, Callable]:
    """
    Get all the functions marked with @hookimpl on a module.

    Accepts a plugin id (imported by name), a module, or a class; a falsy
    plugin yields {}. Raises ValueError for any other type. Errors while
    inspecting an individual attribute are printed and that attribute is skipped.
    """
    if not plugin:
        return {}

    hooks = {}

    if isinstance(plugin, str):
        plugin_module = importlib.import_module(plugin)
    elif inspect.ismodule(plugin) or inspect.isclass(plugin):
        plugin_module = plugin
    else:
        raise ValueError(f'Invalid plugin, cannot get hooks: {plugin}')

    for attr_name in dir(plugin_module):
        if attr_name.startswith('_'):
            continue
        try:
            attr = getattr(plugin_module, attr_name)
            if callable(attr) and pm.parse_hookimpl_opts(plugin_module, attr_name):
                hooks[attr_name] = attr
        except Exception as e:
            # best-effort: one broken attribute should not hide the plugin's other hooks
            print(f'Error getting hookimpls for {plugin}: {e}')

    return hooks


def as_list(results: List[List[ReturnT]]) -> List[ReturnT]:
    """Flatten a list of lists returned by a pm.hook.call() into a single list"""
    return list(itertools.chain.from_iterable(results))


def as_dict(results: List[Dict[PluginId, ReturnT]]) -> Dict[PluginId, ReturnT]:
    """Flatten a list of dicts returned by a pm.hook.call() into a single dict"""

    # a dict of dicts is also accepted: only its values are merged
    if isinstance(results, (dict, benedict)):
        results_list = results.values()
    else:
        results_list = results

    return benedict({
        result_id: result
        for plugin_results in results_list
        for result_id, result in plugin_results.items()
    })