diff --git a/archivebox/core/actors.py b/archivebox/core/actors.py index 18281336..1e9db058 100644 --- a/archivebox/core/actors.py +++ b/archivebox/core/actors.py @@ -19,7 +19,7 @@ class SnapshotActor(ActorType[Snapshot]): ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started # 'started' FINAL_STATES: ClassVar[list[State]] = SnapshotMachine.final_states # ['sealed'] - STATE_FIELD_NAME: ClassVar[str] = SnapshotMachine.state_field_name # status + STATE_FIELD_NAME: ClassVar[str] = Snapshot.state_field_name # status MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 MAX_TICK_TIME: ClassVar[int] = 10 @@ -37,7 +37,7 @@ class ArchiveResultActor(ActorType[ArchiveResult]): ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started' FINAL_STATES: ClassVar[list[State]] = ArchiveResultMachine.final_states # ['succeeded', 'failed', 'skipped'] - STATE_FIELD_NAME: ClassVar[str] = ArchiveResultMachine.state_field_name # status + STATE_FIELD_NAME: ClassVar[str] = ArchiveResult.state_field_name # status MAX_CONCURRENT_ACTORS: ClassVar[int] = 6 MAX_TICK_TIME: ClassVar[int] = 60 diff --git a/archivebox/core/statemachines.py b/archivebox/core/statemachines.py index 88a48b82..85cad102 100644 --- a/archivebox/core/statemachines.py +++ b/archivebox/core/statemachines.py @@ -26,9 +26,9 @@ class SnapshotMachine(StateMachine, strict_states=True): # Tick Event tick = ( - queued.to.itself(unless='can_start', internal=True) | + queued.to.itself(unless='can_start') | queued.to(started, cond='can_start') | - started.to.itself(unless='is_finished', internal=True) | + started.to.itself(unless='is_finished') | started.to(sealed, cond='is_finished') ) @@ -37,17 +37,25 @@ class SnapshotMachine(StateMachine, strict_states=True): super().__init__(snapshot, *args, **kwargs) def can_start(self) -> bool: - return self.snapshot.seed and self.snapshot.seed.uri + return self.snapshot.url def is_finished(self) -> bool: - return not self.snapshot.has_pending_archiveresults() + if not self.snapshot.archiveresult_set.exists(): + return False + if self.snapshot.pending_archiveresults().exists(): + return False + return True + @started.enter def on_started(self): + print(f'SnapshotMachine[{self.snapshot.ABID}].on_started(): snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)') self.snapshot.create_pending_archiveresults() self.snapshot.bump_retry_at(seconds=60) self.snapshot.save() + @sealed.enter def on_sealed(self): + print(f'SnapshotMachine[{self.snapshot.ABID}].on_sealed(): snapshot.retry_at=None') self.snapshot.retry_at = None self.snapshot.save() @@ -70,13 +78,13 @@ class ArchiveResultMachine(StateMachine, strict_states=True): # Tick Event tick = ( - queued.to.itself(unless='can_start', internal=True) | + queued.to.itself(unless='can_start') | queued.to(started, cond='can_start') | - started.to.itself(unless='is_finished', internal=True) | + started.to.itself(unless='is_finished') | started.to(succeeded, cond='is_succeeded') | started.to(failed, cond='is_failed') | started.to(backoff, cond='is_backoff') | - backoff.to.itself(unless='can_start', internal=True) | + backoff.to.itself(unless='can_start') | backoff.to(started, cond='can_start') | backoff.to(succeeded, cond='is_succeeded') | backoff.to(failed, cond='is_failed') @@ -101,27 +109,35 @@ class ArchiveResultMachine(StateMachine, strict_states=True): def is_finished(self) -> bool: return self.is_failed() or self.is_succeeded() + @started.enter def on_started(self): + print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_started(): archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)') self.archiveresult.start_ts = timezone.now() self.archiveresult.create_output_dir() self.archiveresult.bump_retry_at(seconds=60) self.archiveresult.save() + @backoff.enter def on_backoff(self): + print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_backoff(): archiveresult.bump_retry_at(+60s)') self.archiveresult.bump_retry_at(seconds=60) self.archiveresult.save() + @succeeded.enter def on_succeeded(self): + print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_succeeded(): archiveresult.end_ts') self.archiveresult.end_ts = timezone.now() self.archiveresult.save() + @failed.enter def on_failed(self): + print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_failed(): archiveresult.end_ts') self.archiveresult.end_ts = timezone.now() self.archiveresult.save() - def after_transition(self, event: str, source: State, target: State): - print(f"after '{event}' from '{source.id}' to '{target.id}'") - # self.archiveresult.save_merkle_index() - # self.archiveresult.save_html_index() - # self.archiveresult.save_json_index() - return "after_transition" + # def after_transition(self, event: str, source: State, target: State): + # print(f"after '{event}' from '{source.id}' to '{target.id}'") + # # self.archiveresult.save_merkle_index() + # # self.archiveresult.save_html_index() + # # self.archiveresult.save_json_index() + # return "after_transition" diff --git a/archivebox/crawls/statemachines.py b/archivebox/crawls/statemachines.py index f5db8375..12ba0e03 100644 --- a/archivebox/crawls/statemachines.py +++ b/archivebox/crawls/statemachines.py @@ -20,9 +20,9 @@ class CrawlMachine(StateMachine, strict_states=True): # Tick Event tick = ( - queued.to.itself(unless='can_start', internal=True) | + queued.to.itself(unless='can_start') | queued.to(started, cond='can_start') | - started.to.itself(unless='is_finished', internal=True) | + started.to.itself(unless='is_finished') | started.to(sealed, cond='is_finished') ) @@ -34,15 +34,28 @@ class CrawlMachine(StateMachine, strict_states=True): return self.crawl.seed and self.crawl.seed.uri def is_finished(self) -> bool: - return not self.crawl.has_pending_archiveresults() - - + if not self.crawl.snapshot_set.exists(): + return False + if self.crawl.pending_snapshots().exists(): + return False + if self.crawl.pending_archiveresults().exists(): + return False + return True + # def before_transition(self, event, state): + # print(f"Before '{event}', on the '{state.id}' state.") + # return "before_transition_return" + + @started.enter def on_started(self): + print(f'CrawlMachine[{self.crawl.ABID}].on_started(): crawl.create_root_snapshot() + crawl.bump_retry_at(+10s)') self.crawl.create_root_snapshot() self.crawl.bump_retry_at(seconds=10) self.crawl.save() - + + @sealed.enter def on_sealed(self): + print(f'CrawlMachine[{self.crawl.ABID}].on_sealed(): crawl.retry_at=None') self.crawl.retry_at = None self.crawl.save() +