From 74e08a18aa55851e6975132c67a09e21441f8d8a Mon Sep 17 00:00:00 2001
From: Nick Sweeting
Date: Thu, 12 Dec 2024 22:15:01 -0800
Subject: [PATCH] add filestore migrations

---
 archivebox/filestore/file_migrations.py |  99 +++++++++++
 archivebox/filestore/statemachines.py   | 224 ++++++++++++++++++++++++
 2 files changed, 323 insertions(+)
 create mode 100644 archivebox/filestore/file_migrations.py
 create mode 100644 archivebox/filestore/statemachines.py

diff --git a/archivebox/filestore/file_migrations.py b/archivebox/filestore/file_migrations.py
new file mode 100644
index 00000000..2753dfc1
--- /dev/null
+++ b/archivebox/filestore/file_migrations.py
@@ -0,0 +1,99 @@
+__package__ = 'archivebox.filestore'
+
+import re
+from pathlib import Path
+from functools import wraps
+from enum import Enum
+
+
+import archivebox
+from archivebox import CONSTANTS
+
+from core.models import Snapshot, ArchiveResult
+from .models import File
+
+
+class FilestoreVersion(Enum):
+    v0_7_2 = 'v0.7.2'
+    v0_8_6 = 'v0.8.6'
+    v0_9_0 = 'v0.9.0'
+
+LATEST_VERSION = FilestoreVersion.v0_9_0
+
+
+def migration(src_ver: FilestoreVersion, dst_ver: FilestoreVersion, pattern: str, timeout_seconds: int = 600):
+    """Decorator for a migration function that only runs on files that match the given pattern and are at the given source version."""
+    def decorator(migration_func):
+        @wraps(migration_func)
+        def wrapper(file: File) -> None:
+            # skip if this migration doesn't apply to this file
+            if file.version != src_ver:
+                return None
+            if not re.match(pattern, file.file.name):
+                return None
+
+            # acquire lock, run migration + update version, then unlock
+            try:
+                file.acquire_lock(timeout_seconds)
+                migration_func(file)
+                file.version = dst_ver
+            except Exception as e:
+                # logger.error(f"Failed to migrate file {file.id}: {e}")
+                print(f"Failed to migrate file {file.id}: {e}")
+                file.version = src_ver    # roll back to the original version
+            finally:
+                file.release_lock()
+                file.save()
+
+        wrapper.src_ver = src_ver                    # type: ignore
+        wrapper.dst_ver = dst_ver                    # type: ignore
+        wrapper.pattern = pattern                    # type: ignore
+        wrapper.timeout_seconds = timeout_seconds    # type: ignore
+        return wrapper
+    return decorator
+
+
+def detect_archiveresult(path: Path) -> ArchiveResult | None:
+    # e.g. archive/1723423525.0/singlefile.html -> the ArchiveResult for that snapshot's output file
+    timestamp = path.parts[1]
+    snapshot = Snapshot.objects.filter(timestamp=timestamp).last()
+    if not snapshot:
+        return None
+
+    result = snapshot.archiveresult_set.filter(output=path.name).last()
+    if not result:
+        return None
+    return result
+
+
+# @hookimpl(hook_name='migrate_file')
+@migration(FilestoreVersion.v0_7_2, FilestoreVersion.v0_8_6, r'archive/([0-9\.]+)/.+', timeout_seconds=600)
+def migrate_v07_to_v08_singlefile(file: File) -> None:
+    result = detect_archiveresult(file.relpath)
+    if result is None:
+        return
+    new_path = result.OUTPUT_DIR / 'index.html'
+    file.move_to(new_path)
+
+
+# @hookimpl(hook_name='migrate_file')
+@migration(FilestoreVersion.v0_8_6, FilestoreVersion.v0_9_0, r'archive/([0-9\.]+)/singlefile.html', timeout_seconds=600)
+def migrate_v08_to_v09_singlefile(file: File) -> None:
+    result = detect_archiveresult(file.relpath)
+    if result is None:
+        return
+    new_path = result.OUTPUT_DIR / 'index.html'
+    file.move_to(new_path)
+
+
+def migrate_all_files(target: FilestoreVersion = LATEST_VERSION, batch_size: int = 100):
+    File.release_expired_locks()
+
+    pending_files = (
+        File.objects
+        .filter(status='unlocked')
+        .exclude(version=target)
+        .iterator(chunk_size=batch_size)
+    )
+
+    for file in pending_files:
+        try:
+            archivebox.pm.hook.migrate_file(file=file)
+        except Exception as e:
+            print(f"Failed to migrate file {file.id}: {e}")
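+
+
+# Illustrative usage sketch only (not an existing CLI command or scheduled task):
+# once migrations are registered via the 'migrate_file' pluggy hook, migrate_all_files()
+# is the entrypoint a management command or shell session would call, e.g.:
+#
+#   from archivebox.filestore.file_migrations import migrate_all_files, FilestoreVersion
+#   migrate_all_files(target=FilestoreVersion.v0_9_0, batch_size=100)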
diff --git a/archivebox/filestore/statemachines.py b/archivebox/filestore/statemachines.py
new file mode 100644
index 00000000..2903276a
--- /dev/null
+++ b/archivebox/filestore/statemachines.py
@@ -0,0 +1,224 @@
+__package__ = 'archivebox.filestore'
+
+import time
+import os
+from datetime import timedelta
+from typing import ClassVar
+
+from django.utils import timezone
+
+from rich import print
+
+from statemachine import State, StateMachine
+
+from workers.actor import ActorType
+
+from core.models import ArchiveResult
+from .models import File
+
+
+class FileMachine(StateMachine, strict_states=True):
+    """
+    State machine for managing the File lifecycle.
+
+    https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
+    """
+
+    model: File
+    MAX_LOCK_TIME: ClassVar[int] = 600
+
+    # States
+    unlocked = State(value=File.StatusChoices.UNLOCKED, initial=True)
+    locked = State(value=File.StatusChoices.LOCKED)
+
+    # Transition Events
+    lock = unlocked.to(locked, cond='can_lock')
+    unlock = locked.to(unlocked)
+
+    def __init__(self, file, *args, **kwargs):
+        self.file = file
+        super().__init__(file, *args, **kwargs)
+
+    def __repr__(self) -> str:
+        return f'[grey53]File\\[{self.file.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.file.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]'
+
+    def __str__(self) -> str:
+        return self.__repr__()
+
+    @locked.enter
+    def enter_locked(self):
+        print(f'{self}.on_locked() ↳ file.locked_at = now()')
+        self.file.lock_file(seconds=self.MAX_LOCK_TIME)
+
+    def can_lock(self) -> bool:
+        return self.file.status == File.StatusChoices.UNLOCKED
+
+
+class FileWorker(ActorType[File]):
+    """
+    The primary actor for progressing File objects
+    through their lifecycle using the FileMachine.
+    """
+    Model = File
+    StateMachineClass = FileMachine
+
+    ACTIVE_STATE: ClassVar[State] = FileMachine.locked
+
+    MAX_CONCURRENT_ACTORS: ClassVar[int] = 4
+    MAX_TICK_TIME: ClassVar[int] = 600
+    CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
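+
+
+# Illustrative sketch only: how a worker tick (or a shell session) might drive FileMachine;
+# binding with state_field='status' is an assumption based on the State(value=...) choices above:
+#
+#   machine = FileMachine(file, state_field='status')
+#   if machine.can_lock():
+#       machine.lock()      # unlocked -> locked, calls file.lock_file(seconds=MAX_LOCK_TIME)
+#       ...                 # perform the migration / move while the file is locked
+#       machine.unlock()    # locked -> unlocked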
+
+
+class ArchiveResultMachine(StateMachine, strict_states=True):
+    """
+    State machine for managing the ArchiveResult lifecycle.
+
+    https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
+    """
+
+    model: ArchiveResult
+
+    # States
+    queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True)
+    started = State(value=ArchiveResult.StatusChoices.STARTED)
+    backoff = State(value=ArchiveResult.StatusChoices.BACKOFF)
+    succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True)
+    failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True)
+
+    # Tick Event
+    tick = (
+        queued.to.itself(unless='can_start') |
+        queued.to(started, cond='can_start') |
+        started.to.itself(unless='is_finished') |
+        started.to(succeeded, cond='is_succeeded') |
+        started.to(failed, cond='is_failed') |
+        started.to(backoff, cond='is_backoff') |
+        backoff.to.itself(unless='can_start') |
+        backoff.to(started, cond='can_start') |
+        backoff.to(succeeded, cond='is_succeeded') |
+        backoff.to(failed, cond='is_failed')
+    )
+
+    def __init__(self, archiveresult, *args, **kwargs):
+        self.archiveresult = archiveresult
+        super().__init__(archiveresult, *args, **kwargs)
+
+    def __repr__(self) -> str:
+        return f'[grey53]ArchiveResult\\[{self.archiveresult.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.archiveresult.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]'
+
+    def __str__(self) -> str:
+        return self.__repr__()
+
+    def can_start(self) -> bool:
+        can_start = bool(self.archiveresult.snapshot.url)
+        if not can_start:
+            print(f'{self}.can_start() [blue]QUEUED[/blue] ➡️❌ [blue]STARTED[/blue]: can\'t start yet +{timezone.now() - self.archiveresult.retry_at}s')
+        return can_start
+
+    def is_succeeded(self) -> bool:
+        if self.archiveresult.output and 'err' not in self.archiveresult.output.lower():
+            return True
+        return False
+
+    def is_failed(self) -> bool:
+        if self.archiveresult.output and 'err' in self.archiveresult.output.lower():
+            return True
+        return False
+
+    def is_backoff(self) -> bool:
+        if self.archiveresult.output is None:
+            return True
+        return False
+
+    def is_finished(self) -> bool:
+        return self.is_failed() or self.is_succeeded()
+
+    @queued.enter
+    def enter_queued(self):
+        print(f'{self}.on_queued() ↳ archiveresult.retry_at = now()')
+        self.archiveresult.update_for_workers(
+            retry_at=timezone.now(),
+            status=ArchiveResult.StatusChoices.QUEUED,
+            start_ts=None,
+        )  # bump retry_at so workers pick up any new changes
+
+    @started.enter
+    def enter_started(self):
+        print(f'{self}.on_started() ↳ archiveresult.start_ts + create_output_dir() + bump_retry_at(+30s)')
+        # lock the object for the next ~30s to limit racing with other workers
+        self.archiveresult.update_for_workers(
+            retry_at=timezone.now() + timedelta(seconds=30),
+            status=ArchiveResult.StatusChoices.QUEUED,
+            start_ts=timezone.now(),
+        )
+
+        # run_subcommand([
+        #     'archivebox', 'extract', self.archiveresult.ABID,
+        # ])
+
+        # create the output directory and fork the new extractor job subprocess
+        self.archiveresult.create_output_dir()
+        # self.archiveresult.extract(background=True)
+
+        # mark the object as started
+        self.archiveresult.update_for_workers(
+            retry_at=timezone.now() + timedelta(seconds=30),    # retry it again in 30s if it fails
+            status=ArchiveResult.StatusChoices.STARTED,
+        )
+
+        # simulate a slow-running extractor that completes after 2 seconds
+        time.sleep(2)
+        self.archiveresult.update_for_workers(output='completed')
+
+    @backoff.enter
+    def enter_backoff(self):
+        print(f'{self}.on_backoff() ↳ archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None')
+        self.archiveresult.update_for_workers(
+            retry_at=timezone.now() + timedelta(seconds=60),
+            status=ArchiveResult.StatusChoices.BACKOFF,
+            end_ts=None,
+            # retries=F('retries') + 1,  # F() equivalent to getattr(self.archiveresult, 'retries', 0) + 1
+        )
+        self.archiveresult.save(write_indexes=True)
+
+    @succeeded.enter
+    def enter_succeeded(self):
+        print(f'{self}.on_succeeded() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()')
+        self.archiveresult.update_for_workers(
+            retry_at=None,
+            status=ArchiveResult.StatusChoices.SUCCEEDED,
+            end_ts=timezone.now(),
+            # **self.archiveresult.get_output_dict(),  # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
+        )
+        self.archiveresult.save(write_indexes=True)
+
+    @failed.enter
+    def enter_failed(self):
+        print(f'{self}.on_failed() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()')
+        self.archiveresult.update_for_workers(
+            retry_at=None,
+            status=ArchiveResult.StatusChoices.FAILED,
+            end_ts=timezone.now(),
+            # **self.archiveresult.get_output_dict(),  # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
+        )
+
+    def after_transition(self, event: str, source: State, target: State):
+        # print(f"after '{event}' from '{source.id}' to '{target.id}'")
+        self.archiveresult.snapshot.update_for_workers()  # bump the snapshot's retry_at so it picks up all the new changes
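+
+# Illustrative sketch only (the real driver is ArchiveResultWorker below): stepping one
+# ArchiveResult through its lifecycle by hand, e.g. from a Django shell; the
+# state_field='status' binding is an assumption based on the State(value=...) choices above:
+#
+#   result = ArchiveResult.objects.filter(status=ArchiveResult.StatusChoices.QUEUED).first()
+#   machine = ArchiveResultMachine(result, state_field='status')
+#   machine.tick()    # queued -> started once can_start() is True, otherwise stays queued
+#   machine.tick()    # started -> succeeded / failed / backoff depending on the output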
+
+
+class ArchiveResultWorker(ActorType[ArchiveResult]):
+    """
+    The primary actor for progressing ArchiveResult objects
+    through their lifecycle using the ArchiveResultMachine.
+    """
+    Model = ArchiveResult
+    StateMachineClass = ArchiveResultMachine
+
+    ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started    # 'started'
+
+    MAX_CONCURRENT_ACTORS: ClassVar[int] = 6
+    MAX_TICK_TIME: ClassVar[int] = 60
+    CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10