add filestore migrations

Nick Sweeting 2024-12-12 22:15:01 -08:00
parent c11a1b54f1
commit 74e08a18aa
2 changed files with 323 additions and 0 deletions


@@ -0,0 +1,99 @@
__package__ = 'archivebox.filestore'

import re
from pathlib import Path
from functools import wraps
from enum import Enum

import archivebox
from archivebox import CONSTANTS
from core.models import Snapshot, ArchiveResult
from .models import File
class FilestoreVersion(Enum):
v0_7_2 = 'v0.7.2'
v0_8_6 = 'v0.8.6'
v0_9_0 = 'v0.9.0'
LATEST_VERSION = FilestoreVersion.v0_9_0
def migration(src_ver: FilestoreVersion, dst_ver: FilestoreVersion, pattern: str, timeout_seconds: int = 600):
"""Decorator for a migration function that will only run on files that match the given pattern and are at the given version."""
def decorator(migration_func):
@wraps(migration_func)
def wrapper(file: File) -> None:
# skip if this migration doesn't apply to this file
if file.version != src_ver:
return None
if not re.match(pattern, file.file.name):
return None
# acquire lock, run migration + update version, then unlock
try:
file.acquire_lock(timeout_seconds)
migration_func(file)
file.version = dst_ver
except Exception as e:
# logger.error(f"Failed to migrate file {file.id}: {e}")
print(f"Failed to migrate file {file.id}: {e}")
                file.version = src_ver  # roll back to the original version
finally:
file.release_lock()
file.save()
wrapper.src_ver = src_ver # type: ignore
wrapper.dst_ver = dst_ver # type: ignore
wrapper.pattern = pattern # type: ignore
wrapper.timeout_seconds = timeout_seconds # type: ignore
return wrapper
return decorator
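
# Illustrative sketch (hypothetical, not part of this commit): the src_ver /
# dst_ver / pattern attributes attached to each wrapped migration make it
# possible to discover which of the migrations defined below apply to a file:
#
#   MIGRATIONS = [migrate_v07_to_v08_singlefile, migrate_v08_to_v09_singlefile]
#   applicable = [m for m in MIGRATIONS if m.src_ver == file.version]
#
# The actual dispatch goes through archivebox.pm.hook.migrate_file() in
# migrate_all_files(); the @hookimpl registrations above each migration are
# still commented out.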
def detect_archiveresult(path: Path) -> 'ArchiveResult | None':
    # e.g. archive/1723423525.0/singlefile.html -> timestamp = path.parts[1] = '1723423525.0'
    timestamp = path.parts[1]
    snapshot = Snapshot.objects.filter(timestamp=timestamp).last()
    if not snapshot:
        return None
    return snapshot.archiveresult_set.filter(output=path.name).last()

# @hookimpl(hook_name='migrate_file')
@migration(FilestoreVersion.v0_7_2, FilestoreVersion.v0_8_6, r'archive/([0-9\.]+)/.+', timeout_seconds=600)
def migrate_v07_to_v08_singlefile(file: File) -> None:
    result = detect_archiveresult(file.relpath)
    if result is None:
        # raising here lets the @migration wrapper roll back file.version
        raise ValueError(f'No matching ArchiveResult found for file {file.id}')
    new_path = result.OUTPUT_DIR / 'index.html'
    file.move_to(new_path)


# @hookimpl(hook_name='migrate_file')
@migration(FilestoreVersion.v0_8_6, FilestoreVersion.v0_9_0, r'archive/([0-9\.]+)/singlefile.html', timeout_seconds=600)
def migrate_v08_to_v09_singlefile(file: File) -> None:
    result = detect_archiveresult(file.relpath)
    if result is None:
        # raising here lets the @migration wrapper roll back file.version
        raise ValueError(f'No matching ArchiveResult found for file {file.id}')
    new_path = result.OUTPUT_DIR / 'index.html'
    file.move_to(new_path)
def migrate_all_files(target: FilestoreVersion = LATEST_VERSION, batch_size: int = 100):
File.release_expired_locks()
pending_files = (
File.objects
.filter(status='unlocked')
.exclude(version=target)
.iterator(chunk_size=batch_size)
)
for file in pending_files:
try:
archivebox.pm.hook.migrate_file(file=file)
except Exception as e:
print(f"Failed to migrate file {file.id}: {e}")


@@ -0,0 +1,224 @@
__package__ = 'archivebox.filestore'

import time
import os
from datetime import timedelta
from typing import ClassVar

from django.utils import timezone
from rich import print
from statemachine import State, StateMachine

from core.models import ArchiveResult
from workers.actor import ActorType
from .models import File
class FileMachine(StateMachine, strict_states=True):
"""
State machine for managing File lifecycle.
https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
"""
model: File
MAX_LOCK_TIME: ClassVar[int] = 600
# States
unlocked = State(value=File.StatusChoices.UNLOCKED, initial=True)
locked = State(value=File.StatusChoices.LOCKED)
# Transition Events
lock = unlocked.to(locked, cond='can_lock')
unlock = locked.to(unlocked)
def __init__(self, file, *args, **kwargs):
self.file = file
super().__init__(file, *args, **kwargs)
def __repr__(self) -> str:
return f'[grey53]File\\[{self.file.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.file.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]'
def __str__(self) -> str:
return self.__repr__()
@locked.enter
def enter_locked(self):
print(f'{self}.on_locked() ↳ file.locked_at = now()')
self.file.lock_file(seconds=self.MAX_LOCK_TIME)
def can_lock(self) -> bool:
return self.file.status == File.StatusChoices.UNLOCKED
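
# Illustrative sketch (assumes the python-statemachine API used above): a worker
# drives a File through its lifecycle by instantiating the machine and firing
# transition events, e.g.:
#
#   file = File.objects.filter(status=File.StatusChoices.UNLOCKED).first()
#   machine = FileMachine(file)
#   machine.lock()    # guarded by can_lock(); enter_locked() calls file.lock_file(seconds=600)
#   ...               # do migration work while the file is locked
#   machine.unlock()  # release the file back to the unlocked pool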
class FileWorker(ActorType[File]):
    """
    The primary actor for progressing File objects
    through their lifecycle using the FileMachine.
    """
Model = File
StateMachineClass = FileMachine
ACTIVE_STATE: ClassVar[State] = FileMachine.locked
MAX_CONCURRENT_ACTORS: ClassVar[int] = 4
MAX_TICK_TIME: ClassVar[int] = 600
CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
class ArchiveResultMachine(StateMachine, strict_states=True):
"""
State machine for managing ArchiveResult lifecycle.
https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
"""
model: ArchiveResult
# States
queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True)
started = State(value=ArchiveResult.StatusChoices.STARTED)
backoff = State(value=ArchiveResult.StatusChoices.BACKOFF)
succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True)
failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True)
# Tick Event
tick = (
queued.to.itself(unless='can_start') |
queued.to(started, cond='can_start') |
started.to.itself(unless='is_finished') |
started.to(succeeded, cond='is_succeeded') |
started.to(failed, cond='is_failed') |
started.to(backoff, cond='is_backoff') |
backoff.to.itself(unless='can_start') |
backoff.to(started, cond='can_start') |
backoff.to(succeeded, cond='is_succeeded') |
backoff.to(failed, cond='is_failed')
)
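    # The tick event above encodes the lifecycle graph:
    #   queued  -> started              (once can_start() passes)
    #   started -> succeeded / failed   (once output is written)
    #   started -> backoff              (no output yet, retry later)
    #   backoff -> started / succeeded / failed
    # queued, started, and backoff loop to themselves until a guard passes.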
def __init__(self, archiveresult, *args, **kwargs):
self.archiveresult = archiveresult
super().__init__(archiveresult, *args, **kwargs)
def __repr__(self) -> str:
return f'[grey53]ArchiveResult\\[{self.archiveresult.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.archiveresult.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]'
def __str__(self) -> str:
return self.__repr__()
    def can_start(self) -> bool:
        can_start = bool(self.archiveresult.snapshot.url)
        if not can_start:
            print(f'{self}.can_start() [blue]QUEUED[/blue] ➡️❌ [blue]STARTED[/blue]: cannot start yet +{timezone.now() - self.archiveresult.retry_at}s')
        return can_start
    def is_succeeded(self) -> bool:
        return bool(self.archiveresult.output) and 'err' not in self.archiveresult.output.lower()

    def is_failed(self) -> bool:
        return bool(self.archiveresult.output) and 'err' in self.archiveresult.output.lower()

    def is_backoff(self) -> bool:
        return self.archiveresult.output is None

    def is_finished(self) -> bool:
        return self.is_failed() or self.is_succeeded()
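
    # Illustrative examples of the output heuristic used by the guards above:
    #   output='completed'         -> is_succeeded() -> succeeded
    #   output='ERROR: timed out'  -> is_failed()    -> failed  (contains 'err')
    #   output=None                -> is_backoff()   -> backoff (extractor still running)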
    @queued.enter
    def enter_queued(self):
        print(f'{self}.on_queued() ↳ archiveresult.retry_at = now()')
        self.archiveresult.update_for_workers(
            retry_at=timezone.now(),
            status=ArchiveResult.StatusChoices.QUEUED,
            start_ts=None,
        )  # reset retry_at to now so a worker picks this result up immediately
    @started.enter
    def enter_started(self):
        print(f'{self}.on_started() ↳ archiveresult.start_ts + create_output_dir() + bump_retry_at(+30s)')
        # lock the obj for the next ~30s to limit racing with other workers
        self.archiveresult.update_for_workers(
            retry_at=timezone.now() + timedelta(seconds=30),
            status=ArchiveResult.StatusChoices.QUEUED,
            start_ts=timezone.now(),
        )
        # run_subcommand([
        #     'archivebox', 'extract', self.archiveresult.ABID,
        # ])
        # create the output directory and fork the new extractor job subprocess
        self.archiveresult.create_output_dir()
        # self.archiveresult.extract(background=True)
        # mark the object as started
        self.archiveresult.update_for_workers(
            retry_at=timezone.now() + timedelta(seconds=30),  # retry it again in 30s if it fails
            status=ArchiveResult.StatusChoices.STARTED,
        )
        # simulate a slow-running extractor that completes after 2 seconds
        time.sleep(2)
        self.archiveresult.update_for_workers(output='completed')
@backoff.enter
def enter_backoff(self):
print(f'{self}.on_backoff() ↳ archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None')
self.archiveresult.update_for_workers(
retry_at=timezone.now() + timedelta(seconds=60),
status=ArchiveResult.StatusChoices.BACKOFF,
end_ts=None,
# retries=F('retries') + 1, # F() equivalent to getattr(self.archiveresult, 'retries', 0) + 1,
)
self.archiveresult.save(write_indexes=True)
@succeeded.enter
def enter_succeeded(self):
print(f'{self}.on_succeeded() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()')
self.archiveresult.update_for_workers(
retry_at=None,
status=ArchiveResult.StatusChoices.SUCCEEDED,
end_ts=timezone.now(),
# **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
)
self.archiveresult.save(write_indexes=True)
@failed.enter
def enter_failed(self):
print(f'{self}.on_failed() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()')
self.archiveresult.update_for_workers(
retry_at=None,
status=ArchiveResult.StatusChoices.FAILED,
end_ts=timezone.now(),
# **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
)
def after_transition(self, event: str, source: State, target: State):
# print(f"after '{event}' from '{source.id}' to '{target.id}'")
self.archiveresult.snapshot.update_for_workers() # bump snapshot retry time so it picks up all the new changes
class ArchiveResultWorker(ActorType[ArchiveResult]):
"""
The primary actor for progressing ArchiveResult objects
through their lifecycle using the ArchiveResultMachine.
"""
Model = ArchiveResult
StateMachineClass = ArchiveResultMachine
ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started'
MAX_CONCURRENT_ACTORS: ClassVar[int] = 6
MAX_TICK_TIME: ClassVar[int] = 60
CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
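
# Hypothetical usage sketch (worker startup actually lives in the workers
# orchestrator, not in this file): a supervisor would spawn up to
# MAX_CONCURRENT_ACTORS worker processes, each claiming objects from the top
# CLAIM_FROM_TOP_N of the queue and ticking their state machines, e.g.:
#
#   worker = ArchiveResultWorker()
#   worker.runloop()   # hypothetical entrypoint name; see workers.actor.ActorType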