# Mirror of https://github.com/ArchiveBox/ArchiveBox.git (synced 2025-05-09 12:21:57 -04:00)
# 166 lines, 6.8 KiB, Python
# __package__ = 'archivebox.workers'
|
|
|
|
# import time
|
|
|
|
|
|
# from typing import ClassVar, Type, Iterable, TypedDict
|
|
# from django.db.models import QuerySet
|
|
# from django.db import transaction
|
|
# from django.utils import timezone
|
|
# from django.utils.functional import classproperty # type: ignore
|
|
|
|
# from .models import Event, Process, EventDict
|
|
|
|
|
|
# class ActorType:
|
|
# # static class attributes
|
|
# name: ClassVar[str]
|
|
# event_prefix: ClassVar[str]
|
|
# poll_interval: ClassVar[int] = 1
|
|
|
|
# @classproperty
|
|
# def event_queue(cls) -> QuerySet[Event]:
|
|
# return Event.objects.filter(type__startswith=cls.event_prefix)
|
|
|
|
# @classmethod
|
|
# def fork(cls, wait_for_first_event=False, exit_on_idle=True) -> Process:
|
|
# cmd = ['archivebox', 'actor', cls.name]
|
|
# if exit_on_idle:
|
|
# cmd.append('--exit-on-idle')
|
|
# if wait_for_first_event:
|
|
# cmd.append('--wait-for-first-event')
|
|
# return Process.create_and_fork(cmd=cmd, actor_type=cls.name)
|
|
|
|
# @classproperty
|
|
# def processes(cls) -> QuerySet[Process]:
|
|
# return Process.objects.filter(actor_type=cls.name)
|
|
|
|
# @classmethod
|
|
# def run(cls, wait_for_first_event=False, exit_on_idle=True):
|
|
|
|
# if wait_for_first_event:
|
|
# event = cls.event_queue.get_next_unclaimed()
|
|
# while not event:
|
|
# time.sleep(cls.poll_interval)
|
|
# event = cls.event_queue.get_next_unclaimed()
|
|
|
|
# while True:
|
|
# output_events = list(cls.process_next_event()) or list(cls.process_idle_tick()) # process next event, or tick if idle
|
|
# yield from output_events
|
|
# if not output_events:
|
|
# if exit_on_idle:
|
|
# break
|
|
# else:
|
|
# time.sleep(cls.poll_interval)
|
|
|
|
# @classmethod
|
|
# def process_next_event(cls) -> Iterable[EventDict]:
|
|
# event = cls.event_queue.get_next_unclaimed()
|
|
# output_events = []
|
|
|
|
# if not event:
|
|
# return []
|
|
|
|
# cls.mark_event_claimed(event, duration=60)
|
|
# try:
|
|
# for output_event in cls.receive(event):
|
|
# output_events.append(output_event)
|
|
# yield output_event
|
|
# cls.mark_event_succeeded(event, output_events=output_events)
|
|
# except BaseException as e:
|
|
# cls.mark_event_failed(event, output_events=output_events, error=e)
|
|
|
|
# @classmethod
|
|
# def process_idle_tick(cls) -> Iterable[EventDict]:
|
|
# # reset the idle event to be claimed by the current process
|
|
# event, _created = Event.objects.update_or_create(
|
|
# name=f'{cls.event_prefix}IDLE',
|
|
# emitted_by=Process.current(),
|
|
# defaults={
|
|
# 'deliver_at': timezone.now(),
|
|
# 'claimed_proc': None,
|
|
# 'claimed_at': None,
|
|
# 'finished_at': None,
|
|
# 'error': None,
|
|
# 'parent': None,
|
|
# },
|
|
# )
|
|
|
|
# # then process it like any other event
|
|
# yield from cls.process_next_event()
|
|
|
|
# @classmethod
|
|
# def receive(cls, event: Event) -> Iterable[EventDict]:
|
|
# handler_method = getattr(cls, f'on_{event.name}', None)
|
|
# if handler_method:
|
|
# yield from handler_method(event)
|
|
# else:
|
|
# raise Exception(f'No handler method for event: {event.name}')
|
|
|
|
# @staticmethod
|
|
# def on_IDLE() -> Iterable[EventDict]:
|
|
# return []
|
|
|
|
# @staticmethod
|
|
# def mark_event_claimed(event: Event, duration: int=60):
|
|
# proc = Process.current()
|
|
|
|
# with transaction.atomic():
|
|
# claimed = Event.objects.filter(id=event.id, claimed_proc=None, claimed_at=None).update(claimed_proc=proc, claimed_at=timezone.now())
|
|
# if not claimed:
|
|
# event.refresh_from_db()
|
|
# raise Exception(f'Event already claimed by another process: {event.claimed_proc}')
|
|
|
|
# process_updated = Process.objects.filter(id=proc.id, active_event=None).update(active_event=event)
|
|
# if not process_updated:
|
|
# raise Exception(f'Unable to update process.active_event: {proc}.active_event = {event}')
|
|
|
|
# @staticmethod
|
|
# def mark_event_succeeded(event: Event, output_events: Iterable[EventDict]):
|
|
# assert event.claimed_proc and (event.claimed_proc == Process.current())
|
|
# with transaction.atomic():
|
|
# updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now())
|
|
# if not updated:
|
|
# event.refresh_from_db()
|
|
# raise Exception(f'Event {event} failed to mark as succeeded, it was modified by another process: {event.claimed_proc}')
|
|
|
|
# process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
|
|
# if not process_updated:
|
|
# raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
|
|
|
|
# # dispatch any output events
|
|
# for output_event in output_events:
|
|
# Event.dispatch(event=output_event, parent=event)
|
|
|
|
# # trigger any callback events
|
|
# if event.on_success:
|
|
# Event.dispatch(event=event.on_success, parent=event)
|
|
|
|
# @staticmethod
|
|
# def mark_event_failed(event: Event, output_events: Iterable[EventDict]=(), error: BaseException | None = None):
|
|
# assert event.claimed_proc and (event.claimed_proc == Process.current())
|
|
# with transaction.atomic():
|
|
# updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now(), error=str(error))
|
|
# if not updated:
|
|
# event.refresh_from_db()
|
|
# raise Exception(f'Event {event} failed to mark as failed, it was modified by another process: {event.claimed_proc}')
|
|
|
|
# process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
|
|
# if not process_updated:
|
|
# raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
|
|
|
|
|
|
# # add dedicated error event to the output events
|
|
# output_events = [
|
|
# *output_events,
|
|
# {'name': f'{event.name}_ERROR', 'error': f'{type(error).__name__}: {error}'},
|
|
# ]
|
|
|
|
# # dispatch any output events
|
|
# for output_event in output_events:
|
|
# Event.dispatch(event=output_event, parent=event)
|
|
|
|
# # trigger any callback events
|
|
# if event.on_failure:
|
|
# Event.dispatch(event=event.on_failure, parent=event)
|
|
|