mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2025-05-16 16:14:28 -04:00
add pydantic_settings mockup
This commit is contained in:
parent
3b0a25950d
commit
b611c0114c
1 changed files with 336 additions and 0 deletions
336
archivebox/plugantic/settings.py
Normal file
336
archivebox/plugantic/settings.py
Normal file
|
@ -0,0 +1,336 @@
|
|||
import re
|
||||
import os
|
||||
import sys
|
||||
import toml
|
||||
import json
|
||||
import platform
|
||||
import inspect
|
||||
import tomllib
|
||||
|
||||
from typing import Callable, Any, Optional, Pattern, Type, Tuple, Dict, List
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import BaseModel, Field, FieldValidationInfo, AliasChoices, model_validator, FilePath, DirectoryPath, computed_field, TypeAdapter
|
||||
from pydantic.fields import FieldInfo
|
||||
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict, PydanticBaseSettingsSource
|
||||
from pydantic_settings.sources import InitSettingsSource, ConfigFileSourceMixin, TomlConfigSettingsSource
|
||||
|
||||
from pydantic.json_schema import GenerateJsonSchema
|
||||
from pydantic_core import PydanticOmit, core_schema, to_jsonable_python, ValidationError
|
||||
from pydantic.json_schema import GenerateJsonSchema, JsonSchemaValue
|
||||
|
||||
import ini_to_toml
|
||||
|
||||
|
||||
class JSONSchemaWithLambdas(GenerateJsonSchema):
|
||||
def encode_default(self, default: Any) -> Any:
|
||||
"""Encode lambda functions in default values properly"""
|
||||
config = self._config
|
||||
if isinstance(default, Callable):
|
||||
return '{{lambda ' + inspect.getsource(default).split('=lambda ')[-1].strip()[:-1] + '}}'
|
||||
return to_jsonable_python(
|
||||
default,
|
||||
timedelta_mode=config.ser_json_timedelta,
|
||||
bytes_mode=config.ser_json_bytes,
|
||||
serialize_unknown=True
|
||||
)
|
||||
|
||||
# for computed_field properties render them like this instead:
|
||||
# inspect.getsource(field.wrapped_property.fget).split('def ', 1)[-1].split('\n', 1)[-1].strip().strip('return '),
|
||||
|
||||
|
||||
class ModelWithDefaults(BaseSettings):
|
||||
model_config = SettingsConfigDict(validate_default=False, case_sensitive=False, extra='ignore')
|
||||
|
||||
@model_validator(mode='after')
|
||||
def fill_defaults(self):
|
||||
"""Populate any unset values using function provided as their default"""
|
||||
for key, field in self.model_fields.items():
|
||||
value = getattr(self, key)
|
||||
if isinstance(value, Callable):
|
||||
# if value is a function, execute it to get the actual value, passing CONFIG dict as an arg
|
||||
config_so_far = self.dict(exclude_unset=True)
|
||||
fallback_value = field.default(config_so_far)
|
||||
|
||||
# check to make sure default factory return value matches type annotation
|
||||
TypeAdapter(field.annotation).validate_python(fallback_value)
|
||||
|
||||
# set generated default value as final validated value
|
||||
setattr(self, key, fallback_value)
|
||||
return self
|
||||
|
||||
def as_json(self, model_fields=True, computed_fields=True):
|
||||
output_dict = {}
|
||||
for section in self.__class__.__mro__[1:]:
|
||||
if not section.__name__.isupper():
|
||||
break
|
||||
output_dict[section.__name__] = output_dict.get(section.__name__) or {}
|
||||
include = {}
|
||||
if model_fields:
|
||||
include.update(**section.model_fields)
|
||||
if computed_fields:
|
||||
include.update(**section.model_computed_fields)
|
||||
output_dict[section.__name__].update(json.loads(section.json(self, include=include)))
|
||||
return output_dict
|
||||
|
||||
def as_toml(self, model_fields=True, computed_fields=True):
|
||||
output_text = ''
|
||||
for section in self.__class__.__mro__[1:]:
|
||||
if not section.__name__.isupper():
|
||||
break
|
||||
include = {}
|
||||
if model_fields:
|
||||
include.update(**section.model_fields)
|
||||
if computed_fields:
|
||||
include.update(**section.model_computed_fields)
|
||||
|
||||
output_text += (
|
||||
f'[{section.__name__}]\n' +
|
||||
toml.dumps(json.loads(section.json(self, include=include))) + '\n'
|
||||
)
|
||||
return output_text
|
||||
|
||||
def as_legacy_schema(self, model_fields=True, computed_fields=True):
|
||||
"""Convert a newer Pydantic Settings BaseModel into the old-style archivebox.config CONFIG_SCHEMA format"""
|
||||
|
||||
schemas = {}
|
||||
|
||||
include = {}
|
||||
if model_fields:
|
||||
include.update(**self.model_fields)
|
||||
if computed_fields:
|
||||
include.update(**self.model_computed_fields)
|
||||
|
||||
for key, field in include.items():
|
||||
key = key.upper()
|
||||
defining_class = None
|
||||
for cls in self.__class__.__mro__[1:]:
|
||||
if key in cls.model_fields or key in cls.model_computed_fields:
|
||||
defining_class = cls
|
||||
break
|
||||
|
||||
assert defining_class is not None, f"No defining class found for field {key}! (should be impossible)"
|
||||
|
||||
schemas[defining_class.__name__] = schemas.get(defining_class.__name__) or {}
|
||||
schemas[defining_class.__name__][key] = {
|
||||
'value': getattr(self, key),
|
||||
'type': str(field.annotation.__name__).lower() if hasattr(field, 'annotation') else str(field.return_type).lower(),
|
||||
'default': field.default if hasattr(field, 'default') else field.wrapped_property.fget,
|
||||
'aliases': (getattr(field.json_schema_extra.get('aliases', {}), 'choices') or []) if getattr(field, 'json_schema_extra') else [],
|
||||
}
|
||||
|
||||
return schemas
|
||||
|
||||
@classmethod
|
||||
def settings_customise_sources(
|
||||
cls,
|
||||
settings_cls: Type[BaseSettings],
|
||||
init_settings: PydanticBaseSettingsSource,
|
||||
env_settings: PydanticBaseSettingsSource,
|
||||
dotenv_settings: PydanticBaseSettingsSource,
|
||||
file_secret_settings: PydanticBaseSettingsSource,
|
||||
) -> Tuple[PydanticBaseSettingsSource, ...]:
|
||||
ARCHIVEBOX_CONFIG_FILE = Path('/Users/squash/Local/Code/archiveboxes/ArchiveBox/data/ArchiveBox.conf')
|
||||
ARCHIVEBOX_CONFIG_FILE_TOML = ARCHIVEBOX_CONFIG_FILE.parent / f'.ArchiveBox.toml'
|
||||
try:
|
||||
return (
|
||||
init_settings,
|
||||
env_settings,
|
||||
TomlConfigSettingsSource(settings_cls, toml_file=ARCHIVEBOX_CONFIG_FILE),
|
||||
)
|
||||
except tomllib.TOMLDecodeError:
|
||||
toml_str = ini_to_toml.convert(ARCHIVEBOX_CONFIG_FILE.read_text())
|
||||
with open(ARCHIVEBOX_CONFIG_FILE_TOML, 'w+') as f:
|
||||
f.write(toml_str)
|
||||
|
||||
return (
|
||||
init_settings,
|
||||
env_settings,
|
||||
TomlConfigSettingsSource(settings_cls, toml_file=ARCHIVEBOX_CONFIG_FILE_TOML),
|
||||
)
|
||||
|
||||
|
||||
class SHELL_CONFIG(ModelWithDefaults):
|
||||
IS_TTY: bool = Field(default=lambda c: sys.stdout.isatty())
|
||||
USE_COLOR: bool = Field(default=lambda c: c['IS_TTY'])
|
||||
SHOW_PROGRESS: bool = Field(default=lambda c: c['IS_TTY'] and (platform.system() != 'Darwin'))
|
||||
|
||||
IN_DOCKER: bool = Field(default=False)
|
||||
IN_QEMU: bool = Field(default=False)
|
||||
PUID: int = Field(default=lambda c: os.getuid())
|
||||
PGID: int = Field(default=lambda c: os.getgid())
|
||||
|
||||
|
||||
class GENERAL_CONFIG(ModelWithDefaults):
|
||||
# OUTPUT_DIR: DirectoryPath
|
||||
CONFIG_FILE: FilePath = Field(default=lambda c: c['OUTPUT_DIR'] / 'ArchiveBox.conf')
|
||||
|
||||
ONLY_NEW: bool = Field(default=True)
|
||||
TIMEOUT: int = Field(default=60)
|
||||
MEDIA_TIMEOUT: int = Field(default=3600)
|
||||
|
||||
ENFORCE_ATOMIC_WRITES: bool = Field(default=True)
|
||||
OUTPUT_PERMISSIONS: str = Field(default='644')
|
||||
RESTRICT_FILE_NAMES: str = Field(default='windows')
|
||||
|
||||
URL_DENYLIST: Pattern = Field(default=re.compile(r'\.(css|js|otf|ttf|woff|woff2|gstatic\.com|googleapis\.com/css)(\?.*)?$'), aliases=AliasChoices('URL_BLACKLIST'))
|
||||
URL_ALLOWLIST: Pattern = Field(default=re.compile(r''), aliases=AliasChoices('URL_WHITELIST'))
|
||||
|
||||
ADMIN_USERNAME: Optional[str] = Field(default=None, min_length=1, max_length=63, pattern=r'^[\S]+$')
|
||||
ADMIN_PASSWORD: Optional[str] = Field(default=None, min_length=1, max_length=63)
|
||||
|
||||
TAG_SEPARATOR_PATTERN: Pattern = Field(default=re.compile(r'[,]'))
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def OUTPUT_DIR(self) -> DirectoryPath:
|
||||
return Path('.').resolve()
|
||||
|
||||
# class PackageInstalled(ModelWithDefaults):
|
||||
# binary_abs: HostBinPathStr
|
||||
# version_str: str
|
||||
# is_valid: True
|
||||
# provider: PackageProvider
|
||||
# date_installed: datetime
|
||||
# date_checked: datetime
|
||||
|
||||
class EntrypointConfig(ModelWithDefaults):
|
||||
name: str
|
||||
dependency: str
|
||||
runtime: Literal['python.eval', 'node.eval', 'puppeteer', 'shell.run', 'ansible']
|
||||
CMD: str
|
||||
DEFAULT_ARGS: List[str]
|
||||
EXTRA_ARGS: List[str]
|
||||
ARGS: List[str]
|
||||
SCHEMA: EntrypointSchema
|
||||
validator: Callable = eval
|
||||
|
||||
class VersionEntrypointConfig(ModelWithDefaults):
|
||||
DEFAULT_ARGS = ['--version']
|
||||
|
||||
class PackageProvider(ModelWithDefaults):
|
||||
name: Literal['config', 'PATH', 'pip', 'apt', 'brew', 'npm', 'vendor']
|
||||
|
||||
def install_bin(self, name):
|
||||
# ...
|
||||
return PackageInstall
|
||||
|
||||
def get_bin_path(self, name, install=True):
|
||||
return shell(['which', name])
|
||||
|
||||
class DependencyConfig(ModelWithDefaults):
|
||||
providers: List[Literal['config', 'PATH', 'pip', 'apt', 'brew', 'npm', 'vendor'], ...]
|
||||
name: str
|
||||
packages: List[str]
|
||||
entrypoints: Dict[str, EntrypointConfig]
|
||||
version_cmd: EntrypointConfig = field(default=lambda c: )
|
||||
|
||||
class ExtractorConfig(ModelWithDefaults):
|
||||
name: str
|
||||
description: str = Field(examples=['WGET Extractor'])
|
||||
depends_on: DepencencyConfig
|
||||
entrypoint: EntrypointConfig = Field(description='Which entrypoint to use for this extractor')
|
||||
|
||||
class ReplayerConfig(ModelWithDefaults):
|
||||
"""Describes how to render an ArchiveResult in several contexts"""
|
||||
name: str
|
||||
row_template: 'plugins/wget/templates/row.html'
|
||||
embed_template: 'plugins/wget/templates/embed.html'
|
||||
fullpage_template: 'plugins/wget/templates/fullpage.html'
|
||||
|
||||
icon_view: ImportString 'plugins.wget.replayers.wget.get_icon'
|
||||
thumbnail_getter: ImportString = 'plugins.wget.replayers.wget.get_icon'
|
||||
|
||||
class PluginConfig(ModelWithDefaults):
|
||||
dependencies: Dict[str, DependencyConfig]
|
||||
entrypoints: Dict[str, EntrypointConfig]
|
||||
extractors: Dict[str, ExtractorConfig]
|
||||
replayers: Dict[str, ReplayerConfig]
|
||||
|
||||
name: str
|
||||
|
||||
BINARY:
|
||||
PROVIDERS: List[, ...]
|
||||
|
||||
ENTRYPOINTS: Dict[str, EntrypointConfig]
|
||||
|
||||
|
||||
|
||||
WGET_BINARY: HostBinName = Field(default='wget')
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def WGET_PROVIDERS(self) -> List[Provider]:
|
||||
|
||||
class WGET_DEPENDENCY_CONFIG(DEPENDENCY_CONFIG):
|
||||
pass
|
||||
|
||||
class WGET_CONFIG(ModelWithDefaults):
|
||||
EXTRACTORS: List[EXTRACTORS] = EXTRACTOR_CONFIG('')
|
||||
DEPDENCIES: List[DEPENDENCY_CONFIG] = [DEPENDENCY_CONFIG]
|
||||
|
||||
class WgetConfiguration(SingletonModel):
|
||||
singleton_instance_id = 1
|
||||
|
||||
dependency_config: WGET_CONFIG = SchemaField()
|
||||
extractor_config: WGET_CONFIG = SchemaField()
|
||||
replay_config: WGET_CONFIG = SchemaField()
|
||||
pkg_config: WGET_CONFIG = SchemaField()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class WGET_CONFIG(ModelWithDefaults):
|
||||
|
||||
|
||||
|
||||
# class ConfigSet(models.Model):
|
||||
# # scope = when should this config set be active
|
||||
# # host: on a specific host running archivebox
|
||||
# #
|
||||
# # snapshot__added: on or during a specific timeperiod
|
||||
# # user: for actions initiated by a specific archivebox user
|
||||
# # extractor: for specific extractors running under a snapshot
|
||||
# # snapshot_id: for a specific snapshot pk
|
||||
# # snapshot__url: for a specific snapshot url
|
||||
# scope = models.CharField(choices=('host', 'date', 'user', 'extractor', 'domain', 'url', 'custom'))
|
||||
# lookup = models.CharField(choices=('__eq', '__icontains', '__gte', '__lt', '__startswith', '__endswith', '__in', '__isnull'))
|
||||
# match = models.CharField(max_length=128)
|
||||
|
||||
# config = models.JSONField(default={}, schema=Dict[str, JSONValue])
|
||||
# getter = models.ImportString(default='django.utils.model_loading.import_string')
|
||||
|
||||
# label = models.CharField(max_length=64)
|
||||
# created_by = models.ForeignKey(User, on_delete=models.CASCADE)
|
||||
# config = JSONField(schema=Dict[str, JSONValue])
|
||||
|
||||
|
||||
|
||||
CONFIG_SECTIONS = (GENERAL_CONFIG, SHELL_CONFIG)
|
||||
|
||||
class USER_CONFIG(*CONFIG_SECTIONS):
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# print(ShellConfig(**{'IS_TTY': False, 'PGID': 911}).model_dump())
|
||||
# print(json.dumps(SHELL_CONFIG.model_json_schema(schema_generator=JSONSchemaWithLambdas), indent=4))
|
||||
# print(json.dumps(GENERAL_CONFIG.model_json_schema(schema_generator=JSONSchemaWithLambdas), indent=4))
|
||||
print()
|
||||
# os.environ['PGID'] = '422'
|
||||
os.environ['URL_ALLOWLIST'] = r'worked!!!!!\\.com'
|
||||
config = USER_CONFIG(**{'SHOW_PROGRESS': False, 'ADMIN_USERNAME': 'kip', 'PGID': 911})
|
||||
|
||||
print('==========archivebox.config.CONFIG_SCHEMA======================')
|
||||
print(json.dumps(config.as_legacy_schema(), indent=4, default=str))
|
||||
|
||||
print('==========JSON=================================================')
|
||||
# print(config.__class__.__name__, '=', config.model_dump_json(indent=4))
|
||||
print(json.dumps(config.as_json(), indent=4))
|
||||
|
||||
print('==========TOML=================================================')
|
||||
print(config.as_toml())
|
||||
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue