diff --git a/archivebox/cli/archivebox_worker.py b/archivebox/cli/archivebox_worker.py
new file mode 100644
index 00000000..3baba637
--- /dev/null
+++ b/archivebox/cli/archivebox_worker.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+
+__package__ = 'archivebox.cli'
+__command__ = 'archivebox worker'
+
+import sys
+import json
+
+import rich_click as click
+
+
+@click.command()
+@click.argument('worker_type')
+@click.option('--wait-for-first-event', is_flag=True)
+@click.option('--exit-on-idle', is_flag=True)
+def main(worker_type: str, wait_for_first_event: bool, exit_on_idle: bool):
+    """Start an ArchiveBox worker process of the given type"""
+
+    from workers.worker import get_worker_type
+
+    # allow piping in events to process from stdin
+    # if not sys.stdin.isatty():
+    #     for line in sys.stdin.readlines():
+    #         Event.dispatch(event=json.loads(line), parent=None)
+
+    # run the actor
+    Worker = get_worker_type(worker_type)
+    for event in Worker.run(wait_for_first_event=wait_for_first_event, exit_on_idle=exit_on_idle):
+        print(event)
+
+
+if __name__ == '__main__':
+    main()
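Note: the CLI above is a thin wrapper around the worker API added in archivebox/workers/worker.py below. As a rough sketch (not part of the patch), the equivalent programmatic usage would look something like the following, assuming Django is already configured and the workers app is importable; the CRAWL_CREATE payload keys are illustrative rather than a fixed schema:

    # illustrative only -- roughly what `archivebox worker crawl --exit-on-idle` does
    from workers.worker import get_worker_type
    from workers.models import Event

    # queue up an event for the worker to claim (payload keys are examples, not a schema)
    Event.dispatch(event={'name': 'CRAWL_CREATE', 'seed_id': '...'}, parent=None)

    # run the crawl worker inline until its event queue is drained
    Worker = get_worker_type('crawl')
    for output_event in Worker.run(wait_for_first_event=False, exit_on_idle=True):
        print(output_event)

In the forked case, WorkerType.fork() builds exactly this CLI invocation (['archivebox', 'worker', cls.name, '--exit-on-idle', ...]) and hands it to Process.create_and_fork().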
diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py
new file mode 100644
index 00000000..fb56b56e
--- /dev/null
+++ b/archivebox/workers/worker.py
@@ -0,0 +1,448 @@
+__package__ = 'archivebox.workers'
+
+import os
+import sys
+import time
+import uuid
+import json
+import unittest
+from typing import ClassVar, Iterable, Type
+from pathlib import Path
+
+from rich import print
+
+from django.db import transaction
+from django.db.models import QuerySet
+from django.utils import timezone
+from django.utils.functional import classproperty  # type: ignore
+
+from crawls.models import Seed, Crawl
+from core.models import Snapshot, ArchiveResult
+
+from workers.models import Event, Process, EventDict
+
+
+class WorkerType:
+    # static class attributes
+    name: ClassVar[str]            # e.g. 'log' or 'filesystem' or 'crawl' or 'snapshot' or 'archiveresult' etc.
+
+    listens_to: ClassVar[str]      # e.g. 'LOG_' or 'FS_' or 'CRAWL_' or 'SNAPSHOT_' or 'ARCHIVERESULT_' etc.
+    outputs: ClassVar[list[str]]   # e.g. ['LOG_', 'FS_', 'CRAWL_', 'SNAPSHOT_', 'ARCHIVERESULT_'] etc.
+
+    poll_interval: ClassVar[int] = 1   # how long to wait before polling for new events
+
+    @classproperty
+    def event_queue(cls) -> QuerySet[Event]:
+        return Event.objects.filter(name__startswith=cls.listens_to)
+
+    @classmethod
+    def fork(cls, wait_for_first_event=False, exit_on_idle=True) -> Process:
+        cmd = ['archivebox', 'worker', cls.name]
+        if exit_on_idle:
+            cmd.append('--exit-on-idle')
+        if wait_for_first_event:
+            cmd.append('--wait-for-first-event')
+        return Process.create_and_fork(cmd=cmd, actor_type=cls.name)
+
+    @classproperty
+    def processes(cls) -> QuerySet[Process]:
+        return Process.objects.filter(actor_type=cls.name)
+
+    @classmethod
+    def run(cls, wait_for_first_event=False, exit_on_idle=True):
+
+        if wait_for_first_event:
+            event = cls.event_queue.get_next_unclaimed()
+            while not event:
+                time.sleep(cls.poll_interval)
+                event = cls.event_queue.get_next_unclaimed()
+
+        while True:
+            output_events = list(cls.process_next_event()) or list(cls.process_idle_tick())   # process next event, or tick if idle
+            yield from output_events
+            if not output_events:
+                if exit_on_idle:
+                    break
+                else:
+                    time.sleep(cls.poll_interval)
+
+    @classmethod
+    def process_next_event(cls) -> Iterable[EventDict]:
+        event = cls.event_queue.get_next_unclaimed()
+        output_events = []
+
+        if not event:
+            return []
+
+        cls.mark_event_claimed(event)
+        print(f'{cls.__name__}[{Process.current().pid}] {event}', file=sys.stderr)
+        try:
+            for output_event in cls.receive(event):
+                output_events.append(output_event)
+                yield output_event
+            cls.mark_event_succeeded(event, output_events=output_events)
+        except BaseException as e:
+            cls.mark_event_failed(event, output_events=output_events, error=e)
+
+    @classmethod
+    def process_idle_tick(cls) -> Iterable[EventDict]:
+        # reset the idle event to be claimed by the current process
+        event, _created = Event.objects.update_or_create(
+            name=f'{cls.listens_to}IDLE',
+            emitted_by=Process.current(),
+            defaults={
+                'deliver_at': timezone.now(),
+                'claimed_proc': None,
+                'claimed_at': None,
+                'finished_at': None,
+                'error': None,
+                'parent': None,
+            },
+        )
+
+        # then process it like any other event
+        yield from cls.process_next_event()
+
+    @classmethod
+    def receive(cls, event: Event) -> Iterable[EventDict]:
+        handler_method = getattr(cls, f'on_{event.name}', None)
+        if handler_method:
+            yield from handler_method(event)
+        else:
+            raise Exception(f'No handler method for event: {event.name}')
+
+    @staticmethod
+    def on_IDLE() -> Iterable[EventDict]:
+        return []
+
+    @staticmethod
+    def mark_event_claimed(event: Event):
+        proc = Process.current()
+
+        with transaction.atomic():
+            claimed = Event.objects.filter(id=event.id, claimed_proc=None, claimed_at=None).update(claimed_proc=proc, claimed_at=timezone.now())
+            event.refresh_from_db()
+            if not claimed:
+                raise Exception(f'Event already claimed by another process: {event.claimed_proc}')
+
+        print(f'{proc}.mark_event_claimed(): Claimed {event} ⛏️')
+
+        # process_updated = Process.objects.filter(id=proc.id, active_event=None).update(active_event=event)
+        # if not process_updated:
+        #     raise Exception(f'Unable to update process.active_event: {proc}.active_event = {event}')
+
+    @staticmethod
+    def mark_event_succeeded(event: Event, output_events: Iterable[EventDict]):
+        event.refresh_from_db()
+        assert event.claimed_proc, f'Cannot mark event as succeeded if it is not claimed by a process: {event}'
+        assert (event.claimed_proc == Process.current()), f'Cannot mark event as succeeded if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
+
+        with transaction.atomic():
+            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now())
+            event.refresh_from_db()
+            if not updated:
+                raise Exception(f'Event {event} failed to mark as succeeded, it was modified by another process: {event.claimed_proc}')
+
+        # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
+        # if not process_updated:
+        #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
+
+        # dispatch any output events
+        for output_event in output_events:
+            Event.dispatch(event=output_event, parent=event)
+
+        # trigger any callback events
+        if event.on_success:
+            Event.dispatch(event=event.on_success, parent=event)
+
+    @staticmethod
+    def mark_event_failed(event: Event, output_events: Iterable[EventDict]=(), error: BaseException | None = None):
+        event.refresh_from_db()
+        assert event.claimed_proc, f'Cannot mark event as failed if it is not claimed by a process: {event}'
+        assert (event.claimed_proc == Process.current()), f'Cannot mark event as failed if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
+
+        with transaction.atomic():
+            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now(), error=str(error))
+            event.refresh_from_db()
+            if not updated:
+                raise Exception(f'Event {event} failed to mark as failed, it was modified by another process: {event.claimed_proc}')
+
+        # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
+        # if not process_updated:
+        #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
+
+
+        # add dedicated error event to the output events
+        if not event.name.endswith('_ERROR'):
+            output_events = [
+                *output_events,
+                {'name': f'{event.name}_ERROR', 'msg': f'{type(error).__name__}: {error}'},
+            ]
+
+        # dispatch any output events
+        for output_event in output_events:
+            Event.dispatch(event=output_event, parent=event)
+
+        # trigger any callback events
+        if event.on_failure:
+            Event.dispatch(event=event.on_failure, parent=event)
+
+
+
+
+class OrchestratorWorker(WorkerType):
+    name = 'orchestrator'
+    listens_to = 'PROC_'
+    outputs = ['PROC_']
+
+    @staticmethod
+    def on_PROC_IDLE(event: Event) -> Iterable[EventDict]:
+        # look through all Processes that are not yet launched and launch them
+        to_launch = Process.objects.filter(launched_at=None).order_by('created_at').first()
+        if not to_launch:
+            return []
+
+        yield {'name': 'PROC_LAUNCH', 'id': to_launch.id}
+
+    @staticmethod
+    def on_PROC_LAUNCH(event: Event) -> Iterable[EventDict]:
+        process = Process.create_and_fork(**event.kwargs)
+        yield {'name': 'PROC_LAUNCHED', 'process_id': process.id}
+
+    @staticmethod
+    def on_PROC_EXIT(event: Event) -> Iterable[EventDict]:
+        process = Process.objects.get(id=event.process_id)
+        process.kill()
+        yield {'name': 'PROC_KILLED', 'process_id': process.id}
+
+    @staticmethod
+    def on_PROC_KILL(event: Event) -> Iterable[EventDict]:
+        process = Process.objects.get(id=event.process_id)
+        process.kill()
+        yield {'name': 'PROC_KILLED', 'process_id': process.id}
+
+
+class FileSystemWorker(WorkerType):
+    name = 'filesystem'
+    listens_to = 'FS_'
+    outputs = ['FS_']
+
+    @staticmethod
+    def on_FS_IDLE(event: Event) -> Iterable[EventDict]:
+        # check for tmp files that can be deleted
+        for tmp_file in Path('/tmp').glob('archivebox/*'):
+            yield {'name': 'FS_DELETE', 'path': str(tmp_file)}
+
+    @staticmethod
+    def on_FS_WRITE(event: Event) -> Iterable[EventDict]:
+        with open(event.path, 'w') as f:
+            f.write(event.content)
+        yield {'name': 'FS_CHANGED', 'path': event.path}
+
+    @staticmethod
+    def on_FS_APPEND(event: Event) -> Iterable[EventDict]:
+        with open(event.path, 'a') as f:
+            f.write(event.content)
+        yield {'name': 'FS_CHANGED', 'path': event.path}
+
+    @staticmethod
+    def on_FS_DELETE(event: Event) -> Iterable[EventDict]:
+        os.remove(event.path)
+        yield {'name': 'FS_CHANGED', 'path': event.path}
+
+    @staticmethod
+    def on_FS_RSYNC(event: Event) -> Iterable[EventDict]:
+        os.system(f'rsync -av {event.src} {event.dst}')
+        yield {'name': 'FS_CHANGED', 'path': event.dst}
+
+
+class CrawlWorker(WorkerType):
+    name = 'crawl'
+    listens_to = 'CRAWL_'
+    outputs = ['CRAWL_', 'FS_', 'SNAPSHOT_']
+
+    @staticmethod
+    def on_CRAWL_IDLE(event: Event) -> Iterable[EventDict]:
+        # check for any stale crawls that can be started or sealed
+        stale_crawl = Crawl.objects.filter(retry_at__lt=timezone.now()).first()
+        if not stale_crawl:
+            return []
+
+        if stale_crawl.can_start():
+            yield {'name': 'CRAWL_START', 'id': stale_crawl.id}
+
+        elif stale_crawl.can_seal():
+            yield {'name': 'CRAWL_SEAL', 'id': stale_crawl.id}
+
+    @staticmethod
+    def on_CRAWL_CREATE(event: Event) -> Iterable[EventDict]:
+        crawl = Crawl.objects.create(id=event.id, **event)
+        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'CRAWL_UPDATED', 'id': crawl.id}
+
+    @staticmethod
+    def on_CRAWL_UPDATE(event: Event) -> Iterable[EventDict]:
+        Crawl.objects.filter(id=event.id).update(**event)
+        crawl = Crawl.objects.get(id=event.id)
+        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'CRAWL_UPDATED', 'id': crawl.id}
+
+    @staticmethod
+    def on_CRAWL_SEAL(event: Event) -> Iterable[EventDict]:
+        crawl = Crawl.objects.filter(id=event.id, status=Crawl.StatusChoices.STARTED).first()
+        if not crawl:
+            return
+        crawl.status = Crawl.StatusChoices.SEALED
+        crawl.save()
+        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'CRAWL_UPDATED', 'id': crawl.id}
+
+    @staticmethod
+    def on_CRAWL_START(event: Event) -> Iterable[EventDict]:
+        # create root snapshot
+        crawl = Crawl.objects.get(id=event.crawl_id)
+        new_snapshot_id = uuid.uuid4()
+        yield {'name': 'SNAPSHOT_CREATE', 'id': new_snapshot_id, 'crawl_id': crawl.id, 'url': crawl.seed.uri}
+        yield {'name': 'SNAPSHOT_START', 'id': new_snapshot_id}
+        yield {'name': 'CRAWL_UPDATE', 'id': crawl.id, 'status': 'started', 'retry_at': None}
+
+
+class SnapshotWorker(WorkerType):
+    name = 'snapshot'
+    listens_to = 'SNAPSHOT_'
+    outputs = ['SNAPSHOT_', 'FS_']
+
+    @staticmethod
+    def on_SNAPSHOT_IDLE(event: Event) -> Iterable[EventDict]:
+        # check for any snapshots that can be started or sealed
+        snapshot = Snapshot.objects.exclude(status=Snapshot.StatusChoices.SEALED).first()
+        if not snapshot:
+            return []
+
+        if snapshot.can_start():
+            yield {'name': 'SNAPSHOT_START', 'id': snapshot.id}
+        elif snapshot.can_seal():
+            yield {'name': 'SNAPSHOT_SEAL', 'id': snapshot.id}
+
+    @staticmethod
+    def on_SNAPSHOT_CREATE(event: Event) -> Iterable[EventDict]:
+        snapshot = Snapshot.objects.create(id=event.snapshot_id, **event.kwargs)
+        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
+
+    @staticmethod
+    def on_SNAPSHOT_SEAL(event: Event) -> Iterable[EventDict]:
+        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.STARTED)
+        assert snapshot.can_seal()
+        snapshot.status = Snapshot.StatusChoices.SEALED
+        snapshot.save()
+        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
+
+    @staticmethod
+    def on_SNAPSHOT_START(event: Event) -> Iterable[EventDict]:
+        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.QUEUED)
+        assert snapshot.can_start()
+
+        # create pending archiveresults for each extractor
+        for extractor in snapshot.get_extractors():
+            new_archiveresult_id = uuid.uuid4()
+            yield {'name': 'ARCHIVERESULT_CREATE', 'id': new_archiveresult_id, 'snapshot_id': snapshot.id, 'extractor': extractor.name}
+            yield {'name': 'ARCHIVERESULT_START', 'id': new_archiveresult_id}
+
+        snapshot.status = Snapshot.StatusChoices.STARTED
+        snapshot.save()
+        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
+
+
+
+class ArchiveResultWorker(WorkerType):
+    name = 'archiveresult'
+    listens_to = 'ARCHIVERESULT_'
+    outputs = ['ARCHIVERESULT_', 'FS_']
+
+
+    @staticmethod
+    def on_ARCHIVERESULT_UPDATE(event: Event) -> Iterable[EventDict]:
+        ArchiveResult.objects.filter(id=event.id).update(**event.kwargs)
+        archiveresult = ArchiveResult.objects.get(id=event.id)
+        yield {'name': 'FS_WRITE', 'path': archiveresult.OUTPUT_DIR / f'{archiveresult.ABID}.json', 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
+
+    @staticmethod
+    def on_ARCHIVERESULT_CREATE(event: Event) -> Iterable[EventDict]:
+        archiveresult = ArchiveResult.objects.create(id=event.id, **event)
+        yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id}
+
+    @staticmethod
+    def on_ARCHIVERESULT_SEAL(event: Event) -> Iterable[EventDict]:
+        archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.STARTED)
+
+        # the on_success callback only dispatches after the update event succeeds,
+        # so the rsync from the result dir to the snapshot dir runs once the result is sealed
+        yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id, 'status': 'sealed', 'on_success': {
+            'name': 'FS_RSYNC', 'src': archiveresult.OUTPUT_DIR, 'dst': archiveresult.snapshot.OUTPUT_DIR,
+        }}
+
+    @staticmethod
+    def on_ARCHIVERESULT_START(event: Event) -> Iterable[EventDict]:
+        archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.QUEUED)
+
+        yield {
+            'name': 'SHELL_EXEC',
+            'cmd': archiveresult.EXTRACTOR.get_cmd(),
+            'cwd': archiveresult.OUTPUT_DIR,
+            'on_exit': {
+                'name': 'ARCHIVERESULT_SEAL',
+                'id': archiveresult.id,
+            },
+        }
+
+        archiveresult.status = ArchiveResult.StatusChoices.STARTED
+        archiveresult.save()
+        yield {'name': 'FS_WRITE', 'path': archiveresult.OUTPUT_DIR / 'index.json', 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
+        yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
+
+    @staticmethod
+    def on_ARCHIVERESULT_IDLE(event: Event) -> Iterable[EventDict]:
+        stale_archiveresult = ArchiveResult.objects.exclude(status__in=[ArchiveResult.StatusChoices.SUCCEEDED, ArchiveResult.StatusChoices.FAILED]).first()
+        if not stale_archiveresult:
+            return []
+        if stale_archiveresult.can_start():
+            yield {'name': 'ARCHIVERESULT_START', 'id': stale_archiveresult.id}
+        if stale_archiveresult.can_seal():
+            yield {'name': 'ARCHIVERESULT_SEAL', 'id': stale_archiveresult.id}
+
+
+WORKER_TYPES = [
+    OrchestratorWorker,
+    FileSystemWorker,
+    CrawlWorker,
+    SnapshotWorker,
+    ArchiveResultWorker,
+]
+
+def get_worker_type(name: str) -> Type[WorkerType]:
+    for worker_type in WORKER_TYPES:
+        if worker_type.name == name:
+            return worker_type
+    raise Exception(f'Worker type not found: {name}')
+
+
+# class CrawlActorTest(unittest.TestCase):
+
+#     def test_crawl_creation(self):
+#         seed = Seed.objects.create(uri='https://example.com')
+#         Event.dispatch('CRAWL_CREATE', {'seed_id': seed.id})
+
+#         crawl_actor = CrawlActor()
+
+#         output_events = list(crawl_actor.process_next_event())
+
+#         assert len(output_events) == 1
+#         assert output_events[0].get('name', 'unset') == 'FS_WRITE'
+#         assert output_events[0].get('path') == '/tmp/test_crawl/index.json'
+
+#         output_events = list(crawl_actor.process_next_event())
+#         assert len(output_events) == 1
+#         assert output_events[0].get('name', 'unset') == 'CRAWL_CREATED'
+
+#         assert Crawl.objects.filter(seed_id=seed.id).exists(), 'Crawl was not created'
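Since receive() resolves handlers by naming convention (on_<EVENT_NAME>), a new worker type only needs the class attributes plus handler methods. A minimal hypothetical subclass for the 'LOG_' prefix mentioned in the WorkerType comments (no LogWorker is defined in this diff; the class, logfile path, and event payload below are illustrative only):

    from typing import Iterable

    from workers.models import Event, EventDict
    from workers.worker import WorkerType, WORKER_TYPES


    class LogWorker(WorkerType):
        """Hypothetical worker that appends LOG_WRITE events to a logfile via the filesystem worker."""
        name = 'log'
        listens_to = 'LOG_'
        outputs = ['LOG_', 'FS_']

        @staticmethod
        def on_LOG_IDLE(event: Event) -> Iterable[EventDict]:
            return []   # nothing to do between events

        @staticmethod
        def on_LOG_WRITE(event: Event) -> Iterable[EventDict]:
            # delegate the actual append to FileSystemWorker (logfile path is made up for the example)
            yield {'name': 'FS_APPEND', 'path': '/tmp/archivebox/worker.log', 'content': event.content}
            yield {'name': 'LOG_WRITTEN', 'path': '/tmp/archivebox/worker.log'}


    # it would also need to be registered so get_worker_type('log') and `archivebox worker log` can find it
    WORKER_TYPES.append(LogWorker)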