"""WIP draft of the plugantic (plugin-based) settings system for ArchiveBox.

Defines pydantic-settings models for each config section (SHELL_CONFIG,
GENERAL_CONFIG, ...) plus draft plugin/dependency/extractor/replayer schemas.

NOTE(review): the original draft contained several syntactically invalid WIP
stubs (undefined names, incomplete expressions, duplicate class definitions).
They have been repaired where the intent was clear, or preserved as
commented-out TODO drafts below so the module at least parses and imports.
"""

import re
import os
import sys
import toml
import json
import platform
import inspect
import tomllib

from typing import Callable, Any, Optional, Pattern, Type, Tuple, Dict, List, Literal
from pathlib import Path

from pydantic import BaseModel, Field, AliasChoices, model_validator, FilePath, DirectoryPath, computed_field, TypeAdapter
from pydantic.fields import FieldInfo

from pydantic_settings import BaseSettings, SettingsConfigDict, PydanticBaseSettingsSource
from pydantic_settings.sources import InitSettingsSource, ConfigFileSourceMixin, TomlConfigSettingsSource

from pydantic_core import PydanticOmit, core_schema, to_jsonable_python, ValidationError
from pydantic.json_schema import GenerateJsonSchema, JsonSchemaValue

import ini_to_toml


class JSONSchemaWithLambdas(GenerateJsonSchema):
    """JSON-schema generator that renders lambda default factories as
    ``{{lambda ...}}`` placeholder strings instead of failing to serialize them."""

    def encode_default(self, default: Any) -> Any:
        """Encode lambda functions in default values properly"""
        config = self._config
        if isinstance(default, Callable):
            # recover the lambda's source text: everything after `=lambda `
            # on its definition line, minus the trailing `)` of Field(...)
            return '{{lambda ' + inspect.getsource(default).split('=lambda ')[-1].strip()[:-1] + '}}'
        return to_jsonable_python(
            default,
            timedelta_mode=config.ser_json_timedelta,
            bytes_mode=config.ser_json_bytes,
            serialize_unknown=True,
        )

    # for computed_field properties render them like this instead:
    # inspect.getsource(field.wrapped_property.fget).split('def ', 1)[-1].split('\n', 1)[-1].strip().strip('return '),


class ModelWithDefaults(BaseSettings):
    """Base for all config sections.

    Allows ``Field(default=lambda config_so_far: ...)`` default factories that
    can depend on earlier fields, and provides serializers to JSON / TOML /
    the legacy archivebox.config CONFIG_SCHEMA format.
    """

    model_config = SettingsConfigDict(validate_default=False, case_sensitive=False, extra='ignore')

    @model_validator(mode='after')
    def fill_defaults(self):
        """Populate any unset values using the function provided as their default."""
        for key, field in self.model_fields.items():
            value = getattr(self, key)
            if isinstance(value, Callable):
                # value is still the default factory itself; call it with the
                # config collected so far to compute the real value.
                # (was self.dict(...) — deprecated pydantic v1 API)
                config_so_far = self.model_dump(exclude_unset=True)
                fallback_value = field.default(config_so_far)

                # check that the factory's return value matches the type annotation
                TypeAdapter(field.annotation).validate_python(fallback_value)

                # set generated default value as the final validated value
                # (assignment also marks the field as "set" for later factories)
                setattr(self, key, fallback_value)
        return self

    def _iter_sections(self):
        """Yield the UPPERCASE config-section base classes in MRO order."""
        for section in self.__class__.__mro__[1:]:
            if not section.__name__.isupper():
                # sections are the ALL-CAPS ancestors; stop at the first other base
                break
            yield section

    def as_json(self, model_fields=True, computed_fields=True):
        """Serialize to a dict of {SECTION_NAME: {KEY: value, ...}, ...}."""
        output_dict = {}
        for section in self._iter_sections():
            output_dict[section.__name__] = output_dict.get(section.__name__) or {}
            include = {}
            if model_fields:
                include.update(**section.model_fields)
            if computed_fields:
                include.update(**section.model_computed_fields)
            # was `section.json(self, ...)` — v1-style unbound call; use the v2 API
            output_dict[section.__name__].update(json.loads(self.model_dump_json(include=set(include))))
        return output_dict

    def as_toml(self, model_fields=True, computed_fields=True):
        """Serialize to TOML text with one [SECTION] table per config section."""
        output_text = ''
        for section in self._iter_sections():
            include = {}
            if model_fields:
                include.update(**section.model_fields)
            if computed_fields:
                include.update(**section.model_computed_fields)

            output_text += (
                f'[{section.__name__}]\n'
                + toml.dumps(json.loads(self.model_dump_json(include=set(include)))) + '\n'
            )
        return output_text

    def as_legacy_schema(self, model_fields=True, computed_fields=True):
        """Convert a newer Pydantic Settings BaseModel into the old-style archivebox.config CONFIG_SCHEMA format"""

        schemas = {}

        include = {}
        if model_fields:
            include.update(**self.model_fields)
        if computed_fields:
            include.update(**self.model_computed_fields)

        for key, field in include.items():
            key = key.upper()

            # find which section class originally declared this field
            defining_class = None
            for cls in self.__class__.__mro__[1:]:
                if key in cls.model_fields or key in cls.model_computed_fields:
                    defining_class = cls
                    break

            assert defining_class is not None, f"No defining class found for field {key}! (should be impossible)"

            extra = getattr(field, 'json_schema_extra', None) or {}

            schemas[defining_class.__name__] = schemas.get(defining_class.__name__) or {}
            schemas[defining_class.__name__][key] = {
                'value': getattr(self, key),
                # NOTE(review): typing.Pattern/Optional annotations may lack __name__ — TODO confirm
                'type': str(field.annotation.__name__).lower() if hasattr(field, 'annotation') else str(field.return_type).lower(),
                'default': field.default if hasattr(field, 'default') else field.wrapped_property.fget,
                # was getattr(..., 'choices') with no fallback -> AttributeError when unset
                'aliases': getattr(extra.get('aliases'), 'choices', None) or [],
            }

        return schemas

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: Type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> Tuple[PydanticBaseSettingsSource, ...]:
        """Load config from ArchiveBox.conf, transparently converting legacy
        INI-format files to TOML on first use."""
        # was a hard-coded absolute developer path (/Users/squash/...);
        # resolve from the environment / cwd instead
        ARCHIVEBOX_CONFIG_FILE = Path(
            os.environ.get('ARCHIVEBOX_CONFIG_FILE', '') or Path.cwd() / 'data' / 'ArchiveBox.conf'
        )
        ARCHIVEBOX_CONFIG_FILE_TOML = ARCHIVEBOX_CONFIG_FILE.parent / '.ArchiveBox.toml'
        try:
            return (
                init_settings,
                env_settings,
                TomlConfigSettingsSource(settings_cls, toml_file=ARCHIVEBOX_CONFIG_FILE),
            )
        except tomllib.TOMLDecodeError:
            # legacy INI config file: convert it to TOML once and use that
            toml_str = ini_to_toml.convert(ARCHIVEBOX_CONFIG_FILE.read_text())
            with open(ARCHIVEBOX_CONFIG_FILE_TOML, 'w+') as f:
                f.write(toml_str)

            return (
                init_settings,
                env_settings,
                TomlConfigSettingsSource(settings_cls, toml_file=ARCHIVEBOX_CONFIG_FILE_TOML),
            )


class SHELL_CONFIG(ModelWithDefaults):
    """Terminal / process-environment related options."""

    IS_TTY: bool = Field(default=lambda c: sys.stdout.isatty())
    USE_COLOR: bool = Field(default=lambda c: c['IS_TTY'])
    SHOW_PROGRESS: bool = Field(default=lambda c: c['IS_TTY'] and (platform.system() != 'Darwin'))

    IN_DOCKER: bool = Field(default=False)
    IN_QEMU: bool = Field(default=False)
    PUID: int = Field(default=lambda c: os.getuid())
    PGID: int = Field(default=lambda c: os.getgid())


class GENERAL_CONFIG(ModelWithDefaults):
    """Core archiving options."""

    # OUTPUT_DIR: DirectoryPath
    CONFIG_FILE: FilePath = Field(default=lambda c: c['OUTPUT_DIR'] / 'ArchiveBox.conf')

    ONLY_NEW: bool = Field(default=True)
    TIMEOUT: int = Field(default=60)
    MEDIA_TIMEOUT: int = Field(default=3600)

    ENFORCE_ATOMIC_WRITES: bool = Field(default=True)
    OUTPUT_PERMISSIONS: str = Field(default='644')
    RESTRICT_FILE_NAMES: str = Field(default='windows')

    URL_DENYLIST: Pattern = Field(default=re.compile(r'\.(css|js|otf|ttf|woff|woff2|gstatic\.com|googleapis\.com/css)(\?.*)?$'), aliases=AliasChoices('URL_BLACKLIST'))
    URL_ALLOWLIST: Pattern = Field(default=re.compile(r''), aliases=AliasChoices('URL_WHITELIST'))

    ADMIN_USERNAME: Optional[str] = Field(default=None, min_length=1, max_length=63, pattern=r'^[\S]+$')
    ADMIN_PASSWORD: Optional[str] = Field(default=None, min_length=1, max_length=63)

    TAG_SEPARATOR_PATTERN: Pattern = Field(default=re.compile(r'[,]'))

    @computed_field
    @property
    def OUTPUT_DIR(self) -> DirectoryPath:
        return Path('.').resolve()


# class PackageInstalled(ModelWithDefaults):
#     binary_abs: HostBinPathStr
#     version_str: str
#     is_valid: True
#     provider: PackageProvider
#     date_installed: datetime
#     date_checked: datetime


class EntrypointConfig(ModelWithDefaults):
    """Describes a single runnable entrypoint provided by a dependency."""

    name: str
    dependency: str
    runtime: Literal['python.eval', 'node.eval', 'puppeteer', 'shell.run', 'ansible']
    CMD: str
    DEFAULT_ARGS: List[str]
    EXTRA_ARGS: List[str]
    ARGS: List[str]
    # SCHEMA: EntrypointSchema   # TODO(review): EntrypointSchema is not defined yet
    # SECURITY: eval executes arbitrary code — replace before accepting untrusted config
    validator: Callable = eval


class VersionEntrypointConfig(ModelWithDefaults):
    """Entrypoint preset for `<binary> --version` style version checks."""

    # annotation is required for pydantic to treat this as a field
    # TODO(review): probably intended to subclass EntrypointConfig — confirm
    DEFAULT_ARGS: List[str] = ['--version']


class PackageProvider(ModelWithDefaults):
    """A source that can install/locate binaries (apt, brew, pip, npm, ...)."""

    name: Literal['config', 'PATH', 'pip', 'apt', 'brew', 'npm', 'vendor']

    def install_bin(self, name):
        # TODO(review): original returned undefined name `PackageInstall`
        raise NotImplementedError

    def get_bin_path(self, name, install=True):
        # TODO(review): original called undefined helper `shell(['which', name])`
        raise NotImplementedError


class DependencyConfig(ModelWithDefaults):
    """An external dependency (e.g. wget) and how to obtain/run it."""

    # was List[Literal[...], ...] — List[] takes a single type parameter
    providers: List[Literal['config', 'PATH', 'pip', 'apt', 'brew', 'npm', 'vendor']]
    name: str
    packages: List[str]
    entrypoints: Dict[str, EntrypointConfig]
    # was `field(default=lambda c: )` (incomplete expression); optional until implemented
    version_cmd: Optional[EntrypointConfig] = None


class ExtractorConfig(ModelWithDefaults):
    """An extractor provided by a plugin."""

    name: str
    description: str = Field(examples=['WGET Extractor'])
    depends_on: DependencyConfig  # was misspelled `DepencencyConfig`
    entrypoint: EntrypointConfig = Field(description='Which entrypoint to use for this extractor')


class ReplayerConfig(ModelWithDefaults):
    """Describes how to render an ArchiveResult in several contexts"""

    name: str
    # were bare string annotations (`row_template: 'plugins/...'`); made real defaults
    row_template: str = 'plugins/wget/templates/row.html'
    embed_template: str = 'plugins/wget/templates/embed.html'
    fullpage_template: str = 'plugins/wget/templates/fullpage.html'

    # TODO(review): switch to pydantic ImportString once the import targets exist
    icon_view: str = 'plugins.wget.replayers.wget.get_icon'
    thumbnail_getter: str = 'plugins.wget.replayers.wget.get_icon'


class PluginConfig(ModelWithDefaults):
    """Everything a single plugin contributes: deps, entrypoints, extractors, replayers."""

    name: str
    dependencies: Dict[str, DependencyConfig]
    entrypoints: Dict[str, EntrypointConfig]
    extractors: Dict[str, ExtractorConfig]
    replayers: Dict[str, ReplayerConfig]

    # TODO(review): the original draft also sketched these fields here, but they
    # were syntactically incomplete / used undefined types (HostBinName, Provider):
    #   BINARY:
    #   PROVIDERS: List[, ...]
    #   ENTRYPOINTS: Dict[str, EntrypointConfig]
    #   WGET_BINARY: HostBinName = Field(default='wget')
    #   @computed_field
    #   @property
    #   def WGET_PROVIDERS(self) -> List[Provider]: ...


# TODO(review): the WGET-specific draft classes below were invalid as written
# (undefined bases DEPENDENCY_CONFIG / SingletonModel / SchemaField, a duplicate
# empty WGET_CONFIG definition) — preserved commented out until finished:
#
# class WGET_DEPENDENCY_CONFIG(DependencyConfig):
#     pass
#
# class WGET_CONFIG(ModelWithDefaults):
#     EXTRACTORS: List[ExtractorConfig] = []
#     DEPENDENCIES: List[DependencyConfig] = []
#
# class WgetConfiguration(SingletonModel):
#     singleton_instance_id = 1
#
#     dependency_config: WGET_CONFIG = SchemaField()
#     extractor_config: WGET_CONFIG = SchemaField()
#     replay_config: WGET_CONFIG = SchemaField()
#     pkg_config: WGET_CONFIG = SchemaField()


# class ConfigSet(models.Model):
#     # scope = when should this config set be active
#     #    host: on a specific host running archivebox
#     #
#     #    snapshot__added: on or during a specific timeperiod
#     #    user: for actions initiated by a specific archivebox user
#     #    extractor: for specific extractors running under a snapshot
#     #    snapshot_id: for a specific snapshot pk
#     #    snapshot__url: for a specific snapshot url
#     scope = models.CharField(choices=('host', 'date', 'user', 'extractor', 'domain', 'url', 'custom'))
#     lookup = models.CharField(choices=('__eq', '__icontains', '__gte', '__lt', '__startswith', '__endswith', '__in', '__isnull'))
#     match = models.CharField(max_length=128)
#
#     config = models.JSONField(default={}, schema=Dict[str, JSONValue])
#     getter = models.ImportString(default='django.utils.model_loading.import_string')
#
#     label = models.CharField(max_length=64)
#     created_by = models.ForeignKey(User, on_delete=models.CASCADE)
#     config = JSONField(schema=Dict[str, JSONValue])


CONFIG_SECTIONS = (GENERAL_CONFIG, SHELL_CONFIG)


class USER_CONFIG(*CONFIG_SECTIONS):
    """The merged user-facing config: all sections combined into one model."""
    pass


if __name__ == '__main__':
    # print(ShellConfig(**{'IS_TTY': False, 'PGID': 911}).model_dump())
    # print(json.dumps(SHELL_CONFIG.model_json_schema(schema_generator=JSONSchemaWithLambdas), indent=4))
    # print(json.dumps(GENERAL_CONFIG.model_json_schema(schema_generator=JSONSchemaWithLambdas), indent=4))
    print()
    # os.environ['PGID'] = '422'
    os.environ['URL_ALLOWLIST'] = r'worked!!!!!\\.com'
    config = USER_CONFIG(**{'SHOW_PROGRESS': False, 'ADMIN_USERNAME': 'kip', 'PGID': 911})

    print('==========archivebox.config.CONFIG_SCHEMA======================')
    print(json.dumps(config.as_legacy_schema(), indent=4, default=str))

    print('==========JSON=================================================')
    # print(config.__class__.__name__, '=', config.model_dump_json(indent=4))
    print(json.dumps(config.as_json(), indent=4))

    print('==========TOML=================================================')
    print(config.as_toml())