From 5cf7725f0e3cdde8b2e7e322e5fd344afcd3fe55 Mon Sep 17 00:00:00 2001
From: Nick Sweeting <git@sweeting.me>
Date: Thu, 12 Dec 2024 21:25:46 -0800
Subject: [PATCH] add new archivebox worker implementation based on better
 distributed systems principles

---
 archivebox/cli/archivebox_worker.py |  33 ++
 archivebox/workers/worker.py        | 448 ++++++++++++++++++++++++++++
 2 files changed, 481 insertions(+)
 create mode 100644 archivebox/cli/archivebox_worker.py
 create mode 100644 archivebox/workers/worker.py

diff --git a/archivebox/cli/archivebox_worker.py b/archivebox/cli/archivebox_worker.py
new file mode 100644
index 00000000..3baba637
--- /dev/null
+++ b/archivebox/cli/archivebox_worker.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+
+__package__ = 'archivebox.cli'
+__command__ = 'archivebox worker'
+
+import sys
+import json
+
+import rich_click as click
+
+
+@click.command()
+@click.argument('worker_type')
+@click.option('--wait-for-first-event', is_flag=True)
+@click.option('--exit-on-idle', is_flag=True)
+def main(worker_type: str, wait_for_first_event: bool, exit_on_idle: bool):
+    """Start an ArchiveBox worker process of the given type"""
+    
+    from workers.worker import get_worker_type
+    
+    # allow piping in events to process from stdin
+    # if not sys.stdin.isatty():
+    #     for line in sys.stdin.readlines():
+    #         Event.dispatch(event=json.loads(line), parent=None)
+
+    # run the actor
+    Worker = get_worker_type(worker_type)
+    for event in Worker.run(wait_for_first_event=wait_for_first_event, exit_on_idle=exit_on_idle):
+        print(event)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py
new file mode 100644
index 00000000..fb56b56e
--- /dev/null
+++ b/archivebox/workers/worker.py
@@ -0,0 +1,448 @@
+__package__ = 'archivebox.workers'
+
+import os
+import sys
+import time
+import uuid
+import json
+import subprocess
+import unittest
+from typing import ClassVar, Iterable, Type
+from pathlib import Path
+
+from rich import print
+
+from django.db import transaction
+from django.db.models import QuerySet
+from django.utils import timezone
+from django.utils.functional import classproperty       # type: ignore
+
+from crawls.models import Seed, Crawl
+from core.models import Snapshot, ArchiveResult
+
+from workers.models import Event, Process, EventDict
+
+
+class WorkerType:
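+    """
+    Base class for all worker types.
+
+    Each worker polls the Event queue for unclaimed events whose name starts with
+    its listens_to prefix, claims them atomically, and dispatches each one to a
+    matching on_<EVENT_NAME>() handler, which yields zero or more output events.
+    """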
+    # static class attributes
+    name: ClassVar[str]              # e.g. 'log' or 'filesystem' or 'crawl' or 'snapshot' or 'archiveresult' etc.
+    
+    listens_to: ClassVar[str]        # e.g. 'LOG_' or 'FS_' or 'CRAWL_' or 'SNAPSHOT_' or 'ARCHIVERESULT_' etc.
+    outputs: ClassVar[list[str]]     # e.g. ['LOG_', 'FS_', 'CRAWL_', 'SNAPSHOT_', 'ARCHIVERESULT_'] etc.
+    
+    poll_interval: ClassVar[int] = 1     # seconds to wait between polls for new events
+    
+    @classproperty
+    def event_queue(cls) -> QuerySet[Event]:
+        return Event.objects.filter(name__startswith=cls.listens_to)
+
+    @classmethod
+    def fork(cls, wait_for_first_event=False, exit_on_idle=True) -> Process:
+        cmd = ['archivebox', 'worker', cls.name]
+        if exit_on_idle:
+            cmd.append('--exit-on-idle')
+        if wait_for_first_event:
+            cmd.append('--wait-for-first-event')
+        return Process.create_and_fork(cmd=cmd, actor_type=cls.name)
+
+    @classproperty
+    def processes(cls) -> QuerySet[Process]:
+        return Process.objects.filter(actor_type=cls.name)
+
+    @classmethod
+    def run(cls, wait_for_first_event=False, exit_on_idle=True):
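+        """
+        Main worker loop: optionally block until a first matching event arrives, then
+        keep processing events (or idle ticks) until no output is produced, at which
+        point either exit (exit_on_idle=True) or sleep for poll_interval and poll again.
+        """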
+
+        if wait_for_first_event:
+            event = cls.event_queue.get_next_unclaimed()
+            while not event:
+                time.sleep(cls.poll_interval)
+                event = cls.event_queue.get_next_unclaimed()
+
+        while True:
+            output_events = list(cls.process_next_event()) or list(cls.process_idle_tick())   # process next event, or tick if idle
+            yield from output_events
+            if not output_events:
+                if exit_on_idle:
+                    break
+                else:
+                    time.sleep(cls.poll_interval)
+
+    @classmethod
+    def process_next_event(cls) -> Iterable[EventDict]:
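+        """Claim the next unclaimed event, run its handler, yield its output events, and mark it succeeded or failed."""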
+        event = cls.event_queue.get_next_unclaimed()
+        output_events = []
+        
+        if not event:
+            return []
+        
+        cls.mark_event_claimed(event)
+        print(f'{cls.__name__}[{Process.current().pid}] {event}', file=sys.stderr)
+        try:
+            for output_event in cls.receive(event):
+                output_events.append(output_event)
+                yield output_event
+            cls.mark_event_succeeded(event, output_events=output_events)
+        except BaseException as e:
+            cls.mark_event_failed(event, output_events=output_events, error=e)
+
+    @classmethod
+    def process_idle_tick(cls) -> Iterable[EventDict]:
+        # (re)create this process's IDLE event and reset it so it can be claimed again
+        event, _created = Event.objects.update_or_create(
+            name=f'{cls.listens_to}IDLE',
+            emitted_by=Process.current(),
+            defaults={
+                'deliver_at': timezone.now(),
+                'claimed_proc': None,
+                'claimed_at': None,
+                'finished_at': None,
+                'error': None,
+                'parent': None,
+            },
+        )
+        
+        # then process it like any other event
+        yield from cls.process_next_event()
+
+    @classmethod
+    def receive(cls, event: Event) -> Iterable[EventDict]:
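+        """Dispatch the event to this worker's on_<EVENT_NAME>() handler method, if one is defined."""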
+        handler_method = getattr(cls, f'on_{event.name}', None)
+        if handler_method:
+            yield from handler_method(event)
+        else:
+            raise Exception(f'No handler method for event: {event.name}')
+
+    @staticmethod
+    def on_IDLE(event: Event) -> Iterable[EventDict]:
+        return []
+    
+    @staticmethod
+    def mark_event_claimed(event: Event):
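+        """
+        Atomically claim the event for the current process: the filtered UPDATE only
+        succeeds if no other process has claimed it first (a compare-and-swap).
+        """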
+        proc = Process.current()
+        
+        with transaction.atomic():
+            claimed = Event.objects.filter(id=event.id, claimed_proc=None, claimed_at=None).update(claimed_proc=proc, claimed_at=timezone.now())
+            event.refresh_from_db()
+            if not claimed:
+                raise Exception(f'Event already claimed by another process: {event.claimed_proc}')
+            
+            print(f'{proc}.mark_event_claimed(): Claimed {event} ⛏️')
+            
+            # process_updated = Process.objects.filter(id=proc.id, active_event=None).update(active_event=event)
+            # if not process_updated:
+            #     raise Exception(f'Unable to update process.active_event: {proc}.active_event = {event}')
+
+    @staticmethod
+    def mark_event_succeeded(event: Event, output_events: Iterable[EventDict]):
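+        """Mark the claimed event as finished, then dispatch its output events and any on_success callback event."""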
+        event.refresh_from_db()
+        assert event.claimed_proc, f'Cannot mark event as succeeded if it is not claimed by a process: {event}'
+        assert (event.claimed_proc == Process.current()), f'Cannot mark event as succeeded if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
+        
+        with transaction.atomic():
+            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now())
+            event.refresh_from_db()
+            if not updated:
+                raise Exception(f'Event {event} failed to mark as succeeded, it was modified by another process: {event.claimed_proc}')
+
+            # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
+            # if not process_updated:
+            #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
+
+        # dispatch any output events
+        for output_event in output_events:
+            Event.dispatch(event=output_event, parent=event)
+
+        # trigger any callback events
+        if event.on_success:
+            Event.dispatch(event=event.on_success, parent=event)
+
+    @staticmethod
+    def mark_event_failed(event: Event, output_events: Iterable[EventDict]=(), error: BaseException | None = None):
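+        """Mark the claimed event as failed, then dispatch its output events, a derived *_ERROR event, and any on_failure callback event."""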
+        event.refresh_from_db()
+        assert event.claimed_proc, f'Cannot mark event as failed if it is not claimed by a process: {event}'
+        assert (event.claimed_proc == Process.current()), f'Cannot mark event as failed if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
+        
+        with transaction.atomic():
+            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now(), error=str(error))
+            event.refresh_from_db()
+            if not updated:
+                raise Exception(f'Event {event} failed to mark as failed, it was modified by another process: {event.claimed_proc}')
+
+            # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
+            # if not process_updated:
+            #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
+
+        
+        # add dedicated error event to the output events
+        if not event.name.endswith('_ERROR'):
+            output_events = [
+                *output_events,
+                {'name': f'{event.name}_ERROR', 'msg': f'{type(error).__name__}: {error}'},
+            ]
+            
+        # dispatch any output events
+        for output_event in output_events:
+            Event.dispatch(event=output_event, parent=event)
+        
+        # trigger any callback events
+        if event.on_failure:
+            Event.dispatch(event=event.on_failure, parent=event)
+
+
+
+
+class OrchestratorWorker(WorkerType):
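+    """Launches and kills worker Processes in response to PROC_* events."""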
+    name = 'orchestrator'
+    listens_to = 'PROC_'
+    outputs = ['PROC_']
+    
+    @staticmethod
+    def on_PROC_IDLE(event: Event) -> Iterable[EventDict]:
+        # emit a PROC_LAUNCH event for the oldest Process that has not been launched yet
+        to_launch = Process.objects.filter(launched_at=None).order_by('created_at').first()
+        if not to_launch:
+            return []
+        
+        yield {'name': 'PROC_LAUNCH', 'id': to_launch.id}
+    
+    @staticmethod
+    def on_PROC_LAUNCH(event: Event) -> Iterable[EventDict]:
+        process = Process.create_and_fork(**event.kwargs)
+        yield {'name': 'PROC_LAUNCHED', 'process_id': process.id}
+        
+    @staticmethod
+    def on_PROC_EXIT(event: Event) -> Iterable[EventDict]:
+        process = Process.objects.get(id=event.process_id)
+        process.kill()
+        yield {'name': 'PROC_KILLED', 'process_id': process.id}
+        
+    @staticmethod
+    def on_PROC_KILL(event: Event) -> Iterable[EventDict]:
+        process = Process.objects.get(id=event.process_id)
+        process.kill()
+        yield {'name': 'PROC_KILLED', 'process_id': process.id}
+
+
+class FileSystemWorker(WorkerType):
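+    """Performs filesystem side effects (write/append/delete/rsync) in response to FS_* events."""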
+    name = 'filesystem'
+    listens_to = 'FS_'
+    outputs = ['FS_']
+
+    @staticmethod
+    def on_FS_IDLE(event: Event) -> Iterable[EventDict]:
+        # check for tmp files that can be deleted
+        for tmp_file in Path('/tmp').glob('archivebox/*'):
+            yield {'name': 'FS_DELETE', 'path': str(tmp_file)}
+            
+    @staticmethod
+    def on_FS_WRITE(event: Event) -> Iterable[EventDict]:
+        with open(event.path, 'w') as f:
+            f.write(event.content)
+        yield {'name': 'FS_CHANGED', 'path': event.path}
+
+    @staticmethod
+    def on_FS_APPEND(event: Event) -> Iterable[EventDict]:
+        with open(event.path, 'a') as f:
+            f.write(event.content)
+        yield {'name': 'FS_CHANGED', 'path': event.path}
+        
+    @staticmethod
+    def on_FS_DELETE(event: Event) -> Iterable[EventDict]:
+        os.remove(event.path)
+        yield {'name': 'FS_CHANGED', 'path': event.path}
+        
+    @staticmethod
+    def on_FS_RSYNC(event: Event) -> Iterable[EventDict]:
+        subprocess.run(['rsync', '-av', str(event.src), str(event.dst)], check=True)
+        yield {'name': 'FS_CHANGED', 'path': event.dst}
+
+
+class CrawlWorker(WorkerType):
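+    """Manages the Crawl lifecycle (create/update/start/seal) and kicks off each crawl's root Snapshot."""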
+    name = 'crawl'
+    listens_to = 'CRAWL_'
+    outputs = ['CRAWL_', 'FS_', 'SNAPSHOT_']
+
+    @staticmethod
+    def on_CRAWL_IDLE(event: Event) -> Iterable[EventDict]:
+        # check for any stale crawls that can be started or sealed
+        stale_crawl = Crawl.objects.filter(retry_at__lt=timezone.now()).first()
+        if not stale_crawl:
+            return []
+
+        if stale_crawl.can_start():
+            yield {'name': 'CRAWL_START', 'id': stale_crawl.id}
+        
+        elif stale_crawl.can_seal():
+            yield {'name': 'CRAWL_SEAL', 'id': stale_crawl.id}
+            
+    @staticmethod
+    def on_CRAWL_CREATE(event: Event) -> Iterable[EventDict]:
+        crawl = Crawl.objects.create(id=event.id, **event.kwargs)
+        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'CRAWL_UPDATED', 'id': crawl.id}
+        
+    @staticmethod
+    def on_CRAWL_UPDATE(event: Event) -> Iterable[EventDict]:
+        Crawl.objects.filter(id=event.id).update(**event.kwargs)
+        crawl = Crawl.objects.get(id=event.id)
+        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'CRAWL_UPDATED', 'id': crawl.id}
+        
+    @staticmethod
+    def on_CRAWL_SEAL(event: Event) -> Iterable[EventDict]:
+        crawl = Crawl.objects.filter(id=event.id, status=Crawl.StatusChoices.STARTED).first()
+        if not crawl:
+            return
+        crawl.status = Crawl.StatusChoices.SEALED
+        crawl.save()
+        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'CRAWL_UPDATED', 'id': crawl.id}
+        
+    @staticmethod
+    def on_CRAWL_START(event: Event) -> Iterable[EventDict]:
+        # create root snapshot
+        crawl = Crawl.objects.get(id=event.id)
+        new_snapshot_id = uuid.uuid4()
+        yield {'name': 'SNAPSHOT_CREATE', 'id': new_snapshot_id, 'crawl_id': crawl.id, 'url': crawl.seed.uri}
+        yield {'name': 'SNAPSHOT_START', 'id': new_snapshot_id}
+        yield {'name': 'CRAWL_UPDATE', 'id': crawl.id, 'status': 'started', 'retry_at': None}
+
+
+class SnapshotWorker(WorkerType):
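+    """Manages the Snapshot lifecycle and queues an ArchiveResult per extractor when a Snapshot starts."""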
+    name = 'snapshot'
+    listens_to = 'SNAPSHOT_'
+    outputs = ['SNAPSHOT_', 'FS_']
+    
+    @staticmethod
+    def on_SNAPSHOT_IDLE(event: Event) -> Iterable[EventDict]:
+        # check for any snapshots that can be started or sealed
+        snapshot = Snapshot.objects.exclude(status=Snapshot.StatusChoices.SEALED).first()
+        if not snapshot:
+            return []
+        
+        if snapshot.can_start():
+            yield {'name': 'SNAPSHOT_START', 'id': snapshot.id}
+        elif snapshot.can_seal():
+            yield {'name': 'SNAPSHOT_SEAL', 'id': snapshot.id}
+            
+    @staticmethod
+    def on_SNAPSHOT_CREATE(event: Event) -> Iterable[EventDict]:
+        snapshot = Snapshot.objects.create(id=event.id, **event.kwargs)
+        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
+    
+    @staticmethod
+    def on_SNAPSHOT_SEAL(event: Event) -> Iterable[EventDict]:
+        snapshot = Snapshot.objects.get(id=event.id, status=Snapshot.StatusChoices.STARTED)
+        assert snapshot.can_seal()
+        snapshot.status = Snapshot.StatusChoices.SEALED
+        snapshot.save()
+        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
+
+    @staticmethod
+    def on_SNAPSHOT_START(event: Event) -> Iterable[EventDict]:
+        snapshot = Snapshot.objects.get(id=event.id, status=Snapshot.StatusChoices.QUEUED)
+        assert snapshot.can_start()
+        
+        # create pending archiveresults for each extractor
+        for extractor in snapshot.get_extractors():
+            new_archiveresult_id = uuid.uuid4()
+            yield {'name': 'ARCHIVERESULT_CREATE', 'id': new_archiveresult_id, 'snapshot_id': snapshot.id, 'extractor': extractor.name}
+            yield {'name': 'ARCHIVERESULT_START', 'id': new_archiveresult_id}
+            
+        snapshot.status = Snapshot.StatusChoices.STARTED
+        snapshot.save()
+        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
+        
+        
+
+class ArchiveResultWorker(WorkerType):
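+    """Manages the ArchiveResult lifecycle: creates, starts (via SHELL_EXEC), updates, and seals extractor results."""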
+    name = 'archiveresult'
+    listens_to = 'ARCHIVERESULT_'
+    outputs = ['ARCHIVERESULT_', 'FS_', 'SHELL_']
+
+
+    @staticmethod
+    def on_ARCHIVERESULT_UPDATE(event: Event) -> Iterable[EventDict]:
+        ArchiveResult.objects.filter(id=event.id).update(**event.kwargs)
+        archiveresult = ArchiveResult.objects.get(id=event.id)
+        yield {'name': 'FS_WRITE', 'path': archiveresult.OUTPUT_DIR / f'{archiveresult.ABID}.json', 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
+        
+    @staticmethod
+    def on_ARCHIVERESULT_CREATE(event: Event) -> Iterable[EventDict]:
+        archiveresult = ArchiveResult.objects.create(id=event.id, **event.kwargs)
+        yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id}
+
+    @staticmethod
+    def on_ARCHIVERESULT_SEAL(event: Event) -> Iterable[EventDict]:
+        archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.STARTED)
+        
+        yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id, 'status': 'sealed', 'on_success': {
+            'name': 'FS_RSYNC', 'src': archiveresult.OUTPUT_DIR, 'dst': archiveresult.snapshot.OUTPUT_DIR,
+        }}
+
+    @staticmethod
+    def on_ARCHIVERESULT_START(event: Event) -> Iterable[EventDict]:
+        archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.QUEUED)
+
+        yield {
+            'name': 'SHELL_EXEC',
+            'cmd': archiveresult.EXTRACTOR.get_cmd(),
+            'cwd': archiveresult.OUTPUT_DIR,
+            'on_exit': {
+                'name': 'ARCHIVERESULT_SEAL',
+                'id': archiveresult.id,
+            },
+        }
+        
+        archiveresult.status = ArchiveResult.StatusChoices.STARTED
+        archiveresult.save()
+        yield {'name': 'FS_WRITE', 'path': archiveresult.OUTPUT_DIR / 'index.json', 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
+        
+    @staticmethod
+    def on_ARCHIVERESULT_IDLE(event: Event) -> Iterable[EventDict]:
+        stale_archiveresult = ArchiveResult.objects.exclude(status__in=[ArchiveResult.StatusChoices.SUCCEEDED, ArchiveResult.StatusChoices.FAILED]).first()
+        if not stale_archiveresult:
+            return []
+        if stale_archiveresult.can_start():
+            yield {'name': 'ARCHIVERESULT_START', 'id': stale_archiveresult.id}
+        elif stale_archiveresult.can_seal():
+            yield {'name': 'ARCHIVERESULT_SEAL', 'id': stale_archiveresult.id}
+
+
+WORKER_TYPES = [
+    OrchestratorWorker,
+    FileSystemWorker,
+    CrawlWorker,
+    SnapshotWorker,
+    ArchiveResultWorker,
+]
+
+def get_worker_type(name: str) -> Type[WorkerType]:
+    for worker_type in WORKER_TYPES:
+        if worker_type.name == name:
+            return worker_type
+    raise Exception(f'Worker type not found: {name}')
+
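+# Example usage (sketch only; the event field names below are illustrative and assume the
+# Event model accepts arbitrary kwargs, as the handlers above do):
+#
+#   from workers.models import Event
+#   from workers.worker import get_worker_type
+#
+#   Event.dispatch(event={'name': 'CRAWL_CREATE', 'id': crawl_id, 'seed_id': seed_id}, parent=None)
+#   Worker = get_worker_type('crawl')
+#   for output_event in Worker.run(wait_for_first_event=True, exit_on_idle=True):
+#       print(output_event)
+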
+# class CrawlWorkerTest(unittest.TestCase):
+
+#     def test_crawl_creation(self):
+#         seed = Seed.objects.create(uri='https://example.com')
+#         Event.dispatch('CRAWL_CREATE', {'seed_id': seed.id})
+        
+#         crawl_worker = CrawlWorker()
+        
+#         output_events = list(crawl_worker.process_next_event())
+        
+#         assert len(output_events) == 1
+#         assert output_events[0].get('name', 'unset') == 'FS_WRITE'
+#         assert output_events[0].get('path') == '/tmp/test_crawl/index.json'
+
+#         output_events = list(crawl_worker.process_next_event())
+#         assert len(output_events) == 1
+#         assert output_events[0].get('name', 'unset') == 'CRAWL_CREATED'
+        
+#         assert Crawl.objects.filter(seed_id=seed.id).exists(), 'Crawl was not created'
+