diff --git a/archivebox/core/actors.py b/archivebox/core/actors.py index d578c316..1e9db058 100644 --- a/archivebox/core/actors.py +++ b/archivebox/core/actors.py @@ -21,7 +21,7 @@ class SnapshotActor(ActorType[Snapshot]): FINAL_STATES: ClassVar[list[State]] = SnapshotMachine.final_states # ['sealed'] STATE_FIELD_NAME: ClassVar[str] = Snapshot.state_field_name # status - MAX_CONCURRENT_ACTORS: ClassVar[int] = 1 # 3 + MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 MAX_TICK_TIME: ClassVar[int] = 10 CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 @@ -39,7 +39,7 @@ class ArchiveResultActor(ActorType[ArchiveResult]): FINAL_STATES: ClassVar[list[State]] = ArchiveResultMachine.final_states # ['succeeded', 'failed', 'skipped'] STATE_FIELD_NAME: ClassVar[str] = ArchiveResult.state_field_name # status - MAX_CONCURRENT_ACTORS: ClassVar[int] = 1 # 6 + MAX_CONCURRENT_ACTORS: ClassVar[int] = 6 MAX_TICK_TIME: ClassVar[int] = 60 CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 diff --git a/archivebox/core/models.py b/archivebox/core/models.py index 05d8af46..323784a5 100644 --- a/archivebox/core/models.py +++ b/archivebox/core/models.py @@ -448,15 +448,16 @@ class Snapshot(ABIDModel, ModelWithStateMachine): for extractor in EXTRACTORS: if not extractor: continue - archiveresult = ArchiveResult.objects.update_or_create( + archiveresult, created = ArchiveResult.objects.get_or_create( snapshot=self, extractor=extractor, - status=ArchiveResult.INITIAL_STATE, defaults={ + 'status': ArchiveResult.INITIAL_STATE, 'retry_at': timezone.now(), }, ) - archiveresults.append(archiveresult) + if archiveresult.status == ArchiveResult.INITIAL_STATE: + archiveresults.append(archiveresult) return archiveresults @@ -625,19 +626,12 @@ class ArchiveResult(ABIDModel, ModelWithStateMachine): return '/api/v1/docs#/Core%20Models/api_v1_core_get_archiveresult' def get_absolute_url(self): - return f'/{self.snapshot.archive_path}/{self.output_path()}' + return f'/{self.snapshot.archive_path}/{self.extractor}' @property def extractor_module(self) -> Any | None: return abx.as_dict(abx.pm.hook.get_EXTRACTORS()).get(self.extractor, None) - def output_path(self) -> str | None: - """return the canonical output filename or directory name within the snapshot dir""" - try: - return self.extractor_module.get_output_path(self.snapshot) - except Exception as e: - print(f'Error getting output path for {self.extractor} extractor: {e}') - return None def embed_path(self) -> str | None: """ @@ -656,18 +650,13 @@ class ArchiveResult(ABIDModel, ModelWithStateMachine): return link.canonical_outputs().get(f'{self.extractor}_path') def output_exists(self) -> bool: - output_path = self.output_path() - return bool(output_path and os.path.exists(output_path)) + output_path = Path(self.snapshot_dir) / self.extractor + return os.path.exists(output_path) def create_output_dir(self): - snap_dir = Path(self.snapshot_dir) - snap_dir.mkdir(parents=True, exist_ok=True) - output_path = self.output_path() - if output_path: - (snap_dir / output_path).mkdir(parents=True, exist_ok=True) - else: - raise ValueError(f'Not able to calculate output path for {self.extractor} extractor in {snap_dir}') - return snap_dir / output_path + output_dir = Path(self.snapshot_dir) / self.extractor + output_dir.mkdir(parents=True, exist_ok=True) + return output_dir def as_json(self, *args) -> dict: args = args or self.keys diff --git a/archivebox/core/settings_logging.py b/archivebox/core/settings_logging.py index d292e15a..f9d29127 100644 --- a/archivebox/core/settings_logging.py +++ b/archivebox/core/settings_logging.py @@ -65,10 +65,14 @@ else: # print(f'[!] WARNING: data/logs dir does not exist. Logging to temp file: {ERROR_LOG}') pass - -LOG_LEVEL_DATABASE = 'WARNING' # if DEBUG else 'WARNING' +LOG_LEVEL_DATABASE = 'WARNING' LOG_LEVEL_REQUEST = 'WARNING' # if DEBUG else 'WARNING' +# UNCOMMENT TO LOG ALL SQL QUERIES: +# LOG_LEVEL_DATABASE = 'DEBUG' +# db_logger = logging.getLogger('django.db.backends') +# db_logger.setLevel(logging.DEBUG) +# db_logger.addHandler(logging.StreamHandler()) SETTINGS_LOGGING = { diff --git a/archivebox/core/statemachines.py b/archivebox/core/statemachines.py index cc96a8ad..0f1fb31c 100644 --- a/archivebox/core/statemachines.py +++ b/archivebox/core/statemachines.py @@ -1,5 +1,7 @@ __package__ = 'archivebox.snapshots' +import time + from django.utils import timezone from statemachine import State, StateMachine @@ -67,7 +69,7 @@ class SnapshotMachine(StateMachine, strict_states=True): def enter_started(self): print(f'SnapshotMachine[{self.snapshot.ABID}].on_started(): snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)') self.snapshot.status = Snapshot.StatusChoices.STARTED - self.snapshot.bump_retry_at(seconds=60) + self.snapshot.bump_retry_at(seconds=2) self.snapshot.save() self.snapshot.create_pending_archiveresults() @@ -117,13 +119,19 @@ class ArchiveResultMachine(StateMachine, strict_states=True): return self.archiveresult.snapshot and (self.archiveresult.retry_at < timezone.now()) def is_succeeded(self) -> bool: - return self.archiveresult.output_exists() + if self.archiveresult.output and 'err' not in self.archiveresult.output.lower(): + return True + return False def is_failed(self) -> bool: - return not self.archiveresult.output_exists() + if self.archiveresult.output and 'err' in self.archiveresult.output.lower(): + return True + return False def is_backoff(self) -> bool: - return self.archiveresult.STATE == ArchiveResult.StatusChoices.BACKOFF + if self.archiveresult.output is None: + return True + return False def is_finished(self) -> bool: return self.is_failed() or self.is_succeeded() @@ -141,19 +149,22 @@ class ArchiveResultMachine(StateMachine, strict_states=True): print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_started(): archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)') self.archiveresult.status = ArchiveResult.StatusChoices.STARTED self.archiveresult.start_ts = timezone.now() - self.archiveresult.bump_retry_at(seconds=60) + self.archiveresult.bump_retry_at(seconds=2) self.archiveresult.save() self.archiveresult.create_output_dir() + time.sleep(2) + self.archiveresult.output = 'completed' + self.archiveresult.save() @backoff.enter def enter_backoff(self): print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_backoff(): archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None') self.archiveresult.status = ArchiveResult.StatusChoices.BACKOFF self.archiveresult.retries = getattr(self.archiveresult, 'retries', 0) + 1 - self.archiveresult.bump_retry_at(seconds=60) + self.archiveresult.bump_retry_at(seconds=2) self.archiveresult.end_ts = None self.archiveresult.save() - + @succeeded.enter def enter_succeeded(self): print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_succeeded(): archiveresult.retry_at = None, archiveresult.end_ts = now()') diff --git a/archivebox/crawls/actors.py b/archivebox/crawls/actors.py index 2426196e..55c9f92c 100644 --- a/archivebox/crawls/actors.py +++ b/archivebox/crawls/actors.py @@ -18,6 +18,6 @@ class CrawlActor(ActorType[Crawl]): FINAL_STATES: ClassVar[list[State]] = CrawlMachine.final_states STATE_FIELD_NAME: ClassVar[str] = Crawl.state_field_name - MAX_CONCURRENT_ACTORS: ClassVar[int] = 1 + MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 MAX_TICK_TIME: ClassVar[int] = 10 CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index 01f7e38f..890e02a5 100644 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -190,22 +190,28 @@ class Crawl(ABIDModel, ModelWithHealthStats, ModelWithStateMachine): from core.models import ArchiveResult snapshot_ids = self.snapshot_set.values_list('id', flat=True) - pending_archiveresults = ArchiveResult.objects.filter(snapshot_id__in=snapshot_ids, retry_at__isnull=True) + pending_archiveresults = ArchiveResult.objects.filter(snapshot_id__in=snapshot_ids, retry_at__isnull=False) return pending_archiveresults def create_root_snapshot(self) -> 'Snapshot': from core.models import Snapshot + try: + return Snapshot.objects.get(crawl=self, url=self.seed.uri) + except Snapshot.DoesNotExist: + pass + root_snapshot, _ = Snapshot.objects.update_or_create( + crawl=self, url=self.seed.uri, defaults={ - 'crawl': self, 'status': Snapshot.INITIAL_STATE, 'retry_at': timezone.now(), 'timestamp': str(timezone.now().timestamp()), # 'config': self.seed.config, }, ) + root_snapshot.save() return root_snapshot diff --git a/archivebox/crawls/statemachines.py b/archivebox/crawls/statemachines.py index d3781933..9fe009fd 100644 --- a/archivebox/crawls/statemachines.py +++ b/archivebox/crawls/statemachines.py @@ -52,7 +52,7 @@ class CrawlMachine(StateMachine, strict_states=True): def enter_started(self): print(f'CrawlMachine[{self.crawl.ABID}].on_started(): crawl.create_root_snapshot() + crawl.bump_retry_at(+10s)') self.crawl.status = Crawl.StatusChoices.STARTED - self.crawl.bump_retry_at(seconds=10) + self.crawl.bump_retry_at(seconds=2) self.crawl.save() self.crawl.create_root_snapshot() diff --git a/archivebox/pkgs/abx-plugin-git/abx_plugin_git/extractors.py b/archivebox/pkgs/abx-plugin-git/abx_plugin_git/extractors.py index cc1e9a90..9c815c32 100644 --- a/archivebox/pkgs/abx-plugin-git/abx_plugin_git/extractors.py +++ b/archivebox/pkgs/abx-plugin-git/abx_plugin_git/extractors.py @@ -1,7 +1,5 @@ __package__ = 'abx_plugin_git' -from pathlib import Path - from abx_pkg import BinName @@ -14,7 +12,7 @@ class GitExtractor(BaseExtractor): name: ExtractorName = 'git' binary: BinName = GIT_BINARY.name - def get_output_path(self, snapshot) -> Path | None: - return snapshot.as_link() / 'git' + def get_output_path(self, snapshot) -> str: + return 'git' GIT_EXTRACTOR = GitExtractor()