diff --git a/archivebox/core/statemachines.py b/archivebox/core/statemachines.py index 9750d554..2472f520 100644 --- a/archivebox/core/statemachines.py +++ b/archivebox/core/statemachines.py @@ -11,7 +11,7 @@ from rich import print from statemachine import State, StateMachine -from workers.actor import ActorType +# from workers.actor import ActorType from core.models import Snapshot, ArchiveResult @@ -107,19 +107,19 @@ class SnapshotMachine(StateMachine, strict_states=True): ) -class SnapshotWorker(ActorType[Snapshot]): - """ - The primary actor for progressing Snapshot objects - through their lifecycle using the SnapshotMachine. - """ - Model = Snapshot - StateMachineClass = SnapshotMachine +# class SnapshotWorker(ActorType[Snapshot]): +# """ +# The primary actor for progressing Snapshot objects +# through their lifecycle using the SnapshotMachine. +# """ +# Model = Snapshot +# StateMachineClass = SnapshotMachine - ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started # 'started' +# ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started # 'started' - MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 - MAX_TICK_TIME: ClassVar[int] = 10 - CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 +# MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 +# MAX_TICK_TIME: ClassVar[int] = 10 +# CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 @@ -263,16 +263,16 @@ class ArchiveResultMachine(StateMachine, strict_states=True): self.archiveresult.snapshot.update_for_workers() # bump snapshot retry time so it picks up all the new changes -class ArchiveResultWorker(ActorType[ArchiveResult]): - """ - The primary actor for progressing ArchiveResult objects - through their lifecycle using the ArchiveResultMachine. - """ - Model = ArchiveResult - StateMachineClass = ArchiveResultMachine +# class ArchiveResultWorker(ActorType[ArchiveResult]): +# """ +# The primary actor for progressing ArchiveResult objects +# through their lifecycle using the ArchiveResultMachine. 
+# """ +# Model = ArchiveResult +# StateMachineClass = ArchiveResultMachine - ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started' +# ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started' - MAX_CONCURRENT_ACTORS: ClassVar[int] = 6 - MAX_TICK_TIME: ClassVar[int] = 60 - CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 +# MAX_CONCURRENT_ACTORS: ClassVar[int] = 6 +# MAX_TICK_TIME: ClassVar[int] = 60 +# CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 diff --git a/archivebox/crawls/statemachines.py b/archivebox/crawls/statemachines.py index 4af5054c..4082c16a 100644 --- a/archivebox/crawls/statemachines.py +++ b/archivebox/crawls/statemachines.py @@ -9,7 +9,7 @@ from rich import print from statemachine import State, StateMachine -from workers.actor import ActorType +# from workers.actor import ActorType from crawls.models import Crawl @@ -96,17 +96,17 @@ class CrawlMachine(StateMachine, strict_states=True): ) -class CrawlWorker(ActorType[Crawl]): - """The Actor that manages the lifecycle of all Crawl objects""" +# class CrawlWorker(ActorType[Crawl]): +# """The Actor that manages the lifecycle of all Crawl objects""" - Model = Crawl - StateMachineClass = CrawlMachine +# Model = Crawl +# StateMachineClass = CrawlMachine - ACTIVE_STATE: ClassVar[State] = CrawlMachine.started - FINAL_STATES: ClassVar[list[State]] = CrawlMachine.final_states - STATE_FIELD_NAME: ClassVar[str] = Crawl.state_field_name +# ACTIVE_STATE: ClassVar[State] = CrawlMachine.started +# FINAL_STATES: ClassVar[list[State]] = CrawlMachine.final_states +# STATE_FIELD_NAME: ClassVar[str] = Crawl.state_field_name - MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 - MAX_TICK_TIME: ClassVar[int] = 10 - CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 +# MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 +# MAX_TICK_TIME: ClassVar[int] = 10 +# CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 diff --git a/archivebox/extractors/__init__.py b/archivebox/extractors/__init__.py index b1d4f513..bfa7319b 100644 --- a/archivebox/extractors/__init__.py +++ b/archivebox/extractors/__init__.py @@ -249,16 +249,16 @@ def get_extractors(dir: Path=EXTRACTORS_DIR) -> Dict[str, ExtractorModuleProtoco """iterate through archivebox/extractors/*.py and load extractor modules""" EXTRACTORS = {} - for filename in EXTRACTORS_DIR.glob('*.py'): - if filename.name.startswith('__'): - continue + # for filename in EXTRACTORS_DIR.glob('*.py'): + # if filename.name.startswith('__'): + # continue - extractor_name = filename.name.replace('.py', '') + # extractor_name = filename.name.replace('.py', '') - extractor_module = cast(ExtractorModuleProtocol, import_module(f'.{extractor_name}', package=__package__)) + # extractor_module = cast(ExtractorModuleProtocol, import_module(f'.{extractor_name}', package=__package__)) - assert getattr(extractor_module, 'get_output_path') - EXTRACTORS[extractor_name] = extractor_module + # # assert getattr(extractor_module, 'get_output_path') + # EXTRACTORS[extractor_name] = extractor_module return EXTRACTORS diff --git a/archivebox/extractors/extractor.py b/archivebox/extractors/extractor.py index d43325e4..67770878 100644 --- a/archivebox/extractors/extractor.py +++ b/archivebox/extractors/extractor.py @@ -16,205 +16,204 @@ from core.models import ArchiveResult import abx import archivebox -class Extractor: - # static class variables - name: ClassVar[str] = 'ytdlp' - verbose_name: ClassVar[str] = 'YT-DLP' - binaries: ClassVar[tuple[str, ...]] = () - daemons: 
ClassVar[tuple[str, ...]] = () - timeout: ClassVar[int] = 60 - - # instance variables - ARCHIVERESULT: ArchiveResult - CONFIG: dict[str, object] - BINARIES: dict[str, object] - DAEMONS: dict[str, object] - - def __init__(self, archiveresult: ArchiveResult, extra_config: dict | None=None): - assert archiveresult.pk, 'ArchiveResult must be saved to DB before it can be extracted' - self.archiveresult = self.ARCHIVERESULT = archiveresult - self.CONFIG = archivebox.pm.hook.get_SCOPE_CONFIG(archiveresult=self.archiveresult, extra=extra_config) - all_binaries = abx.as_dict(archivebox.pm.hook.get_BINARIES()) - all_daemons = abx.as_dict(archivebox.pm.hook.get_DAEMONS()) - self.BINARIES = { - binary_name: all_binaries[binary_name] - for binary_name in self.binaries - } - self.DAEMONS = { - daemon_name: all_daemons[daemon_name] - for daemon_name in self.daemons - } +# class Extractor: +# # static class variables +# name: ClassVar[str] = 'ytdlp' +# verbose_name: ClassVar[str] = 'YT-DLP' +# binaries: ClassVar[tuple[str, ...]] = () +# daemons: ClassVar[tuple[str, ...]] = () +# timeout: ClassVar[int] = 60 +# +# # instance variables +# ARCHIVERESULT: ArchiveResult +# CONFIG: dict[str, object] +# BINARIES: dict[str, object] +# DAEMONS: dict[str, object] +# +# def __init__(self, archiveresult: ArchiveResult, extra_config: dict | None=None): +# assert archiveresult.pk, 'ArchiveResult must be saved to DB before it can be extracted' +# self.archiveresult = self.ARCHIVERESULT = archiveresult +# self.CONFIG = archivebox.pm.hook.get_SCOPE_CONFIG(archiveresult=self.archiveresult, extra=extra_config) +# all_binaries = abx.as_dict(archivebox.pm.hook.get_BINARIES()) +# all_daemons = abx.as_dict(archivebox.pm.hook.get_DAEMONS()) +# self.BINARIES = { +# binary_name: all_binaries[binary_name] +# for binary_name in self.binaries +# } +# self.DAEMONS = { +# daemon_name: all_daemons[daemon_name] +# for daemon_name in self.daemons +# } - def extract(self, config: dict | None=None) -> 'ArchiveResult': - """ - - making sure any binaries the extractor depends on are installed and loaded - - creating a new temporary working directory under the snapshot dir to hold extractor output - - setting up a timer signal to kill the extractor if it runs too long - - passing the extractor the URLs, temporary working directory, and config dict of options - - running the extractor in a shell subprocess and collecting stdout/stderr - - capturing the extractor's exit code - - if extractor exits with 29 (RetryError), it should set the status to 'BACKOFF' and set retry_at to a datetime in the future - - if extractor exits with 50 (NotApplicable), it should set the status to 'SKIPPED', and set retry_at to None - - setting the correct permissions and ownership on all the output files - - generating the merkle tree of all the output files and their hashes - - generating a thumbnail of the main output (or collecting one provided by the extractor) - - detecting any special outputs files that need to be parsed for other parts of the system (content-types? 
) - - metadata.json -> ArchiveResult.output_json - - outlinks.jsonl -> ArchiveResult.output_links - - search_texts.txt -> ArchiveResult.index_texts - - .merkle.json -> ArchiveResult.output_files - - videos.jsonl -> ArchiveResult.output_videos - - audios.jsonl -> ArchiveResult.output_audios - - images.jsonl -> ArchiveResult.output_images - - htmls.jsonl -> ArchiveResult.output_htmls - - saving all the result metadata to the ArchiveResult in the database - """ +# def extract(self, config: dict | None=None) -> 'ArchiveResult': +# """ +# - making sure any binaries the extractor depends on are installed and loaded +# - creating a new temporary working directory under the snapshot dir to hold extractor output +# - setting up a timer signal to kill the extractor if it runs too long +# - passing the extractor the URLs, temporary working directory, and config dict of options +# - running the extractor in a shell subprocess and collecting stdout/stderr +# - capturing the extractor's exit code +# - if extractor exits with 29 (RetryError), it should set the status to 'BACKOFF' and set retry_at to a datetime in the future +# - if extractor exits with 50 (NotApplicable), it should set the status to 'SKIPPED', and set retry_at to None +# - setting the correct permissions and ownership on all the output files +# - generating the merkle tree of all the output files and their hashes +# - generating a thumbnail of the main output (or collecting one provided by the extractor) +# - detecting any special outputs files that need to be parsed for other parts of the system (content-types? ) +# - metadata.json -> ArchiveResult.output_json +# - outlinks.jsonl -> ArchiveResult.output_links +# - search_texts.txt -> ArchiveResult.index_texts +# - .merkle.json -> ArchiveResult.output_files +# - videos.jsonl -> ArchiveResult.output_videos +# - audios.jsonl -> ArchiveResult.output_audios +# - images.jsonl -> ArchiveResult.output_images +# - htmls.jsonl -> ArchiveResult.output_htmls +# - saving all the result metadata to the ArchiveResult in the database +# """ - archiveresult = self.ARCHIVERESULT - # config = get_scope_config(archiveresult=archiveresult.snapshot.url, env=...) +# archiveresult = self.ARCHIVERESULT +# # config = get_scope_config(archiveresult=archiveresult.snapshot.url, env=...) 
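+#        # Editor's sketch (hypothetical name, not in the original code): the special
+#        # output files listed in the docstring above map onto ArchiveResult fields,
+#        # which could be kept as a simple filename -> field lookup table, e.g.:
+#        # SPECIAL_OUTPUT_FIELDS = {
+#        #     'metadata.json':    'output_json',
+#        #     'outlinks.jsonl':   'output_links',
+#        #     'search_texts.txt': 'index_texts',
+#        #     '.merkle.json':     'output_files',
+#        #     'videos.jsonl':     'output_videos',
+#        #     'audios.jsonl':     'output_audios',
+#        #     'images.jsonl':     'output_images',
+#        #     'htmls.jsonl':      'output_htmls',
+#        # }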
- self.before_extract() +# self.before_extract() - error = Exception('Failed to start extractor') - stdout = '' - stderr = '' - try: - proc = archiveresult.EXTRACTOR.spawn(url=archiveresult.snapshot.url, binaries=binaries, daemons=daemons, cwd=cwd, config=config) - stdout, stderr = proc.communicate() - error = None - except Exception as err: - error = err - finally: - self.after_extract(error=error) +# error = Exception('Failed to start extractor') +# stdout = '' +# stderr = '' +# try: +# proc = archiveresult.EXTRACTOR.spawn(url=archiveresult.snapshot.url, binaries=binaries, daemons=daemons, cwd=cwd, config=config) +# stdout, stderr = proc.communicate() +# error = None +# except Exception as err: +# error = err +# finally: +# self.after_extract(error=error) - return archiveresult +# return archiveresult - def should_extract(self): - if self.archiveresult.snapshot.url.startswith('https://youtube.com/'): - return True - return False +# def should_extract(self): +# if self.archiveresult.snapshot.url.startswith('https://youtube.com/'): +# return True +# return False - def load_binaries(self): - return { - bin_name: binary.load() - for bin_name, binary in self.BINARIES.items() - } +# def load_binaries(self): +# return { +# bin_name: binary.load() +# for bin_name, binary in self.BINARIES.items() +# } - def load_daemons(self): - return { - daemon_name: daemon.load() - for daemon_name, daemon in self.DAEMONS.items() - } +# def load_daemons(self): +# return { +# daemon_name: daemon.load() +# for daemon_name, daemon in self.DAEMONS.items() +# } - def output_dir_name(self): - # e.g. 'ytdlp' - return f'{self.name}' +# def output_dir_name(self): +# # e.g. 'ytdlp' +# return f'{self.name}' - @property - def OUTPUT_DIR(self): - return self.archiveresult.snapshot_dir / self.output_dir_name() +# @property +# def OUTPUT_DIR(self): +# return self.archiveresult.snapshot_dir / self.output_dir_name() - def before_extract(self): - # create self.archiveresult.snapshot_dir / self.archiveresult.extractor / dir - # chown, chmod, etc. - binaries = self.load_binaries() - daemons = self.load_daemons() - cmd = self.archiveresult.EXTRACTOR.get_cmd(binaries=binaries, daemons=daemons) - cmd_version = self.archiveresult.EXTRACTOR.get_cmd_version(binaries=binaries, daemons=daemons) +# def before_extract(self): +# # create self.archiveresult.snapshot_dir / self.archiveresult.extractor / dir +# # chown, chmod, etc. 
+# binaries = self.load_binaries() +# daemons = self.load_daemons() +# cmd = self.archiveresult.EXTRACTOR.get_cmd(binaries=binaries, daemons=daemons) +# cmd_version = self.archiveresult.EXTRACTOR.get_cmd_version(binaries=binaries, daemons=daemons) - self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True) - os.chmod(self.OUTPUT_DIR, 0o755) - self.archiveresult.status = self.archiveresult.StatusChoices.STARTED - self.archiveresult.retry_at = timezone.now() + timedelta(seconds=self.timeout) - self.archiveresult.start_ts = timezone.now() - self.archiveresult.end_ts = None - self.archiveresult.output = None - self.archiveresult.output_path = str(self.OUTPUT_DIR.relative_to(self.archiveresult.snapshot_dir)) - self.archiveresult.cmd = cmd - self.archiveresult.cmd_version = cmd_version - self.archiveresult.machine = Machine.objects.get_current() - self.archiveresult.iface = NetworkInterface.objects.get_current() - self.archiveresult.save() - self.archiveresult.write_indexes() +# self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True) +# os.chmod(self.OUTPUT_DIR, 0o755) +# self.archiveresult.status = self.archiveresult.StatusChoices.STARTED +# self.archiveresult.retry_at = timezone.now() + timedelta(seconds=self.timeout) +# self.archiveresult.start_ts = timezone.now() +# self.archiveresult.end_ts = None +# self.archiveresult.output = None +# self.archiveresult.output_path = str(self.OUTPUT_DIR.relative_to(self.archiveresult.snapshot_dir)) +# self.archiveresult.cmd = cmd +# self.archiveresult.cmd_version = cmd_version +# self.archiveresult.machine = Machine.objects.get_current() +# self.archiveresult.iface = NetworkInterface.objects.get_current() +# self.archiveresult.save() +# self.archiveresult.write_indexes() - def extract(self, url: str, binaries: dict, daemons: dict, cwd: Path, config: dict): - proc = subprocess.run(self.archiveresult.cmd, cwd=self.archiveresult.cwd, env=os.environ.update(binaries), timeout=self.timeout, shell=True, capture_output=True, text=True) - self.archiveresult.stdout = proc.stdout - self.archiveresult.stderr = proc.stderr - self.archiveresult.returncode = proc.returncode - self.archiveresult.save() - self.archiveresult.write_indexes() +# def extract(self, url: str, binaries: dict, daemons: dict, cwd: Path, config: dict): +# proc = subprocess.run(self.archiveresult.cmd, cwd=self.archiveresult.cwd, env=os.environ.update(binaries), timeout=self.timeout, shell=True, capture_output=True, text=True) +# self.archiveresult.stdout = proc.stdout +# self.archiveresult.stderr = proc.stderr +# self.archiveresult.returncode = proc.returncode +# self.archiveresult.save() +# self.archiveresult.write_indexes() - def determine_status(self): - if self.archiveresult.returncode == 29: - return self.archiveresult.StatusChoices.BACKOFF, timezone.now() + timedelta(seconds=self.timeout) - elif self.archiveresult.returncode == 50: - return self.archiveresult.StatusChoices.SKIPPED, None - else: - return self.archiveresult.StatusChoices.FAILED, None +# def determine_status(self): +# if self.archiveresult.returncode == 29: +# return self.archiveresult.StatusChoices.BACKOFF, timezone.now() + timedelta(seconds=self.timeout) +# elif self.archiveresult.returncode == 50: +# return self.archiveresult.StatusChoices.SKIPPED, None +# else: +# return self.archiveresult.StatusChoices.FAILED, None - def collect_outputs(self, cwd: Path): - for file in cwd.rglob('*'): - path = file.relative_to(cwd) - os.chmod(file, 0o644) - #os.chown(file, ARCHIVEBOX_UID, ARCHIVEBOX_GID) +# def collect_outputs(self, cwd: Path): +# for 
file in cwd.rglob('*'): +# path = file.relative_to(cwd) +# os.chmod(file, 0o644) +# #os.chown(file, ARCHIVEBOX_UID, ARCHIVEBOX_GID) - self.archiveresult.outputs.append({ - 'type': 'FILE', - 'path': file.relative_to(cwd), - 'size': file.stat().st_size, - 'ext': file.suffix, - 'mimetype': mimetypes.guess_type(file)[0], - 'sha256': hashlib.sha256(file.read_bytes()).hexdigest(), - 'blake3': hashlib.blake3(file.read_bytes()).hexdigest(), - 'created_at': file.stat().st_ctime, - 'modified_at': file.stat().st_mtime, - 'symlinks': [ - 'screenshot.png', - 'example.com', - ] - }) - outlinks = parse_outlinks(file) - if outlinks: - self.archiveresult.outputs.append({ - 'type': 'OUTLINK', - 'url': outlink.target, - 'selector': outlink.selector, - 'text': outlink.text, - }) - - if path.endswith('favicon.ico'): - self.archiveresult.outputs.append({ - 'type': 'FAVICON', - 'symlinks': { - 'favicon': output_file['path'], - 'favicon.ico': output_file['path'], - 'favicon.png': output_file['path'].with_suffix('.png'), - }, - 'path': output_file['path'], - }) - if path.endswith('.pdf'): - self.archiveresult.outputs.append({ - 'type': 'PDF', - 'path': file.relative_to(cwd), - }) - - if 'text/plain' in mimetypes.guess_type(file): - self.archiveresult.outputs.append({ - 'type': 'SEARCHTEXT', - 'path': file.relative_to(self.archiveresult.OUTPUT_DIR), - 'archiveresult_id': self.archiveresult.id, - }) - - def after_extract(self, error: Exception | None=None): - status, retry_at = self.determine_status() - - - self.archiveresult.error = f'{type(error).__name__}: {error}' if error else None - self.archiveresult.status = self.archiveresult.StatusChoices.FAILED if error else self.archiveresult.StatusChoices.SUCCEEDED - self.archiveresult.retry_at = None - self.archiveresult.end_ts = timezone.now() - self.archiveresult.output = self.archiveresult.outputs[0].path - self.archiveresult.save() - self.archiveresult.write_indexes() +# self.archiveresult.outputs.append({ +# 'type': 'FILE', +# 'path': file.relative_to(cwd), +# 'size': file.stat().st_size, +# 'ext': file.suffix, +# 'mimetype': mimetypes.guess_type(file)[0], +# 'sha256': hashlib.sha256(file.read_bytes()).hexdigest(), +# 'blake3': hashlib.blake3(file.read_bytes()).hexdigest(), +# 'created_at': file.stat().st_ctime, +# 'modified_at': file.stat().st_mtime, +# 'symlinks': [ +# 'screenshot.png', +# 'example.com', +# ] +# }) +# outlinks = parse_outlinks(file) +# if outlinks: +# self.archiveresult.outputs.append({ +# 'type': 'OUTLINK', +# 'url': outlink.target, +# 'selector': outlink.selector, +# 'text': outlink.text, +# }) +# +# if path.endswith('favicon.ico'): +# self.archiveresult.outputs.append({ +# 'type': 'FAVICON', +# 'symlinks': { +# 'favicon': output_file['path'], +# 'favicon.ico': output_file['path'], +# 'favicon.png': output_file['path'].with_suffix('.png'), +# }, +# 'path': output_file['path'], +# }) +# if path.endswith('.pdf'): +# self.archiveresult.outputs.append({ +# 'type': 'PDF', +# 'path': file.relative_to(cwd), +# }) +# +# if 'text/plain' in mimetypes.guess_type(file): +# self.archiveresult.outputs.append({ +# 'type': 'SEARCHTEXT', +# 'path': file.relative_to(self.archiveresult.OUTPUT_DIR), +# 'archiveresult_id': self.archiveresult.id, +# }) +# +# def after_extract(self, error: Exception | None=None): +# status, retry_at = self.determine_status() +# +# self.archiveresult.error = f'{type(error).__name__}: {error}' if error else None +# self.archiveresult.status = self.archiveresult.StatusChoices.FAILED if error else 
self.archiveresult.StatusChoices.SUCCEEDED +# self.archiveresult.retry_at = None +# self.archiveresult.end_ts = timezone.now() +# self.archiveresult.output = self.archiveresult.outputs[0].path +# self.archiveresult.save() +# self.archiveresult.write_indexes() \ No newline at end of file diff --git a/archivebox/workers/models.py b/archivebox/workers/models.py index 84026005..2777bd39 100644 --- a/archivebox/workers/models.py +++ b/archivebox/workers/models.py @@ -1,15 +1,23 @@ -from typing import ClassVar, Type, Iterable -from datetime import datetime, timedelta +__package__ = 'archivebox.workers' +import uuid +import json + +from typing import ClassVar, Type, Iterable, TypedDict +from datetime import datetime, timedelta from statemachine.mixins import MachineMixin from django.db import models +from django.db.models import QuerySet +from django.core import checks from django.utils import timezone from django.utils.functional import classproperty +from base_models.models import ABIDModel, ABIDField +from machine.models import Process + from statemachine import registry, StateMachine, State -from django.core import checks class DefaultStatusChoices(models.TextChoices): QUEUED = 'queued', 'Queued' @@ -298,3 +306,235 @@ class ModelWithStateMachine(BaseModelWithStateMachine): class Meta: abstract = True + + + + + +class EventDict(TypedDict, total=False): + name: str + + id: str | uuid.UUID + path: str + content: str + status: str + retry_at: datetime | None + url: str + seed_id: str | uuid.UUID + crawl_id: str | uuid.UUID + snapshot_id: str | uuid.UUID + process_id: str | uuid.UUID + extractor: str + error: str + on_success: dict | None + on_failure: dict | None + +class EventManager(models.Manager): + pass + +class EventQuerySet(models.QuerySet): + def get_next_unclaimed(self) -> 'Event | None': + return self.filter(claimed_at=None).order_by('deliver_at').first() + + def expired(self, older_than: int=60 * 10) -> QuerySet['Event']: + return self.filter(claimed_at__lt=timezone.now() - timedelta(seconds=older_than)) + + +class Event(ABIDModel): + abid_prefix = 'evn_' + abid_ts_src = 'self.deliver_at' # e.g. 'self.created_at' + abid_uri_src = 'self.name' # e.g. 'self.uri' (MUST BE SET) + abid_subtype_src = 'self.emitted_by' # e.g. 'self.extractor' + abid_rand_src = 'self.id' # e.g. 
'self.uuid' or 'self.id' + abid_drift_allowed: bool = False # set to True to allow abid_field values to change after a fixed ABID has been issued (NOT RECOMMENDED: means values can drift out of sync from original ABID) + + read_only_fields = ('id', 'deliver_at', 'name', 'kwargs', 'timeout', 'parent', 'emitted_by', 'on_success', 'on_failure') + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, null=False, editable=False, unique=True, verbose_name='ID') + + # disable these fields from inherited models, they're not needed / take up too much room + abid = None + created_at = None + created_by = None + created_by_id = None + + # immutable fields + deliver_at = models.DateTimeField(default=timezone.now, null=False, editable=False, unique=True, db_index=True) + name = models.CharField(max_length=255, null=False, blank=False, db_index=True) + kwargs = models.JSONField(default=dict) + timeout = models.IntegerField(null=False, default=60) + parent = models.ForeignKey('Event', null=True, on_delete=models.SET_NULL, related_name='child_events') + emitted_by = models.ForeignKey(Process, null=False, on_delete=models.PROTECT, related_name='emitted_events') + on_success = models.JSONField(null=True) + on_failure = models.JSONField(null=True) + + # mutable fields + modified_at = models.DateTimeField(auto_now=True) + claimed_proc = models.ForeignKey(Process, null=True, on_delete=models.CASCADE, related_name='claimed_events') + claimed_at = models.DateTimeField(null=True) + finished_at = models.DateTimeField(null=True) + error = models.TextField(null=True) + + objects: EventManager = EventManager.from_queryset(EventQuerySet)() + + child_events: models.RelatedManager['Event'] + + @classmethod + def get_next_timestamp(cls): + """Get the next monotonically increasing timestamp for the next event.dispatch_at""" + latest_event = cls.objects.order_by('-deliver_at').first() + ts = timezone.now() + if latest_event: + assert ts > latest_event.deliver_at, f'Event.deliver_at is not monotonically increasing: {latest_event.deliver_at} > {ts}' + return ts + + @classmethod + def dispatch(cls, name: str | EventDict | None = None, event: EventDict | None = None, **event_init_kwargs) -> 'Event': + """ + Create a new Event and save it to the database. 
+ + Can be called as either: + >>> Event.dispatch(name, {**kwargs}, **event_init_kwargs) + # OR + >>> Event.dispatch({name, **kwargs}, **event_init_kwargs) + """ + event_kwargs: EventDict = event or {} + if isinstance(name, dict): + event_kwargs.update(name) + assert isinstance(event_kwargs, dict), 'must be called as Event.dispatch(name, {**kwargs}) or Event.dispatch({name, **kwargs})' + + event_name: str = name if (isinstance(name, str) and name) else event_kwargs.pop('name') + + new_event = cls( + name=event_name, + kwargs=event_kwargs, + emitted_by=Process.current(), + **event_init_kwargs, + ) + new_event.save() + return new_event + + def clean(self, *args, **kwargs) -> None: + """Fill and validate all the event fields""" + + # check uuid and deliver_at are set + assert self.id, 'Event.id must be set to a valid v4 UUID' + if not self.deliver_at: + self.deliver_at = self.get_next_timestamp() + assert self.deliver_at and (datetime(2024, 12, 8, 12, 0, 0, tzinfo=timezone.utc) < self.deliver_at < datetime(2100, 12, 31, 23, 59, 0, tzinfo=timezone.utc)), ( + f'Event.deliver_at must be set to a valid UTC datetime (got Event.deliver_at = {self.deliver_at})') + + # if name is not set but it's found in the kwargs, move it out of the kwargs to the name field + if 'type' in self.kwargs and ((self.name == self.kwargs['type']) or not self.name): + self.name = self.kwargs.pop('type') + if 'name' in self.kwargs and ((self.name == self.kwargs['name']) or not self.name): + self.name = self.kwargs.pop('name') + + # check name is set and is a valid identifier + assert isinstance(self.name, str) and len(self.name) > 3, 'Event.name must be set to a non-empty string' + assert self.name.isidentifier(), f'Event.name must be a valid identifier (got Event.name = {self.name})' + assert self.name.isupper(), f'Event.name must be in uppercase (got Event.name = {self.name})' + + # check that kwargs keys and values are valid + for key, value in self.kwargs.items(): + assert isinstance(key, str), f'Event kwargs keys can only be strings (got Event.kwargs[{key}: {type(key).__name__}])' + assert key not in self._meta.get_fields(), f'Event.kwargs cannot contain "{key}" key (Event.kwargs[{key}] conflicts with with reserved attr Event.{key} = {getattr(self, key)})' + assert json.dumps(value, sort_keys=True), f'Event can only contain JSON serializable values (got Event.kwargs[{key}]: {type(value).__name__} = {value})' + + # validate on_success and on_failure are valid event dicts if set + if self.on_success: + assert isinstance(self.on_success, dict) and self.on_success.get('name', '!invalid').isidentifier(), f'Event.on_success must be a valid event dict (got {self.on_success})' + if self.on_failure: + assert isinstance(self.on_failure, dict) and self.on_failure.get('name', '!invalid').isidentifier(), f'Event.on_failure must be a valid event dict (got {self.on_failure})' + + # validate mutable fields like claimed_at, claimed_proc, finished_at are set correctly + if self.claimed_at: + assert self.claimed_proc, f'Event.claimed_at and Event.claimed_proc must be set together (only found Event.claimed_at = {self.claimed_at})' + if self.claimed_proc: + assert self.claimed_at, f'Event.claimed_at and Event.claimed_proc must be set together (only found Event.claimed_proc = {self.claimed_proc})' + if self.finished_at: + assert self.claimed_at, f'If Event.finished_at is set, Event.claimed_at and Event.claimed_proc must also be set (Event.claimed_proc = {self.claimed_proc} and Event.claimed_at = {self.claimed_at})' + + # validate error is 
a non-empty string or None + if isinstance(self.error, BaseException): + self.error = f'{type(self.error).__name__}: {self.error}' + if self.error: + assert isinstance(self.error, str) and str(self.error).strip(), f'Event.error must be a non-empty string (got Event.error: {type(self.error).__name__} = {self.error})' + else: + assert self.error is None, f'Event.error must be None or a non-empty string (got Event.error: {type(self.error).__name__} = {self.error})' + + + def save(self, *args, **kwargs): + self.clean() + return super().save(*args, **kwargs) + + def reset(self): + """Force-update an event to a pending/unclaimed state (without running any of its handlers or callbacks)""" + self.claimed_proc = None + self.claimed_at = None + self.finished_at = None + self.error = None + self.save() + + def abort(self): + """Force-update an event to a completed/failed state (without running any of its handlers or callbacks)""" + self.claimed_proc = Process.current() + self.claimed_at = timezone.now() + self.finished_at = timezone.now() + self.error = 'Aborted' + self.save() + + + def __repr__(self) -> str: + label = f'[{self.name} {self.kwargs}]' + if self.is_finished: + label += f' ✅' + elif self.claimed_proc: + label += f' 🏃' + return label + + def __str__(self) -> str: + return repr(self) + + @property + def type(self) -> str: + return self.name + + @property + def is_queued(self): + return not self.is_claimed and not self.is_finished + + @property + def is_claimed(self): + return self.claimed_at is not None + + @property + def is_expired(self): + if not self.claimed_at: + return False + + elapsed_time = timezone.now() - self.claimed_at + return elapsed_time > timedelta(seconds=self.timeout) + + @property + def is_processing(self): + return self.is_claimed and not self.is_finished + + @property + def is_finished(self): + return self.finished_at is not None + + @property + def is_failed(self): + return self.is_finished and bool(self.error) + + @property + def is_succeeded(self): + return self.is_finished and not bool(self.error) + + def __getattr__(self, key: str): + """ + Allow access to the event kwargs as attributes e.g. + Event(name='CRAWL_CREATE', kwargs={'some_key': 'some_val'}).some_key -> 'some_val' + """ + return self.kwargs.get(key)
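
Usage sketch for the Event model introduced above (illustrative only, not part of the patch): it assumes a configured Django environment and that Process.current() returns the running worker's Process row, as the patch itself does; handle_crawl_create() and the claim/finish flow are hypothetical stand-ins for whatever worker loop ends up consuming events.

    from django.utils import timezone

    from machine.models import Process
    from workers.models import Event

    # Emit an event: kwargs are stored as JSON and are readable as attributes via __getattr__()
    event = Event.dispatch('CRAWL_CREATE', {'url': 'https://example.com'})
    assert event.url == 'https://example.com'      # falls through to Event.kwargs['url']

    # A consumer claims the oldest unclaimed event, handles it, then marks it finished
    next_event = Event.objects.get_next_unclaimed()
    if next_event:
        next_event.claimed_proc = Process.current()
        next_event.claimed_at = timezone.now()
        next_event.save()
        try:
            handle_crawl_create(url=next_event.url)    # hypothetical handler
            next_event.error = None
        except Exception as err:
            next_event.error = f'{type(err).__name__}: {err}'
        finally:
            next_event.finished_at = timezone.now()
            next_event.save()
        assert next_event.is_finished and (next_event.is_succeeded or next_event.is_failed)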