add new Event model to workers/models

Nick Sweeting 2024-12-12 21:40:57 -08:00
parent 651ba0b11c
commit 5c06b8ff00
5 changed files with 468 additions and 229 deletions


@@ -11,7 +11,7 @@ from rich import print
from statemachine import State, StateMachine
from workers.actor import ActorType
# from workers.actor import ActorType
from core.models import Snapshot, ArchiveResult
@@ -107,19 +107,19 @@ class SnapshotMachine(StateMachine, strict_states=True):
)
class SnapshotWorker(ActorType[Snapshot]):
"""
The primary actor for progressing Snapshot objects
through their lifecycle using the SnapshotMachine.
"""
Model = Snapshot
StateMachineClass = SnapshotMachine
# class SnapshotWorker(ActorType[Snapshot]):
# """
# The primary actor for progressing Snapshot objects
# through their lifecycle using the SnapshotMachine.
# """
# Model = Snapshot
# StateMachineClass = SnapshotMachine
ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started # 'started'
# ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started # 'started'
MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
MAX_TICK_TIME: ClassVar[int] = 10
CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
# MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
# MAX_TICK_TIME: ClassVar[int] = 10
# CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
@@ -263,16 +263,16 @@ class ArchiveResultMachine(StateMachine, strict_states=True):
self.archiveresult.snapshot.update_for_workers() # bump snapshot retry time so it picks up all the new changes
class ArchiveResultWorker(ActorType[ArchiveResult]):
"""
The primary actor for progressing ArchiveResult objects
through their lifecycle using the ArchiveResultMachine.
"""
Model = ArchiveResult
StateMachineClass = ArchiveResultMachine
# class ArchiveResultWorker(ActorType[ArchiveResult]):
# """
# The primary actor for progressing ArchiveResult objects
# through their lifecycle using the ArchiveResultMachine.
# """
# Model = ArchiveResult
# StateMachineClass = ArchiveResultMachine
ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started'
# ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started'
MAX_CONCURRENT_ACTORS: ClassVar[int] = 6
MAX_TICK_TIME: ClassVar[int] = 60
CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
# MAX_CONCURRENT_ACTORS: ClassVar[int] = 6
# MAX_TICK_TIME: ClassVar[int] = 60
# CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10


@@ -9,7 +9,7 @@ from rich import print
from statemachine import State, StateMachine
from workers.actor import ActorType
# from workers.actor import ActorType
from crawls.models import Crawl
@@ -96,17 +96,17 @@ class CrawlMachine(StateMachine, strict_states=True):
)
class CrawlWorker(ActorType[Crawl]):
"""The Actor that manages the lifecycle of all Crawl objects"""
# class CrawlWorker(ActorType[Crawl]):
# """The Actor that manages the lifecycle of all Crawl objects"""
Model = Crawl
StateMachineClass = CrawlMachine
# Model = Crawl
# StateMachineClass = CrawlMachine
ACTIVE_STATE: ClassVar[State] = CrawlMachine.started
FINAL_STATES: ClassVar[list[State]] = CrawlMachine.final_states
STATE_FIELD_NAME: ClassVar[str] = Crawl.state_field_name
# ACTIVE_STATE: ClassVar[State] = CrawlMachine.started
# FINAL_STATES: ClassVar[list[State]] = CrawlMachine.final_states
# STATE_FIELD_NAME: ClassVar[str] = Crawl.state_field_name
MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
MAX_TICK_TIME: ClassVar[int] = 10
CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
# MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
# MAX_TICK_TIME: ClassVar[int] = 10
# CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10


@@ -249,16 +249,16 @@ def get_extractors(dir: Path=EXTRACTORS_DIR) -> Dict[str, ExtractorModuleProtoco
"""iterate through archivebox/extractors/*.py and load extractor modules"""
EXTRACTORS = {}
for filename in EXTRACTORS_DIR.glob('*.py'):
if filename.name.startswith('__'):
continue
# for filename in EXTRACTORS_DIR.glob('*.py'):
# if filename.name.startswith('__'):
# continue
extractor_name = filename.name.replace('.py', '')
# extractor_name = filename.name.replace('.py', '')
extractor_module = cast(ExtractorModuleProtocol, import_module(f'.{extractor_name}', package=__package__))
# extractor_module = cast(ExtractorModuleProtocol, import_module(f'.{extractor_name}', package=__package__))
assert getattr(extractor_module, 'get_output_path')
EXTRACTORS[extractor_name] = extractor_module
# # assert getattr(extractor_module, 'get_output_path')
# EXTRACTORS[extractor_name] = extractor_module
return EXTRACTORS


@@ -16,205 +16,204 @@ from core.models import ArchiveResult
import abx
import archivebox
class Extractor:
# static class variables
name: ClassVar[str] = 'ytdlp'
verbose_name: ClassVar[str] = 'YT-DLP'
binaries: ClassVar[tuple[str, ...]] = ()
daemons: ClassVar[tuple[str, ...]] = ()
timeout: ClassVar[int] = 60
# class Extractor:
# # static class variables
# name: ClassVar[str] = 'ytdlp'
# verbose_name: ClassVar[str] = 'YT-DLP'
# binaries: ClassVar[tuple[str, ...]] = ()
# daemons: ClassVar[tuple[str, ...]] = ()
# timeout: ClassVar[int] = 60
#
# # instance variables
# ARCHIVERESULT: ArchiveResult
# CONFIG: dict[str, object]
# BINARIES: dict[str, object]
# DAEMONS: dict[str, object]
#
# def __init__(self, archiveresult: ArchiveResult, extra_config: dict | None=None):
# assert archiveresult.pk, 'ArchiveResult must be saved to DB before it can be extracted'
# self.archiveresult = self.ARCHIVERESULT = archiveresult
# self.CONFIG = archivebox.pm.hook.get_SCOPE_CONFIG(archiveresult=self.archiveresult, extra=extra_config)
# all_binaries = abx.as_dict(archivebox.pm.hook.get_BINARIES())
# all_daemons = abx.as_dict(archivebox.pm.hook.get_DAEMONS())
# self.BINARIES = {
# binary_name: all_binaries[binary_name]
# for binary_name in self.binaries
# }
# self.DAEMONS = {
# daemon_name: all_daemons[daemon_name]
# for daemon_name in self.daemons
# }
# instance variables
ARCHIVERESULT: ArchiveResult
CONFIG: dict[str, object]
BINARIES: dict[str, object]
DAEMONS: dict[str, object]
# def extract(self, config: dict | None=None) -> 'ArchiveResult':
# """
# - making sure any binaries the extractor depends on are installed and loaded
# - creating a new temporary working directory under the snapshot dir to hold extractor output
# - setting up a timer signal to kill the extractor if it runs too long
# - passing the extractor the URLs, temporary working directory, and config dict of options
# - running the extractor in a shell subprocess and collecting stdout/stderr
# - capturing the extractor's exit code
# - if extractor exits with 29 (RetryError), it should set the status to 'BACKOFF' and set retry_at to a datetime in the future
# - if extractor exits with 50 (NotApplicable), it should set the status to 'SKIPPED', and set retry_at to None
# - setting the correct permissions and ownership on all the output files
# - generating the merkle tree of all the output files and their hashes
# - generating a thumbnail of the main output (or collecting one provided by the extractor)
# - detecting any special output files that need to be parsed for other parts of the system (content-types? )
# - metadata.json -> ArchiveResult.output_json
# - outlinks.jsonl -> ArchiveResult.output_links
# - search_texts.txt -> ArchiveResult.index_texts
# - .merkle.json -> ArchiveResult.output_files
# - videos.jsonl -> ArchiveResult.output_videos
# - audios.jsonl -> ArchiveResult.output_audios
# - images.jsonl -> ArchiveResult.output_images
# - htmls.jsonl -> ArchiveResult.output_htmls
# - saving all the result metadata to the ArchiveResult in the database
# """
def __init__(self, archiveresult: ArchiveResult, extra_config: dict | None=None):
assert archiveresult.pk, 'ArchiveResult must be saved to DB before it can be extracted'
self.archiveresult = self.ARCHIVERESULT = archiveresult
self.CONFIG = archivebox.pm.hook.get_SCOPE_CONFIG(archiveresult=self.archiveresult, extra=extra_config)
all_binaries = abx.as_dict(archivebox.pm.hook.get_BINARIES())
all_daemons = abx.as_dict(archivebox.pm.hook.get_DAEMONS())
self.BINARIES = {
binary_name: all_binaries[binary_name]
for binary_name in self.binaries
}
self.DAEMONS = {
daemon_name: all_daemons[daemon_name]
for daemon_name in self.daemons
}
# archiveresult = self.ARCHIVERESULT
# # config = get_scope_config(archiveresult=archiveresult.snapshot.url, env=...)
def extract(self, config: dict | None=None) -> 'ArchiveResult':
"""
- making sure any binaries the extractor depends on are installed and loaded
- creating a new temporary working directory under the snapshot dir to hold extractor output
- setting up a timer signal to kill the extractor if it runs too long
- passing the extractor the URLs, temporary working directory, and config dict of options
- running the extractor in a shell subprocess and collecting stdout/stderr
- capturing the extractor's exit code
- if extractor exits with 29 (RetryError), it should set the status to 'BACKOFF' and set retry_at to a datetime in the future
- if extractor exits with 50 (NotApplicable), it should set the status to 'SKIPPED', and set retry_at to None
- setting the correct permissions and ownership on all the output files
- generating the merkle tree of all the output files and their hashes
- generating a thumbnail of the main output (or collecting one provided by the extractor)
- detecting any special output files that need to be parsed for other parts of the system (content-types? )
- metadata.json -> ArchiveResult.output_json
- outlinks.jsonl -> ArchiveResult.output_links
- search_texts.txt -> ArchiveResult.index_texts
- .merkle.json -> ArchiveResult.output_files
- videos.jsonl -> ArchiveResult.output_videos
- audios.jsonl -> ArchiveResult.output_audios
- images.jsonl -> ArchiveResult.output_images
- htmls.jsonl -> ArchiveResult.output_htmls
- saving all the result metadata to the ArchiveResult in the database
"""
# self.before_extract()
archiveresult = self.ARCHIVERESULT
# config = get_scope_config(archiveresult=archiveresult.snapshot.url, env=...)
# error = Exception('Failed to start extractor')
# stdout = ''
# stderr = ''
# try:
# proc = archiveresult.EXTRACTOR.spawn(url=archiveresult.snapshot.url, binaries=binaries, daemons=daemons, cwd=cwd, config=config)
# stdout, stderr = proc.communicate()
# error = None
# except Exception as err:
# error = err
# finally:
# self.after_extract(error=error)
self.before_extract()
# return archiveresult
error = Exception('Failed to start extractor')
stdout = ''
stderr = ''
try:
proc = archiveresult.EXTRACTOR.spawn(url=archiveresult.snapshot.url, binaries=binaries, daemons=daemons, cwd=cwd, config=config)
stdout, stderr = proc.communicate()
error = None
except Exception as err:
error = err
finally:
self.after_extract(error=error)
# def should_extract(self):
# if self.archiveresult.snapshot.url.startswith('https://youtube.com/'):
# return True
# return False
return archiveresult
# def load_binaries(self):
# return {
# bin_name: binary.load()
# for bin_name, binary in self.BINARIES.items()
# }
def should_extract(self):
if self.archiveresult.snapshot.url.startswith('https://youtube.com/'):
return True
return False
# def load_daemons(self):
# return {
# daemon_name: daemon.load()
# for daemon_name, daemon in self.DAEMONS.items()
# }
def load_binaries(self):
return {
bin_name: binary.load()
for bin_name, binary in self.BINARIES.items()
}
# def output_dir_name(self):
# # e.g. 'ytdlp'
# return f'{self.name}'
def load_daemons(self):
return {
daemon_name: daemon.load()
for daemon_name, daemon in self.DAEMONS.items()
}
# @property
# def OUTPUT_DIR(self):
# return self.archiveresult.snapshot_dir / self.output_dir_name()
def output_dir_name(self):
# e.g. 'ytdlp'
return f'{self.name}'
# def before_extract(self):
# # create self.archiveresult.snapshot_dir / self.archiveresult.extractor / dir
# # chown, chmod, etc.
# binaries = self.load_binaries()
# daemons = self.load_daemons()
# cmd = self.archiveresult.EXTRACTOR.get_cmd(binaries=binaries, daemons=daemons)
# cmd_version = self.archiveresult.EXTRACTOR.get_cmd_version(binaries=binaries, daemons=daemons)
@property
def OUTPUT_DIR(self):
return self.archiveresult.snapshot_dir / self.output_dir_name()
# self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# os.chmod(self.OUTPUT_DIR, 0o755)
# self.archiveresult.status = self.archiveresult.StatusChoices.STARTED
# self.archiveresult.retry_at = timezone.now() + timedelta(seconds=self.timeout)
# self.archiveresult.start_ts = timezone.now()
# self.archiveresult.end_ts = None
# self.archiveresult.output = None
# self.archiveresult.output_path = str(self.OUTPUT_DIR.relative_to(self.archiveresult.snapshot_dir))
# self.archiveresult.cmd = cmd
# self.archiveresult.cmd_version = cmd_version
# self.archiveresult.machine = Machine.objects.get_current()
# self.archiveresult.iface = NetworkInterface.objects.get_current()
# self.archiveresult.save()
# self.archiveresult.write_indexes()
def before_extract(self):
# create self.archiveresult.snapshot_dir / self.archiveresult.extractor / dir
# chown, chmod, etc.
binaries = self.load_binaries()
daemons = self.load_daemons()
cmd = self.archiveresult.EXTRACTOR.get_cmd(binaries=binaries, daemons=daemons)
cmd_version = self.archiveresult.EXTRACTOR.get_cmd_version(binaries=binaries, daemons=daemons)
# def extract(self, url: str, binaries: dict, daemons: dict, cwd: Path, config: dict):
# proc = subprocess.run(self.archiveresult.cmd, cwd=self.archiveresult.cwd, env=os.environ.update(binaries), timeout=self.timeout, shell=True, capture_output=True, text=True)
# self.archiveresult.stdout = proc.stdout
# self.archiveresult.stderr = proc.stderr
# self.archiveresult.returncode = proc.returncode
# self.archiveresult.save()
# self.archiveresult.write_indexes()
self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
os.chmod(self.OUTPUT_DIR, 0o755)
self.archiveresult.status = self.archiveresult.StatusChoices.STARTED
self.archiveresult.retry_at = timezone.now() + timedelta(seconds=self.timeout)
self.archiveresult.start_ts = timezone.now()
self.archiveresult.end_ts = None
self.archiveresult.output = None
self.archiveresult.output_path = str(self.OUTPUT_DIR.relative_to(self.archiveresult.snapshot_dir))
self.archiveresult.cmd = cmd
self.archiveresult.cmd_version = cmd_version
self.archiveresult.machine = Machine.objects.get_current()
self.archiveresult.iface = NetworkInterface.objects.get_current()
self.archiveresult.save()
self.archiveresult.write_indexes()
# def determine_status(self):
# if self.archiveresult.returncode == 29:
# return self.archiveresult.StatusChoices.BACKOFF, timezone.now() + timedelta(seconds=self.timeout)
# elif self.archiveresult.returncode == 50:
# return self.archiveresult.StatusChoices.SKIPPED, None
# else:
# return self.archiveresult.StatusChoices.FAILED, None
def extract(self, url: str, binaries: dict, daemons: dict, cwd: Path, config: dict):
proc = subprocess.run(self.archiveresult.cmd, cwd=self.archiveresult.cwd, env=os.environ.update(binaries), timeout=self.timeout, shell=True, capture_output=True, text=True)
self.archiveresult.stdout = proc.stdout
self.archiveresult.stderr = proc.stderr
self.archiveresult.returncode = proc.returncode
self.archiveresult.save()
self.archiveresult.write_indexes()
# def collect_outputs(self, cwd: Path):
# for file in cwd.rglob('*'):
# path = file.relative_to(cwd)
# os.chmod(file, 0o644)
# #os.chown(file, ARCHIVEBOX_UID, ARCHIVEBOX_GID)
def determine_status(self):
if self.archiveresult.returncode == 29:
return self.archiveresult.StatusChoices.BACKOFF, timezone.now() + timedelta(seconds=self.timeout)
elif self.archiveresult.returncode == 50:
return self.archiveresult.StatusChoices.SKIPPED, None
else:
return self.archiveresult.StatusChoices.FAILED, None
def collect_outputs(self, cwd: Path):
for file in cwd.rglob('*'):
path = file.relative_to(cwd)
os.chmod(file, 0o644)
#os.chown(file, ARCHIVEBOX_UID, ARCHIVEBOX_GID)
self.archiveresult.outputs.append({
'type': 'FILE',
'path': file.relative_to(cwd),
'size': file.stat().st_size,
'ext': file.suffix,
'mimetype': mimetypes.guess_type(file)[0],
'sha256': hashlib.sha256(file.read_bytes()).hexdigest(),
'blake3': hashlib.blake3(file.read_bytes()).hexdigest(),
'created_at': file.stat().st_ctime,
'modified_at': file.stat().st_mtime,
'symlinks': [
'screenshot.png',
'example.com',
]
})
outlinks = parse_outlinks(file)
if outlinks:
self.archiveresult.outputs.append({
'type': 'OUTLINK',
'url': outlink.target,
'selector': outlink.selector,
'text': outlink.text,
})
if path.endswith('favicon.ico'):
self.archiveresult.outputs.append({
'type': 'FAVICON',
'symlinks': {
'favicon': output_file['path'],
'favicon.ico': output_file['path'],
'favicon.png': output_file['path'].with_suffix('.png'),
},
'path': output_file['path'],
})
if path.endswith('.pdf'):
self.archiveresult.outputs.append({
'type': 'PDF',
'path': file.relative_to(cwd),
})
if 'text/plain' in mimetypes.guess_type(file):
self.archiveresult.outputs.append({
'type': 'SEARCHTEXT',
'path': file.relative_to(self.archiveresult.OUTPUT_DIR),
'archiveresult_id': self.archiveresult.id,
})
def after_extract(self, error: Exception | None=None):
status, retry_at = self.determine_status()
self.archiveresult.error = f'{type(error).__name__}: {error}' if error else None
self.archiveresult.status = self.archiveresult.StatusChoices.FAILED if error else self.archiveresult.StatusChoices.SUCCEEDED
self.archiveresult.retry_at = None
self.archiveresult.end_ts = timezone.now()
self.archiveresult.output = self.archiveresult.outputs[0].path
self.archiveresult.save()
self.archiveresult.write_indexes()
# self.archiveresult.outputs.append({
# 'type': 'FILE',
# 'path': file.relative_to(cwd),
# 'size': file.stat().st_size,
# 'ext': file.suffix,
# 'mimetype': mimetypes.guess_type(file)[0],
# 'sha256': hashlib.sha256(file.read_bytes()).hexdigest(),
# 'blake3': hashlib.blake3(file.read_bytes()).hexdigest(),
# 'created_at': file.stat().st_ctime,
# 'modified_at': file.stat().st_mtime,
# 'symlinks': [
# 'screenshot.png',
# 'example.com',
# ]
# })
# outlinks = parse_outlinks(file)
# if outlinks:
# self.archiveresult.outputs.append({
# 'type': 'OUTLINK',
# 'url': outlink.target,
# 'selector': outlink.selector,
# 'text': outlink.text,
# })
#
# if path.endswith('favicon.ico'):
# self.archiveresult.outputs.append({
# 'type': 'FAVICON',
# 'symlinks': {
# 'favicon': output_file['path'],
# 'favicon.ico': output_file['path'],
# 'favicon.png': output_file['path'].with_suffix('.png'),
# },
# 'path': output_file['path'],
# })
# if path.endswith('.pdf'):
# self.archiveresult.outputs.append({
# 'type': 'PDF',
# 'path': file.relative_to(cwd),
# })
#
# if 'text/plain' in mimetypes.guess_type(file):
# self.archiveresult.outputs.append({
# 'type': 'SEARCHTEXT',
# 'path': file.relative_to(self.archiveresult.OUTPUT_DIR),
# 'archiveresult_id': self.archiveresult.id,
# })
#
# def after_extract(self, error: Exception | None=None):
# status, retry_at = self.determine_status()
#
# self.archiveresult.error = f'{type(error).__name__}: {error}' if error else None
# self.archiveresult.status = self.archiveresult.StatusChoices.FAILED if error else self.archiveresult.StatusChoices.SUCCEEDED
# self.archiveresult.retry_at = None
# self.archiveresult.end_ts = timezone.now()
# self.archiveresult.output = self.archiveresult.outputs[0].path
# self.archiveresult.save()
# self.archiveresult.write_indexes()
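
As a reference for the exit-code convention described in the extract() docstring above, here is a minimal sketch (not part of this commit) of mapping a returncode to a status and retry time, reusing the StatusChoices names from determine_status():

from datetime import timedelta
from django.utils import timezone

def status_for_returncode(archiveresult, returncode: int, timeout: int):
    # 29 = RetryError: back off and retry after `timeout` seconds
    if returncode == 29:
        return archiveresult.StatusChoices.BACKOFF, timezone.now() + timedelta(seconds=timeout)
    # 50 = NotApplicable: skip this extractor permanently, no retry
    if returncode == 50:
        return archiveresult.StatusChoices.SKIPPED, None
    # any other nonzero exit is a failure with no scheduled retry
    return archiveresult.StatusChoices.FAILED, None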


@@ -1,15 +1,23 @@
from typing import ClassVar, Type, Iterable
from datetime import datetime, timedelta
__package__ = 'archivebox.workers'
import uuid
import json
from typing import ClassVar, Type, Iterable, TypedDict
from datetime import datetime, timedelta
from statemachine.mixins import MachineMixin
from django.db import models
from django.db.models import QuerySet
from django.core import checks
from django.utils import timezone
from django.utils.functional import classproperty
from base_models.models import ABIDModel, ABIDField
from machine.models import Process
from statemachine import registry, StateMachine, State
from django.core import checks
class DefaultStatusChoices(models.TextChoices):
QUEUED = 'queued', 'Queued'
@@ -298,3 +306,235 @@ class ModelWithStateMachine(BaseModelWithStateMachine):
class Meta:
abstract = True
class EventDict(TypedDict, total=False):
name: str
id: str | uuid.UUID
path: str
content: str
status: str
retry_at: datetime | None
url: str
seed_id: str | uuid.UUID
crawl_id: str | uuid.UUID
snapshot_id: str | uuid.UUID
process_id: str | uuid.UUID
extractor: str
error: str
on_success: dict | None
on_failure: dict | None
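# Illustrative only (not in this commit): a plausible EventDict payload using the
# keys declared above. The CRAWL_CREATE name comes from the __getattr__ docstring
# further down; the URL and the SNAPSHOT_CREATE follow-up event are hypothetical.
example_event: EventDict = {
    'name': 'CRAWL_CREATE',
    'url': 'https://example.com',
    'seed_id': uuid.uuid4(),
    'on_success': {'name': 'SNAPSHOT_CREATE'},
    'on_failure': None,
}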
class EventManager(models.Manager):
pass
class EventQuerySet(models.QuerySet):
def get_next_unclaimed(self) -> 'Event | None':
return self.filter(claimed_at=None).order_by('deliver_at').first()
def expired(self, older_than: int=60 * 10) -> QuerySet['Event']:
return self.filter(claimed_at__lt=timezone.now() - timedelta(seconds=older_than))
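# Rough sketch (not part of this commit) of how a worker loop might consume the
# queryset helpers above, assuming an event is claimed by stamping the mutable
# claimed_proc/claimed_at fields defined on Event below (no row locking shown).
def claim_next_event() -> 'Event | None':
    event = Event.objects.get_next_unclaimed()   # oldest unclaimed event by deliver_at
    if event is None:
        return None
    event.claimed_proc = Process.current()       # Process.current() as used in dispatch()/abort()
    event.claimed_at = timezone.now()
    event.save()
    return event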
class Event(ABIDModel):
abid_prefix = 'evn_'
abid_ts_src = 'self.deliver_at' # e.g. 'self.created_at'
abid_uri_src = 'self.name' # e.g. 'self.uri' (MUST BE SET)
abid_subtype_src = 'self.emitted_by' # e.g. 'self.extractor'
abid_rand_src = 'self.id' # e.g. 'self.uuid' or 'self.id'
abid_drift_allowed: bool = False # set to True to allow abid_field values to change after a fixed ABID has been issued (NOT RECOMMENDED: means values can drift out of sync from original ABID)
read_only_fields = ('id', 'deliver_at', 'name', 'kwargs', 'timeout', 'parent', 'emitted_by', 'on_success', 'on_failure')
id = models.UUIDField(primary_key=True, default=uuid.uuid4, null=False, editable=False, unique=True, verbose_name='ID')
# disable these fields from inherited models, they're not needed / take up too much room
abid = None
created_at = None
created_by = None
created_by_id = None
# immutable fields
deliver_at = models.DateTimeField(default=timezone.now, null=False, editable=False, unique=True, db_index=True)
name = models.CharField(max_length=255, null=False, blank=False, db_index=True)
kwargs = models.JSONField(default=dict)
timeout = models.IntegerField(null=False, default=60)
parent = models.ForeignKey('Event', null=True, on_delete=models.SET_NULL, related_name='child_events')
emitted_by = models.ForeignKey(Process, null=False, on_delete=models.PROTECT, related_name='emitted_events')
on_success = models.JSONField(null=True)
on_failure = models.JSONField(null=True)
# mutable fields
modified_at = models.DateTimeField(auto_now=True)
claimed_proc = models.ForeignKey(Process, null=True, on_delete=models.CASCADE, related_name='claimed_events')
claimed_at = models.DateTimeField(null=True)
finished_at = models.DateTimeField(null=True)
error = models.TextField(null=True)
objects: EventManager = EventManager.from_queryset(EventQuerySet)()
child_events: models.RelatedManager['Event']
@classmethod
def get_next_timestamp(cls):
"""Get the next monotonically increasing timestamp for the next event.dispatch_at"""
latest_event = cls.objects.order_by('-deliver_at').first()
ts = timezone.now()
if latest_event:
assert ts > latest_event.deliver_at, f'Event.deliver_at is not monotonically increasing: {latest_event.deliver_at} > {ts}'
return ts
@classmethod
def dispatch(cls, name: str | EventDict | None = None, event: EventDict | None = None, **event_init_kwargs) -> 'Event':
"""
Create a new Event and save it to the database.
Can be called as either:
>>> Event.dispatch(name, {**kwargs}, **event_init_kwargs)
# OR
>>> Event.dispatch({name, **kwargs}, **event_init_kwargs)
"""
event_kwargs: EventDict = event or {}
if isinstance(name, dict):
event_kwargs.update(name)
assert isinstance(event_kwargs, dict), 'must be called as Event.dispatch(name, {**kwargs}) or Event.dispatch({name, **kwargs})'
event_name: str = name if (isinstance(name, str) and name) else event_kwargs.pop('name')
new_event = cls(
name=event_name,
kwargs=event_kwargs,
emitted_by=Process.current(),
**event_init_kwargs,
)
new_event.save()
return new_event
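    # Usage, mirroring the two call forms documented in the docstring above
    # (the event name and URL are illustrative values, not from this commit):
    #
    #   Event.dispatch('CRAWL_CREATE', {'url': 'https://example.com'})
    #   Event.dispatch({'name': 'CRAWL_CREATE', 'url': 'https://example.com'})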
def clean(self, *args, **kwargs) -> None:
"""Fill and validate all the event fields"""
# check uuid and deliver_at are set
assert self.id, 'Event.id must be set to a valid v4 UUID'
if not self.deliver_at:
self.deliver_at = self.get_next_timestamp()
assert self.deliver_at and (datetime(2024, 12, 8, 12, 0, 0, tzinfo=timezone.utc) < self.deliver_at < datetime(2100, 12, 31, 23, 59, 0, tzinfo=timezone.utc)), (
f'Event.deliver_at must be set to a valid UTC datetime (got Event.deliver_at = {self.deliver_at})')
# if name is not set but it's found in the kwargs, move it out of the kwargs to the name field
if 'type' in self.kwargs and ((self.name == self.kwargs['type']) or not self.name):
self.name = self.kwargs.pop('type')
if 'name' in self.kwargs and ((self.name == self.kwargs['name']) or not self.name):
self.name = self.kwargs.pop('name')
# check name is set and is a valid identifier
assert isinstance(self.name, str) and len(self.name) > 3, 'Event.name must be set to a non-empty string'
assert self.name.isidentifier(), f'Event.name must be a valid identifier (got Event.name = {self.name})'
assert self.name.isupper(), f'Event.name must be in uppercase (got Event.name = {self.name})'
# check that kwargs keys and values are valid
for key, value in self.kwargs.items():
assert isinstance(key, str), f'Event kwargs keys can only be strings (got Event.kwargs[{key}: {type(key).__name__}])'
assert key not in self._meta.get_fields(), f'Event.kwargs cannot contain "{key}" key (Event.kwargs[{key}] conflicts with reserved attr Event.{key} = {getattr(self, key)})'
assert json.dumps(value, sort_keys=True), f'Event can only contain JSON serializable values (got Event.kwargs[{key}]: {type(value).__name__} = {value})'
# validate on_success and on_failure are valid event dicts if set
if self.on_success:
assert isinstance(self.on_success, dict) and self.on_success.get('name', '!invalid').isidentifier(), f'Event.on_success must be a valid event dict (got {self.on_success})'
if self.on_failure:
assert isinstance(self.on_failure, dict) and self.on_failure.get('name', '!invalid').isidentifier(), f'Event.on_failure must be a valid event dict (got {self.on_failure})'
# validate mutable fields like claimed_at, claimed_proc, finished_at are set correctly
if self.claimed_at:
assert self.claimed_proc, f'Event.claimed_at and Event.claimed_proc must be set together (only found Event.claimed_at = {self.claimed_at})'
if self.claimed_proc:
assert self.claimed_at, f'Event.claimed_at and Event.claimed_proc must be set together (only found Event.claimed_proc = {self.claimed_proc})'
if self.finished_at:
assert self.claimed_at, f'If Event.finished_at is set, Event.claimed_at and Event.claimed_proc must also be set (Event.claimed_proc = {self.claimed_proc} and Event.claimed_at = {self.claimed_at})'
# validate error is a non-empty string or None
if isinstance(self.error, BaseException):
self.error = f'{type(self.error).__name__}: {self.error}'
if self.error:
assert isinstance(self.error, str) and str(self.error).strip(), f'Event.error must be a non-empty string (got Event.error: {type(self.error).__name__} = {self.error})'
else:
assert self.error is None, f'Event.error must be None or a non-empty string (got Event.error: {type(self.error).__name__} = {self.error})'
def save(self, *args, **kwargs):
self.clean()
return super().save(*args, **kwargs)
def reset(self):
"""Force-update an event to a pending/unclaimed state (without running any of its handlers or callbacks)"""
self.claimed_proc = None
self.claimed_at = None
self.finished_at = None
self.error = None
self.save()
def abort(self):
"""Force-update an event to a completed/failed state (without running any of its handlers or callbacks)"""
self.claimed_proc = Process.current()
self.claimed_at = timezone.now()
self.finished_at = timezone.now()
self.error = 'Aborted'
self.save()
def __repr__(self) -> str:
label = f'[{self.name} {self.kwargs}]'
if self.is_finished:
label += f''
elif self.claimed_proc:
label += f' 🏃'
return label
def __str__(self) -> str:
return repr(self)
@property
def type(self) -> str:
return self.name
@property
def is_queued(self):
return not self.is_claimed and not self.is_finished
@property
def is_claimed(self):
return self.claimed_at is not None
@property
def is_expired(self):
if not self.claimed_at:
return False
elapsed_time = timezone.now() - self.claimed_at
return elapsed_time > timedelta(seconds=self.timeout)
@property
def is_processing(self):
return self.is_claimed and not self.is_finished
@property
def is_finished(self):
return self.finished_at is not None
@property
def is_failed(self):
return self.is_finished and bool(self.error)
@property
def is_succeeded(self):
return self.is_finished and not bool(self.error)
def __getattr__(self, key: str):
"""
Allow access to the event kwargs as attributes e.g.
Event(name='CRAWL_CREATE', kwargs={'some_key': 'some_val'}).some_key -> 'some_val'
"""
return self.kwargs.get(key)
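
A short end-to-end sketch (illustrative, not from the commit) of the lifecycle implied by the properties above: an event is dispatched as queued, claimed by a process, then finished, with its kwargs reachable as attributes via __getattr__:

event = Event.dispatch('CRAWL_CREATE', {'url': 'https://example.com'})
assert event.is_queued and event.url == 'https://example.com'

# a worker claims it
event.claimed_proc = Process.current()
event.claimed_at = timezone.now()
event.save()
assert event.is_processing

# the worker records the outcome; error=None means it counts as succeeded
event.finished_at = timezone.now()
event.save()
assert event.is_succeeded and not event.is_failed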