diff --git a/archivebox/filestore/statemachines.py b/archivebox/filestore/statemachines.py index 2903276a..e4514964 100644 --- a/archivebox/filestore/statemachines.py +++ b/archivebox/filestore/statemachines.py @@ -1,224 +1,67 @@ -__package__ = 'archivebox.filestore' +# __package__ = 'archivebox.filestore' -import time -import os -from datetime import timedelta -from typing import ClassVar +# import time +# import os +# from datetime import timedelta +# from typing import ClassVar -from django.utils import timezone +# from django.utils import timezone -from rich import print +# from rich import print -from statemachine import State, StateMachine +# from statemachine import State, StateMachine -from workers.actor import ActorType +# from workers.actor import ActorType -from .models import File +# from .models import File -class FileMachine(StateMachine, strict_states=True): - """ - State machine for managing File lifecycle. +# class FileMachine(StateMachine, strict_states=True): +# """ +# State machine for managing File lifecycle. - https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams - """ +# https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams +# """ - model: File - MAX_LOCK_TIME: ClassVar[int] = 600 +# model: File +# MAX_LOCK_TIME: ClassVar[int] = 600 - # States - unlocked = State(value=File.StatusChoices.UNLOCKED, initial=True) - locked = State(value=File.StatusChoices.LOCKED) +# # States +# unlocked = State(value=File.StatusChoices.UNLOCKED, initial=True) +# locked = State(value=File.StatusChoices.LOCKED) - # Transition Events - lock = unlocked.to(locked, cond='can_lock') - unlock = locked.to(unlocked) +# # Transition Events +# lock = unlocked.to(locked, cond='can_lock') +# unlock = locked.to(unlocked) - def __init__(self, file, *args, **kwargs): - self.file = file - super().__init__(file, *args, **kwargs) +# def __init__(self, file, *args, **kwargs): +# self.file = file +# super().__init__(file, *args, **kwargs) - def __repr__(self) -> str: - return f'[grey53]File\\[{self.file.ABID}] 🏃♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.file.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]' +# def __repr__(self) -> str: +# return f'[grey53]File\\[{self.file.ABID}] 🏃♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.file.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]' - def __str__(self) -> str: - return self.__repr__() +# def __str__(self) -> str: +# return self.__repr__() - @locked.enter - def enter_locked(self): - print(f'{self}.on_locked() ↳ file.locked_at = now()') - self.file.lock_file(seconds=self.MAX_LOCK_TIME) +# @locked.enter +# def enter_locked(self): +# print(f'{self}.on_locked() ↳ file.locked_at = now()') +# self.file.lock_file(seconds=self.MAX_LOCK_TIME) - def can_lock(self) -> bool: - return self.file.status == File.StatusChoices.UNLOCKED +# def can_lock(self) -> bool: +# return self.file.status == File.StatusChoices.UNLOCKED -class FileWorker(ActorType[File]): - """ - The primary actor for progressing Snapshot objects - through their lifecycle using the SnapshotMachine. - """ - Model = File - StateMachineClass = FileMachine +# class FileWorker(ActorType[File]): +# Model = File +# StateMachineClass = FileMachine - ACTIVE_STATE: ClassVar[State] = FileMachine.locked +# ACTIVE_STATE: ClassVar[State] = FileMachine.locked - MAX_CONCURRENT_ACTORS: ClassVar[int] = 4 - MAX_TICK_TIME: ClassVar[int] = 600 - CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 +# MAX_CONCURRENT_ACTORS: ClassVar[int] = 4 +# MAX_TICK_TIME: ClassVar[int] = 600 +# CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 - - -class ArchiveResultMachine(StateMachine, strict_states=True): - """ - State machine for managing ArchiveResult lifecycle. - - https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams - """ - - model: ArchiveResult - - # States - queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True) - started = State(value=ArchiveResult.StatusChoices.STARTED) - backoff = State(value=ArchiveResult.StatusChoices.BACKOFF) - succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True) - failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True) - - # Tick Event - tick = ( - queued.to.itself(unless='can_start') | - queued.to(started, cond='can_start') | - started.to.itself(unless='is_finished') | - started.to(succeeded, cond='is_succeeded') | - started.to(failed, cond='is_failed') | - started.to(backoff, cond='is_backoff') | - backoff.to.itself(unless='can_start') | - backoff.to(started, cond='can_start') | - backoff.to(succeeded, cond='is_succeeded') | - backoff.to(failed, cond='is_failed') - ) - - def __init__(self, archiveresult, *args, **kwargs): - self.archiveresult = archiveresult - super().__init__(archiveresult, *args, **kwargs) - - def __repr__(self) -> str: - return f'[grey53]ArchiveResult\\[{self.archiveresult.ABID}] 🏃♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.archiveresult.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]' - - def __str__(self) -> str: - return self.__repr__() - - def can_start(self) -> bool: - can_start = bool(self.archiveresult.snapshot.url) - if not can_start: - print(f'{self}.can_start() [blue]QUEUED[/blue] ➡️❌ [blue]STARTED[/blue]: cant start yet +{timezone.now() - self.archiveresult.retry_at}s') - return can_start - - def is_succeeded(self) -> bool: - if self.archiveresult.output and 'err' not in self.archiveresult.output.lower(): - return True - return False - - def is_failed(self) -> bool: - if self.archiveresult.output and 'err' in self.archiveresult.output.lower(): - return True - return False - - def is_backoff(self) -> bool: - if self.archiveresult.output is None: - return True - return False - - def is_finished(self) -> bool: - return self.is_failed() or self.is_succeeded() - - @queued.enter - def enter_queued(self): - print(f'{self}.on_queued() ↳ archiveresult.retry_at = now()') - self.archiveresult.update_for_workers( - retry_at=timezone.now(), - status=ArchiveResult.StatusChoices.QUEUED, - start_ts=None, - ) # bump the snapshot's retry_at so they pickup any new changes - - @started.enter - def enter_started(self): - print(f'{self}.on_started() ↳ archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)') - # lock the object for the next 30sec - self.archiveresult.update_for_workers( - retry_at=timezone.now() + timedelta(seconds=30), - status=ArchiveResult.StatusChoices.QUEUED, - start_ts=timezone.now(), - ) # lock the obj for the next ~30s to limit racing with other workers - - # run_subcommand([ - # 'archivebox', 'extract', self.archiveresult.ABID, - # ]) - - # create the output directory and fork the new extractor job subprocess - self.archiveresult.create_output_dir() - # self.archiveresult.extract(background=True) - - # mark the object as started - self.archiveresult.update_for_workers( - retry_at=timezone.now() + timedelta(seconds=30), # retry it again in 30s if it fails - status=ArchiveResult.StatusChoices.STARTED, - ) - - # simulate slow running extractor that completes after 2 seconds - time.sleep(2) - self.archiveresult.update_for_workers(output='completed') - - @backoff.enter - def enter_backoff(self): - print(f'{self}.on_backoff() ↳ archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None') - self.archiveresult.update_for_workers( - retry_at=timezone.now() + timedelta(seconds=60), - status=ArchiveResult.StatusChoices.BACKOFF, - end_ts=None, - # retries=F('retries') + 1, # F() equivalent to getattr(self.archiveresult, 'retries', 0) + 1, - ) - self.archiveresult.save(write_indexes=True) - - @succeeded.enter - def enter_succeeded(self): - print(f'{self}.on_succeeded() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()') - self.archiveresult.update_for_workers( - retry_at=None, - status=ArchiveResult.StatusChoices.SUCCEEDED, - end_ts=timezone.now(), - # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine} - ) - self.archiveresult.save(write_indexes=True) - - @failed.enter - def enter_failed(self): - print(f'{self}.on_failed() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()') - self.archiveresult.update_for_workers( - retry_at=None, - status=ArchiveResult.StatusChoices.FAILED, - end_ts=timezone.now(), - # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine} - ) - - def after_transition(self, event: str, source: State, target: State): - # print(f"after '{event}' from '{source.id}' to '{target.id}'") - self.archiveresult.snapshot.update_for_workers() # bump snapshot retry time so it picks up all the new changes - - -class ArchiveResultWorker(ActorType[ArchiveResult]): - """ - The primary actor for progressing ArchiveResult objects - through their lifecycle using the ArchiveResultMachine. - """ - Model = ArchiveResult - StateMachineClass = ArchiveResultMachine - - ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started' - - MAX_CONCURRENT_ACTORS: ClassVar[int] = 6 - MAX_TICK_TIME: ClassVar[int] = 60 - CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10