mirror of https://github.com/ArchiveBox/ArchiveBox.git (synced 2025-05-19 09:25:11 -04:00)

fix statemachine progression for Snapshot, Crawl, and ArchiveResult

parent 684a394cba
commit 227fd4e1c6

3 changed files with 50 additions and 21 deletions
@@ -19,7 +19,7 @@ class SnapshotActor(ActorType[Snapshot]):
 
     ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started                # 'started'
     FINAL_STATES: ClassVar[list[State]] = SnapshotMachine.final_states     # ['sealed']
-    STATE_FIELD_NAME: ClassVar[str] = SnapshotMachine.state_field_name     # status
+    STATE_FIELD_NAME: ClassVar[str] = Snapshot.state_field_name            # status
 
     MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
     MAX_TICK_TIME: ClassVar[int] = 10
@@ -37,7 +37,7 @@ class ArchiveResultActor(ActorType[ArchiveResult]):
 
     ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started               # 'started'
     FINAL_STATES: ClassVar[list[State]] = ArchiveResultMachine.final_states    # ['succeeded', 'failed', 'skipped']
-    STATE_FIELD_NAME: ClassVar[str] = ArchiveResultMachine.state_field_name    # status
+    STATE_FIELD_NAME: ClassVar[str] = ArchiveResult.state_field_name           # status
 
     MAX_CONCURRENT_ACTORS: ClassVar[int] = 6
     MAX_TICK_TIME: ClassVar[int] = 60
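Note: both hunks above repoint STATE_FIELD_NAME from the state-machine class to the Django model class (Snapshot / ArchiveResult), so the actor reads the status field name from the same place the model defines it. The sketch below only illustrates the ClassVar-driven configuration pattern; ToyActor and its print-based tick() are hypothetical stand-ins, not ArchiveBox's real ActorType API.

from types import SimpleNamespace
from typing import ClassVar


class ToyActor:
    # Same ClassVar-style knobs as SnapshotActor / ArchiveResultActor above.
    STATE_FIELD_NAME: ClassVar[str] = 'status'   # model field that stores the state
    MAX_CONCURRENT_ACTORS: ClassVar[int] = 3     # parallelism cap for this actor type
    MAX_TICK_TIME: ClassVar[int] = 10            # seconds an object may spend per tick

    def tick(self, obj) -> None:
        # Look up the object's current state through the configured field name;
        # a real actor would then fire the object's state machine 'tick' event.
        state = getattr(obj, self.STATE_FIELD_NAME)
        print(f'ticking object in state {state!r}')


ToyActor().tick(SimpleNamespace(status='queued'))   # -> ticking object in state 'queued'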
@@ -26,9 +26,9 @@ class SnapshotMachine(StateMachine, strict_states=True):
 
     # Tick Event
     tick = (
-        queued.to.itself(unless='can_start', internal=True) |
+        queued.to.itself(unless='can_start') |
         queued.to(started, cond='can_start') |
-        started.to.itself(unless='is_finished', internal=True) |
+        started.to.itself(unless='is_finished') |
         started.to(sealed, cond='is_finished')
     )
 
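Note: the behavioral change in this hunk is dropping internal=True from the self-transitions. In python-statemachine, an internal self-transition skips the state's enter/exit actions, while a plain self-transition re-runs them, so the @started.enter / @sealed.enter callbacks added below can fire again on ticks that stay in the same state. A minimal runnable sketch of the same tick pattern, using a stand-in JobMachine and a plain dict instead of the Snapshot model (assumptions, not ArchiveBox code):

from statemachine import State, StateMachine


class JobMachine(StateMachine):
    # Same shape as SnapshotMachine's states: queued -> started -> sealed.
    queued = State(initial=True)
    started = State()
    sealed = State(final=True)

    # Without internal=True, the self-loops are external transitions, so the
    # current state's enter/exit callbacks run again on every tick that stays put.
    tick = (
        queued.to.itself(unless='can_start')
        | queued.to(started, cond='can_start')
        | started.to.itself(unless='is_finished')
        | started.to(sealed, cond='is_finished')
    )

    def __init__(self, job: dict, *args, **kwargs):
        self.job = job                     # stand-in for the Snapshot model
        super().__init__(*args, **kwargs)

    def can_start(self) -> bool:
        return bool(self.job.get('url'))

    def is_finished(self) -> bool:
        return bool(self.job.get('done'))


machine = JobMachine({'url': 'https://example.com', 'done': False})
machine.tick()   # queued -> started (can_start is True)
machine.tick()   # started -> started (is_finished is still False)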
@@ -37,17 +37,25 @@ class SnapshotMachine(StateMachine, strict_states=True):
         super().__init__(snapshot, *args, **kwargs)
 
     def can_start(self) -> bool:
-        return self.snapshot.seed and self.snapshot.seed.uri
+        return self.snapshot.url
 
     def is_finished(self) -> bool:
-        return not self.snapshot.has_pending_archiveresults()
+        if not self.snapshot.archiveresult_set.exists():
+            return False
+        if self.snapshot.pending_archiveresults().exists():
+            return False
+        return True
 
+    @started.enter
     def on_started(self):
+        print(f'SnapshotMachine[{self.snapshot.ABID}].on_started(): snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)')
         self.snapshot.create_pending_archiveresults()
         self.snapshot.bump_retry_at(seconds=60)
         self.snapshot.save()
 
+    @sealed.enter
     def on_sealed(self):
+        print(f'SnapshotMachine[{self.snapshot.ABID}].on_sealed(): snapshot.retry_at=None')
         self.snapshot.retry_at = None
         self.snapshot.save()
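Note: the rewritten is_finished() treats "no ArchiveResults exist yet" as not finished, so a Snapshot cannot be sealed before on_started() has created its pending results, and it only seals once none of them are pending. A hedged sketch of that decision using plain lists in place of the Django querysets (the list-based signature is illustrative; archiveresult_set / pending_archiveresults mirror the names in the diff):

def snapshot_is_finished(all_results: list, pending_results: list) -> bool:
    if not all_results:       # nothing has been created yet -> keep waiting
        return False
    if pending_results:       # some results are still queued/started -> keep waiting
        return False
    return True               # results exist and none are pending -> ready to seal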
@@ -70,13 +78,13 @@ class ArchiveResultMachine(StateMachine, strict_states=True):
 
     # Tick Event
     tick = (
-        queued.to.itself(unless='can_start', internal=True) |
+        queued.to.itself(unless='can_start') |
         queued.to(started, cond='can_start') |
-        started.to.itself(unless='is_finished', internal=True) |
+        started.to.itself(unless='is_finished') |
         started.to(succeeded, cond='is_succeeded') |
         started.to(failed, cond='is_failed') |
         started.to(backoff, cond='is_backoff') |
-        backoff.to.itself(unless='can_start', internal=True) |
+        backoff.to.itself(unless='can_start') |
         backoff.to(started, cond='can_start') |
         backoff.to(succeeded, cond='is_succeeded') |
         backoff.to(failed, cond='is_failed')
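Note: backoff gets the same treatment as queued and started, losing internal=True on its self-loop. The guards referenced here (is_succeeded, is_failed, is_backoff) are what keep the started/backoff branches mutually exclusive; the sketch below only illustrates that shape, with FakeResult and its flags as hypothetical stand-ins rather than ArchiveBox's real ArchiveResult API:

class FakeResult:
    # Hypothetical stand-in for an ArchiveResult's outcome flags.
    def __init__(self, output_exists: bool, retries_exceeded: bool):
        self.output_exists = output_exists
        self.retries_exceeded = retries_exceeded


def is_succeeded(result: FakeResult) -> bool:
    return result.output_exists                    # some output was produced


def is_failed(result: FakeResult) -> bool:
    return (not result.output_exists) and result.retries_exceeded


def is_backoff(result: FakeResult) -> bool:
    # Neither succeeded nor permanently failed yet: wait and retry later.
    return not is_succeeded(result) and not is_failed(result)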
@@ -101,27 +109,35 @@ class ArchiveResultMachine(StateMachine, strict_states=True):
     def is_finished(self) -> bool:
         return self.is_failed() or self.is_succeeded()
 
+    @started.enter
     def on_started(self):
+        print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_started(): archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)')
         self.archiveresult.start_ts = timezone.now()
         self.archiveresult.create_output_dir()
         self.archiveresult.bump_retry_at(seconds=60)
         self.archiveresult.save()
 
+    @backoff.enter
     def on_backoff(self):
+        print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_backoff(): archiveresult.bump_retry_at(+60s)')
         self.archiveresult.bump_retry_at(seconds=60)
         self.archiveresult.save()
 
+    @succeeded.enter
     def on_succeeded(self):
+        print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_succeeded(): archiveresult.end_ts')
         self.archiveresult.end_ts = timezone.now()
         self.archiveresult.save()
 
+    @failed.enter
     def on_failed(self):
+        print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_failed(): archiveresult.end_ts')
         self.archiveresult.end_ts = timezone.now()
         self.archiveresult.save()
 
-    def after_transition(self, event: str, source: State, target: State):
-        print(f"after '{event}' from '{source.id}' to '{target.id}'")
-        # self.archiveresult.save_merkle_index()
-        # self.archiveresult.save_html_index()
-        # self.archiveresult.save_json_index()
-        return "after_transition"
+    # def after_transition(self, event: str, source: State, target: State):
+    #     print(f"after '{event}' from '{source.id}' to '{target.id}'")
+    #     # self.archiveresult.save_merkle_index()
+    #     # self.archiveresult.save_html_index()
+    #     # self.archiveresult.save_json_index()
+    #     return "after_transition"
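Note: this hunk moves the per-state side effects onto @<state>.enter callbacks and comments out the class-wide after_transition() hook, so each state owns exactly the work that happens when it is entered. A minimal sketch of the same callback style (the @started.enter decorator usage mirrors the diff above; ResultMachine and its print statements are stand-ins, not ArchiveBox code):

from statemachine import State, StateMachine


class ResultMachine(StateMachine):
    queued = State(initial=True)
    started = State()
    succeeded = State(final=True)

    advance = queued.to(started) | started.to(succeeded)

    # Per-state enter callbacks instead of one global after_transition() hook.
    @started.enter
    def on_started(self):
        print('entered started: stamp start_ts, create output dir, bump retry_at')

    @succeeded.enter
    def on_succeeded(self):
        print('entered succeeded: stamp end_ts, clear retry_at')


machine = ResultMachine()
machine.advance()   # prints the 'entered started' message
machine.advance()   # prints the 'entered succeeded' message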
@@ -20,9 +20,9 @@ class CrawlMachine(StateMachine, strict_states=True):
 
     # Tick Event
     tick = (
-        queued.to.itself(unless='can_start', internal=True) |
+        queued.to.itself(unless='can_start') |
         queued.to(started, cond='can_start') |
-        started.to.itself(unless='is_finished', internal=True) |
+        started.to.itself(unless='is_finished') |
         started.to(sealed, cond='is_finished')
     )
 
@@ -34,15 +34,28 @@ class CrawlMachine(StateMachine, strict_states=True):
         return self.crawl.seed and self.crawl.seed.uri
 
     def is_finished(self) -> bool:
-        return not self.crawl.has_pending_archiveresults()
+        if not self.crawl.snapshot_set.exists():
+            return False
+        if self.crawl.pending_snapshots().exists():
+            return False
+        if self.crawl.pending_archiveresults().exists():
+            return False
+        return True
 
+    # def before_transition(self, event, state):
+    #     print(f"Before '{event}', on the '{state.id}' state.")
+    #     return "before_transition_return"
+
+    @started.enter
     def on_started(self):
+        print(f'CrawlMachine[{self.crawl.ABID}].on_started(): crawl.create_root_snapshot() + crawl.bump_retry_at(+10s)')
         self.crawl.create_root_snapshot()
         self.crawl.bump_retry_at(seconds=10)
         self.crawl.save()
 
+    @sealed.enter
     def on_sealed(self):
+        print(f'CrawlMachine[{self.crawl.ABID}].on_sealed(): crawl.retry_at=None')
         self.crawl.retry_at = None
         self.crawl.save()
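Note: the crawl-level is_finished() mirrors the snapshot-level one but adds a check on the snapshots themselves: the crawl only seals once its root snapshot exists, every snapshot has finished, and every archive result has finished. A hedged sketch of that three-step check with plain lists standing in for the querysets (names mirror the diff; the list-based signature is illustrative only):

def crawl_is_finished(snapshots: list, pending_snapshots: list,
                      pending_archiveresults: list) -> bool:
    if not snapshots:              # root snapshot not created yet -> keep waiting
        return False
    if pending_snapshots:          # some snapshots still queued/started -> keep waiting
        return False
    if pending_archiveresults:     # some extractor results still pending -> keep waiting
        return False
    return True                    # everything exists and nothing is pending -> seal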