move config into dedicated global app

Nick Sweeting 2024-09-30 15:59:05 -07:00
parent ee7f73bd7b
commit 3e5b6ddeae
79 changed files with 494 additions and 525 deletions

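The change applied throughout the hunks below is uniform: attribute-style access through the top-level package (archivebox.DATA_DIR, archivebox.CONSTANTS) is replaced by names imported directly from the new archivebox.config app, and the old flat config import moves from ..config to ..config.legacy. A minimal sketch of the before/after pattern, using a hypothetical write_index() stand-in rather than any real function from this file:

# before this commit: config reached as attributes on the top-level package
import archivebox

def write_index(out_dir=archivebox.DATA_DIR):
    return archivebox.CONSTANTS.DATABASE_FILE

# after this commit: config names come straight from the dedicated archivebox.config app
from archivebox.config import DATA_DIR, CONSTANTS, SEARCH_BACKEND_CONFIG

def write_index(out_dir=DATA_DIR):
    return CONSTANTS.DATABASE_FILE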

@@ -12,15 +12,14 @@ from urllib.parse import urlparse
from django.db.models import QuerySet, Q
-import archivebox
+from archivebox.config import DATA_DIR, CONSTANTS, SEARCH_BACKEND_CONFIG
from ..util import (
scheme,
enforce_types,
ExtendedEncoder,
)
from ..misc.logging import stderr
-from ..config import (
+from ..config.legacy import (
TIMEOUT,
URL_DENYLIST_PTN,
URL_ALLOWLIST_PTN,
@@ -223,28 +222,28 @@ def timed_index_update(out_path: Path):
@enforce_types
-def write_main_index(links: List[Link], out_dir: Path=archivebox.DATA_DIR, created_by_id: int | None=None) -> None:
+def write_main_index(links: List[Link], out_dir: Path=DATA_DIR, created_by_id: int | None=None) -> None:
"""Writes links to sqlite3 file for a given list of links"""
log_indexing_process_started(len(links))
try:
-with timed_index_update(archivebox.CONSTANTS.DATABASE_FILE):
+with timed_index_update(CONSTANTS.DATABASE_FILE):
write_sql_main_index(links, out_dir=out_dir, created_by_id=created_by_id)
-os.chmod(archivebox.CONSTANTS.DATABASE_FILE, int(OUTPUT_PERMISSIONS, base=8)) # set here because we don't write it with atomic writes
+os.chmod(CONSTANTS.DATABASE_FILE, int(OUTPUT_PERMISSIONS, base=8)) # set here because we don't write it with atomic writes
except (KeyboardInterrupt, SystemExit):
stderr('[!] Warning: Still writing index to disk...', color='lightyellow')
stderr(' Run archivebox init to fix any inconsistencies from an ungraceful exit.')
-with timed_index_update(archivebox.CONSTANTS.DATABASE_FILE):
+with timed_index_update(CONSTANTS.DATABASE_FILE):
write_sql_main_index(links, out_dir=out_dir, created_by_id=created_by_id)
-os.chmod(archivebox.CONSTANTS.DATABASE_FILE, int(OUTPUT_PERMISSIONS, base=8)) # set here because we don't write it with atomic writes
+os.chmod(CONSTANTS.DATABASE_FILE, int(OUTPUT_PERMISSIONS, base=8)) # set here because we don't write it with atomic writes
raise SystemExit(0)
log_indexing_process_finished()
@enforce_types
-def load_main_index(out_dir: Path=archivebox.DATA_DIR, warn: bool=True) -> List[Link]:
+def load_main_index(out_dir: Path=DATA_DIR, warn: bool=True) -> List[Link]:
"""parse and load existing index with any new links from import_path merged in"""
from core.models import Snapshot
try:
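One detail worth noting in the hunk above: OUTPUT_PERMISSIONS (still coming from the legacy config) is a permission string, so it is parsed as base-8 before being passed to os.chmod, since the SQLite index file is not written with the atomic writes that usually handle this (per the inline comment). A small standalone illustration of that conversion; the '644' value and the file path are examples, not values taken from this diff:

import os

OUTPUT_PERMISSIONS = '644'               # example value; in ArchiveBox this comes from config
mode = int(OUTPUT_PERMISSIONS, base=8)   # octal string '644' -> integer 420, i.e. rw-r--r--
os.chmod('data/index.sqlite3', mode)     # example path; the diff applies this to CONSTANTS.DATABASE_FILE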
@@ -254,8 +253,8 @@ def load_main_index(out_dir: Path=archivebox.DATA_DIR, warn: bool=True) -> List[
raise SystemExit(0)
@enforce_types
-def load_main_index_meta(out_dir: Path=archivebox.DATA_DIR) -> Optional[dict]:
-index_path = out_dir / archivebox.CONSTANTS.JSON_INDEX_FILENAME
+def load_main_index_meta(out_dir: Path=DATA_DIR) -> Optional[dict]:
+index_path = out_dir / CONSTANTS.JSON_INDEX_FILENAME
if index_path.exists():
with open(index_path, 'r', encoding='utf-8') as f:
meta_dict = pyjson.load(f)
@@ -377,7 +376,6 @@ def q_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='
return snapshots.filter(q_filter)
def search_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='search') -> QuerySet:
-from plugins_sys.config.apps import SEARCH_BACKEND_CONFIG
from ..search import query_search_index
if not SEARCH_BACKEND_CONFIG.USE_SEARCHING_BACKEND:
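The only change in the hunk above is dropping the function-local import of SEARCH_BACKEND_CONFIG from plugins_sys.config.apps: after this commit the flag is already in scope via the module-level from archivebox.config import added at the top of the file. Roughly, with the unchanged remainder of the function elided:

# before: lazy import inside the function on every call
def search_filter(snapshots, filter_patterns, filter_type='search'):
    from plugins_sys.config.apps import SEARCH_BACKEND_CONFIG
    from ..search import query_search_index
    if not SEARCH_BACKEND_CONFIG.USE_SEARCHING_BACKEND:
        ...

# after: SEARCH_BACKEND_CONFIG resolved from the module-level archivebox.config import
def search_filter(snapshots, filter_patterns, filter_type='search'):
    from ..search import query_search_index
    if not SEARCH_BACKEND_CONFIG.USE_SEARCHING_BACKEND:
        ...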
@@ -406,7 +404,7 @@ def snapshot_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type
return search_filter(snapshots, filter_patterns, filter_type)
-def get_indexed_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_indexed_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""indexed links without checking archive status or data directory validity"""
links = (snapshot.as_link() for snapshot in snapshots.iterator(chunk_size=500))
return {
@@ -414,7 +412,7 @@ def get_indexed_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[st
for link in links
}
-def get_archived_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_archived_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""indexed links that are archived with a valid data directory"""
links = (snapshot.as_link() for snapshot in snapshots.iterator(chunk_size=500))
return {
@@ -422,7 +420,7 @@ def get_archived_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[s
for link in filter(is_archived, links)
}
-def get_unarchived_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_unarchived_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""indexed links that are unarchived with no data directory or an empty data directory"""
links = (snapshot.as_link() for snapshot in snapshots.iterator(chunk_size=500))
return {
@@ -430,12 +428,12 @@ def get_unarchived_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict
for link in filter(is_unarchived, links)
}
-def get_present_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_present_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""dirs that actually exist in the archive/ folder"""
all_folders = {}
-for entry in (out_dir / archivebox.CONSTANTS.ARCHIVE_DIR_NAME).iterdir():
+for entry in (out_dir / CONSTANTS.ARCHIVE_DIR_NAME).iterdir():
if entry.is_dir():
link = None
try:
@@ -447,7 +445,7 @@ def get_present_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[st
return all_folders
-def get_valid_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_valid_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""dirs with a valid index matched to the main index and archived content"""
links = [snapshot.as_link_with_details() for snapshot in snapshots.iterator(chunk_size=500)]
return {
@@ -455,7 +453,7 @@ def get_valid_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str,
for link in filter(is_valid, links)
}
-def get_invalid_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_invalid_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""dirs that are invalid for any reason: corrupted/duplicate/orphaned/unrecognized"""
duplicate = get_duplicate_folders(snapshots, out_dir=out_dir)
orphaned = get_orphaned_folders(snapshots, out_dir=out_dir)
@@ -464,7 +462,7 @@ def get_invalid_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[st
return {**duplicate, **orphaned, **corrupted, **unrecognized}
-def get_duplicate_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_duplicate_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""dirs that conflict with other directories that have the same link URL or timestamp"""
by_url = {}
by_timestamp = {}
@@ -472,7 +470,7 @@ def get_duplicate_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[
data_folders = (
str(entry)
-for entry in archivebox.CONSTANTS.ARCHIVE_DIR.iterdir()
+for entry in CONSTANTS.ARCHIVE_DIR.iterdir()
if entry.is_dir() and not snapshots.filter(timestamp=entry.name).exists()
)
@@ -498,11 +496,11 @@ def get_duplicate_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[
duplicate_folders[path] = link
return duplicate_folders
-def get_orphaned_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_orphaned_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""dirs that contain a valid index but aren't listed in the main index"""
orphaned_folders = {}
-for entry in archivebox.CONSTANTS.ARCHIVE_DIR.iterdir():
+for entry in CONSTANTS.ARCHIVE_DIR.iterdir():
if entry.is_dir():
link = None
try:
@@ -516,7 +514,7 @@ def get_orphaned_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[s
return orphaned_folders
-def get_corrupted_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_corrupted_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""dirs that don't contain a valid index and aren't listed in the main index"""
corrupted = {}
for snapshot in snapshots.iterator(chunk_size=500):
@@ -525,11 +523,11 @@ def get_corrupted_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[
corrupted[link.link_dir] = link
return corrupted
-def get_unrecognized_folders(snapshots, out_dir: Path=archivebox.DATA_DIR) -> Dict[str, Optional[Link]]:
+def get_unrecognized_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
"""dirs that don't contain recognizable archive data and aren't listed in the main index"""
unrecognized_folders: Dict[str, Optional[Link]] = {}
-for entry in (Path(out_dir) / archivebox.CONSTANTS.ARCHIVE_DIR_NAME).iterdir():
+for entry in (Path(out_dir) / CONSTANTS.ARCHIVE_DIR_NAME).iterdir():
if entry.is_dir():
index_exists = (entry / "index.json").exists()
link = None
@@ -594,10 +592,10 @@ def is_unarchived(link: Link) -> bool:
return not link.is_archived
-def fix_invalid_folder_locations(out_dir: Path=archivebox.DATA_DIR) -> Tuple[List[str], List[str]]:
+def fix_invalid_folder_locations(out_dir: Path=DATA_DIR) -> Tuple[List[str], List[str]]:
fixed = []
cant_fix = []
-for entry in os.scandir(out_dir / archivebox.CONSTANTS.ARCHIVE_DIR_NAME):
+for entry in os.scandir(out_dir / CONSTANTS.ARCHIVE_DIR_NAME):
if entry.is_dir(follow_symlinks=True):
if (Path(entry.path) / 'index.json').exists():
try:
@@ -608,7 +606,7 @@ def fix_invalid_folder_locations(out_dir: Path=archivebox.DATA_DIR) -> Tuple[Lis
continue
if not entry.path.endswith(f'/{link.timestamp}'):
-dest = out_dir /archivebox.CONSTANTS.ARCHIVE_DIR_NAME / link.timestamp
+dest = out_dir /CONSTANTS.ARCHIVE_DIR_NAME / link.timestamp
if dest.exists():
cant_fix.append(entry.path)
else: