merge queues and actors apps into new workers app

Nick Sweeting 2024-11-18 18:52:48 -08:00
parent e50f8cb3b6
commit e469c5a344
37 changed files with 89 additions and 304 deletions

@@ -0,0 +1,9 @@
__package__ = 'archivebox.workers'
__order__ = 100
import abx
@abx.hookimpl
def register_admin(admin_site):
from workers.admin import register_admin
register_admin(admin_site)

archivebox/workers/actor.py

@@ -0,0 +1,577 @@
__package__ = 'archivebox.workers'
import os
import time
import traceback
from typing import ClassVar, Generic, TypeVar, Any, Literal, Type, Iterable, cast, get_args
from datetime import timedelta
import multiprocessing
from multiprocessing import Process, cpu_count
import psutil
from rich import print
from statemachine import State, StateMachine
from django import db
from django.db.models import QuerySet, sql, Q
from django.db.models import Model as DjangoModel
from django.utils import timezone
from django.utils.functional import classproperty
# from archivebox.logging_util import TimedProgress
from .models import ModelWithStateMachine
multiprocessing.set_start_method('fork', force=True)
class ActorObjectAlreadyClaimed(Exception):
"""Raised when the Actor tries to claim the next object from the queue but it's already been claimed by another Actor"""
pass
class ActorQueueIsEmpty(Exception):
"""Raised when the Actor tries to get the next object from the queue but it's empty"""
pass
CPU_COUNT = cpu_count()
DEFAULT_MAX_TICK_TIME = 60
DEFAULT_MAX_CONCURRENT_ACTORS = min(max(2, int(CPU_COUNT * 0.6)), 8) # clamp 60% of the available cpu cores to the range [2, 8]
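# e.g. on a 4-core machine: min(max(2, int(4 * 0.6)), 8) == min(2, 8) == 2
# e.g. on a 16-core machine: min(max(2, int(16 * 0.6)), 8) == min(9, 8) == 8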
limit = lambda n, max: min(n, max)
LaunchKwargs = dict[str, Any]
ObjectState = State | str
ObjectStateList = Iterable[ObjectState]
ModelType = TypeVar('ModelType', bound=ModelWithStateMachine)
class ActorType(Generic[ModelType]):
"""
Base class for all actors. Usage:
class FaviconActor(ActorType[FaviconArchiveResult]):
ACTIVE_STATE: ClassVar[str] = 'started'
@classmethod
def qs(cls) -> QuerySet[FaviconArchiveResult]:
return ArchiveResult.objects.filter(extractor='favicon') # or leave the default: FaviconArchiveResult.objects.all()
"""
### Class attributes (defined on the class at compile-time when ActorType[MyModel] is defined)
Model: Type[ModelType]
StateMachineClass: Type[StateMachine]
ACTIVE_STATE: ClassVar[ObjectState] = 'started'
EVENT_NAME: ClassVar[str] = 'tick' # the event name to trigger on the obj.sm: StateMachine (usually 'tick')
CLAIM_ORDER: ClassVar[tuple[str, ...]] = ('-retry_at',) # the .order(*args) to claim the queue objects in, use ('?',) for random order
CLAIM_FROM_TOP_N: ClassVar[int] = CPU_COUNT * 10 # the number of objects to consider when atomically getting the next object from the queue
CLAIM_ATOMIC: ClassVar[bool] = True # whether to atomically fetch+claim the next object in one query, or fetch and lock it in two queries
MAX_TICK_TIME: ClassVar[int] = DEFAULT_MAX_TICK_TIME # maximum duration in seconds to process a single object
MAX_CONCURRENT_ACTORS: ClassVar[int] = DEFAULT_MAX_CONCURRENT_ACTORS # maximum number of concurrent actors that can be running at once
_SPAWNED_ACTOR_PIDS: ClassVar[list[psutil.Process]] = [] # used to record all the pids of Actors spawned on the class
### Instance attributes (only used within an actor instance inside a spawned actor thread/process)
pid: int = os.getpid()
idle_count: int = 0
launch_kwargs: LaunchKwargs = {}
mode: Literal['thread', 'process'] = 'process'
def __init_subclass__(cls) -> None:
"""
Executed at class definition time (i.e. during import of any file containing class MyActor(ActorType[MyModel]): ...).
Loads the django Model from the Generic[ModelType] TypeVar arg and populates any missing class-level config using it.
"""
if getattr(cls, 'Model', None) is None:
cls.Model = cls._get_model_from_generic_typevar()
cls._populate_missing_classvars_from_model(cls.Model)
def __init__(self, mode: Literal['thread', 'process']|None=None, **launch_kwargs: LaunchKwargs):
"""
Executed right before the Actor is spawned to create a unique Actor instance for that thread/process.
actor_instance.runloop() is then executed from inside the newly spawned thread/process.
"""
self.mode = mode or self.mode
self.launch_kwargs = launch_kwargs or dict(self.launch_kwargs)
### Private Helper Methods: Not designed to be overridden by subclasses or called by anything outside of this class
@classproperty
def name(cls) -> str:
return cls.__name__ # type: ignore
def __str__(self) -> str:
return repr(self)
def __repr__(self) -> str:
"""-> FaviconActor[pid=1234]"""
label = 'pid' if self.mode == 'process' else 'tid'
return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]'
@staticmethod
def _state_to_str(state: ObjectState) -> str:
"""Convert a statemachine.State, models.TextChoices.choices value, or Enum value to a str"""
return str(state.value) if isinstance(state, State) else str(state)
@staticmethod
def _sql_for_select_top_n_candidates(qs: QuerySet, claim_from_top_n: int=CLAIM_FROM_TOP_N) -> tuple[str, tuple[Any, ...]]:
"""Get the SQL for selecting the top N candidates from the queue (to claim one from)"""
queryset = qs.only('id')[:claim_from_top_n]
select_sql, select_params = compile_sql_select(queryset)
return select_sql, select_params
@staticmethod
def _sql_for_update_claimed_obj(qs: QuerySet, update_kwargs: dict[str, Any]) -> tuple[str, tuple[Any, ...]]:
"""Get the SQL for updating a claimed object to mark it as ACTIVE"""
# qs.update(status='started', retry_at=<now + MAX_TICK_TIME>)
update_sql, update_params = compile_sql_update(qs, update_kwargs=update_kwargs)
# e.g. UPDATE core_archiveresult SET status='%s', retry_at='%s' WHERE status NOT IN ('succeeded', 'failed', 'sealed', 'started') AND retry_at <= '2024-11-04 10:14:33.240903'
return update_sql, update_params
@classmethod
def _get_model_from_generic_typevar(cls) -> Type[ModelType]:
"""Get the django Model from the Generic[ModelType] TypeVar arg (and check that it inherits from django.db.models.Model)"""
# cls.__orig_bases__ is non-standard and may be removed in the future! if this breaks,
# we can just require the inherited class to define the Model as a classvar manually, e.g.:
# class SnapshotActor(ActorType[Snapshot]):
# Model: ClassVar[Type[Snapshot]] = Snapshot
# https://stackoverflow.com/questions/57706180/generict-base-class-how-to-get-type-of-t-from-within-instance
Model = get_args(cls.__orig_bases__[0])[0] # type: ignore
assert issubclass(Model, DjangoModel), f'{cls.__name__}.Model must be a valid django Model'
return cast(Type[ModelType], Model)
@classmethod
def _get_state_machine_instance(cls, obj: ModelType) -> StateMachine:
"""Get the StateMachine instance for the given django Model instance (and check that it is a valid instance of cls.StateMachineClass)"""
obj_statemachine = None
state_machine_attr = getattr(obj, 'state_machine_attr', 'sm')
try:
obj_statemachine = getattr(obj, state_machine_attr)
except Exception:
pass
if not isinstance(obj_statemachine, cls.StateMachineClass):
raise Exception(f'{cls.__name__}: Failed to find a valid StateMachine instance at {type(obj).__name__}.{state_machine_attr}')
return obj_statemachine
@classmethod
def _populate_missing_classvars_from_model(cls, Model: Type[ModelType]):
"""Check that the class variables are set correctly based on the ModelType"""
# check that Model is the same as the Generic[ModelType] parameter in the class definition
cls.Model = getattr(cls, 'Model', None) or Model
if cls.Model != Model:
raise ValueError(f'{cls.__name__}.Model must be set to the same Model as the Generic[ModelType] parameter in the class definition')
# check that Model has a valid StateMachine with the required event defined on it
cls.StateMachineClass = getattr(cls, 'StateMachineClass', None) # type: ignore
assert isinstance(cls.EVENT_NAME, str), f'{cls.__name__}.EVENT_NAME must be a str, got: {type(cls.EVENT_NAME).__name__} instead'
assert hasattr(cls.StateMachineClass, cls.EVENT_NAME), f'StateMachine {cls.StateMachineClass.__name__} must define a {cls.EVENT_NAME} event ({cls.__name__}.EVENT_NAME = {cls.EVENT_NAME})'
# check that Model uses .id as its primary key field
primary_key_field = cls.Model._meta.pk.name
if primary_key_field != 'id':
raise NotImplementedError(f'Actors currently only support models that use .id as their primary key field ({cls.__name__} uses {cls.__name__}.{primary_key_field} as primary key)')
# check that ACTIVE_STATE is defined and that it exists on the StateMachineClass
if not getattr(cls, 'ACTIVE_STATE', None):
raise NotImplementedError(f'{cls.__name__} must define an ACTIVE_STATE: ClassVar[State] (e.g. SnapshotMachine.started) ({cls.Model.__name__}.{cls.Model.state_field_name} gets set to this value to mark objects as actively processing)')
assert isinstance(cls.ACTIVE_STATE, (State, str)) and hasattr(cls.StateMachineClass, cls._state_to_str(cls.ACTIVE_STATE)), f'{cls.__name__}.ACTIVE_STATE must be a statemachine.State | str that exists on {cls.StateMachineClass.__name__}, got: {type(cls.ACTIVE_STATE).__name__} instead'
# check the other ClassVar attributes for valid values
assert cls.CLAIM_ORDER and isinstance(cls.CLAIM_ORDER, tuple) and all(isinstance(order, str) for order in cls.CLAIM_ORDER), f'{cls.__name__}.CLAIM_ORDER must be a non-empty tuple[str, ...], got: {type(cls.CLAIM_ORDER).__name__} instead'
assert cls.CLAIM_FROM_TOP_N > 0, f'{cls.__name__}.CLAIM_FROM_TOP_N must be a positive int, got: {cls.CLAIM_FROM_TOP_N} instead'
assert cls.MAX_TICK_TIME >= 1, f'{cls.__name__}.MAX_TICK_TIME must be an int >= 1, got: {cls.MAX_TICK_TIME} instead'
assert cls.MAX_CONCURRENT_ACTORS >= 1, f'{cls.__name__}.MAX_CONCURRENT_ACTORS must be a positive int >=1, got: {cls.MAX_CONCURRENT_ACTORS} instead'
assert isinstance(cls.CLAIM_ATOMIC, bool), f'{cls.__name__}.CLAIM_ATOMIC must be a bool, got: {cls.CLAIM_ATOMIC} instead'
# @classmethod
# def _fork_actor_as_thread(cls, **launch_kwargs: LaunchKwargs) -> int:
# """Spawn a new background thread running the actor's runloop"""
# actor = cls(mode='thread', **launch_kwargs)
# bg_actor_thread = Thread(target=actor.runloop)
# bg_actor_thread.start()
# assert bg_actor_thread.native_id is not None
# return bg_actor_thread.native_id
@classmethod
def _fork_actor_as_process(cls, **launch_kwargs: LaunchKwargs) -> int:
"""Spawn a new background process running the actor's runloop"""
actor = cls(mode='process', **launch_kwargs)
bg_actor_process = Process(target=actor.runloop)
bg_actor_process.start()
assert bg_actor_process.pid is not None
cls._SPAWNED_ACTOR_PIDS.append(psutil.Process(pid=bg_actor_process.pid))
return bg_actor_process.pid
### Class Methods: Called by Orchestrator on ActorType class before it has been spawned
@classmethod
def get_running_actors(cls) -> list[int]:
"""returns a list of pids of all running actors of this type"""
# WARNING: only works for process actors, not thread actors
if cls.mode == 'thread':
raise NotImplementedError('get_running_actors() is not implemented for thread actors')
return [
proc.pid for proc in cls._SPAWNED_ACTOR_PIDS
if proc.is_running() and proc.status() != 'zombie'
]
@classmethod
def get_actors_to_spawn(cls, queue: QuerySet, running_actors: list[int]) -> list[LaunchKwargs]:
"""Get a list of launch kwargs for the number of actors to spawn based on the queue and currently running actors"""
queue_length = queue.count()
if not queue_length: # queue is empty, spawn 0 actors
return []
# WARNING:
# spawning new actors processes is slow/expensive, avoid spawning many actors at once in a single orchestrator tick.
# limit to spawning 1 or 2 at a time per orchestrator tick, and let the next tick handle starting another couple.
# DONT DO THIS:
# if queue_length > 20: # queue is extremely long, spawn maximum actors at once!
# num_to_spawn_this_tick = cls.MAX_CONCURRENT_ACTORS
if queue_length > 10:
num_to_spawn_this_tick = 2 # spawn more actors per tick if queue is long
else:
num_to_spawn_this_tick = 1 # spawn fewer actors per tick if queue is short
num_remaining = cls.MAX_CONCURRENT_ACTORS - len(running_actors)
num_to_spawn_now: int = limit(num_to_spawn_this_tick, num_remaining)
actors_launch_kwargs: list[LaunchKwargs] = num_to_spawn_now * [{**cls.launch_kwargs}]
return actors_launch_kwargs
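# e.g. with MAX_CONCURRENT_ACTORS=8, queue_length=25, and 7 actors already running:
# num_to_spawn_this_tick=2 but num_remaining=8-7=1, so only 1 new actor is spawned this tick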
@classmethod
def start(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int:
if mode == 'thread':
raise NotImplementedError('Thread-based actors are disabled to reduce codebase complexity. Please use processes for everything')
# return cls._fork_actor_as_thread(**launch_kwargs)
elif mode == 'process':
return cls._fork_actor_as_process(**launch_kwargs)
raise ValueError(f'Invalid actor mode: {mode} must be "thread" or "process"')
@classproperty
def qs(cls) -> QuerySet[ModelType]:
"""
Get the unfiltered and unsorted QuerySet of all objects that this Actor might care about.
Override this in the subclass to define the QuerySet of objects that the Actor is going to poll for new work.
(don't limit, order, or filter this by retry_at or status yet, Actor.get_queue() handles that part)
"""
return cls.Model.objects.filter()
@classproperty
def final_q(cls) -> Q:
"""Get the filter for objects that are already completed / in a final state"""
return Q(**{
f'{cls.Model.state_field_name}__in': [cls._state_to_str(s) for s in cls.StateMachineClass.final_states],
}) # status__in=('sealed', 'failed', 'succeeded')
@classproperty
def active_q(cls) -> Q:
"""Get the filter for objects that are marked active (and are still running / not timed out)"""
return Q(retry_at__gte=timezone.now(), **{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started')
@classproperty
def stalled_q(cls) -> Q:
"""Get the filter for objects that are marked active but are timed out"""
return Q(retry_at__lte=timezone.now(), **{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started') AND Q(<retry_at is in the past>)
@classproperty
def future_q(cls) -> Q:
"""Get the filter for objects that have a retry_at in the future"""
return Q(retry_at__gt=timezone.now(), **{cls.Model.state_field_name: cls._state_to_str(cls.Model.StatusChoices.QUEUED)}) # e.g. Q(status='queued') AND Q(<retry_at is in the future>)
@classproperty
def pending_q(cls) -> Q:
"""Get the filter for objects that are ready for processing."""
return ~Q(**{
f'{cls.Model.state_field_name}__in': (*[cls._state_to_str(s) for s in cls.StateMachineClass.final_states], cls._state_to_str(cls.ACTIVE_STATE))
}) # status__not_in=('sealed', 'failed', 'succeeded', 'started')
@classmethod
def get_queue(cls, sort: bool=True) -> QuerySet[ModelType]:
"""
Get the sorted and filtered QuerySet of objects that are ready for processing.
e.g. qs.exclude(status__in=('sealed', 'started'), retry_at__gt=timezone.now()).order_by('retry_at')
"""
unsorted_qs = cls.qs.filter(cls.pending_q) | cls.qs.filter(cls.stalled_q)
return unsorted_qs.order_by(*cls.CLAIM_ORDER) if sort else unsorted_qs
### Instance Methods: Only called from within Actor instance after it has been spawned (i.e. forked as a thread or process)
def runloop(self):
"""The main runloop that starts running when the actor is spawned (as subprocess or thread) and exits when the queue is empty"""
self.on_startup()
obj_to_process: ModelType | None = None
last_error: BaseException | None = None
try:
while True:
# Get the next object to process from the queue
try:
obj_to_process = cast(ModelType, self.get_next(atomic=self.CLAIM_ATOMIC))
except (ActorQueueIsEmpty, ActorObjectAlreadyClaimed) as err:
last_error = err
obj_to_process = None
# Handle the case where there is no next object to process
if obj_to_process:
self.idle_count = 0 # reset idle count if we got an object
else:
if self.idle_count >= 3:
break # stop looping and exit if queue is empty and we have idled for ~3 seconds
else:
print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...')
self.idle_count += 1
time.sleep(1)
continue
# Process the object by triggering its StateMachine.tick() method
self.on_tick_start(obj_to_process)
try:
self.tick(obj_to_process)
except Exception as err:
last_error = err
print(f'[red]🏃‍♂️ {self}.tick()[/red] {obj_to_process} ERROR: [red]{type(err).__name__}: {err}[/red]')
db.connections.close_all() # always reset the db connection after an exception to clear any pending transactions
self.on_tick_exception(obj_to_process, err)
traceback.print_exc()
finally:
self.on_tick_end(obj_to_process)
except BaseException as err:
last_error = err
if isinstance(err, KeyboardInterrupt):
print()
else:
print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red] {type(err).__name__}: {err}')
print(f' Last processed object: {obj_to_process}')
raise
finally:
self.on_shutdown(last_obj=obj_to_process, last_error=last_error)
@classmethod
def get_update_kwargs_to_claim_obj(cls) -> dict[str, Any]:
"""
Get the field values needed to mark a pending obj_to_process as being actively processing (aka claimed)
by the current Actor. returned kwargs will be applied using: qs.filter(id=obj_to_process.id).update(**kwargs).
F() expressions are allowed in field values if you need to update a field based on its current value.
Can be defined as a normal method (instead of classmethod) on subclasses if it needs to access instance vars.
"""
return {
# cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE), # do this manually in the state machine enter hooks
'retry_at': timezone.now() + timedelta(seconds=cls.MAX_TICK_TIME),
}
def get_next(self, atomic: bool | None=None) -> ModelType | None:
"""get the next object from the queue, atomically locking it if self.CLAIM_ATOMIC=True"""
atomic = self.CLAIM_ATOMIC if atomic is None else atomic
if atomic:
# fetch and claim the next object from in the queue in one go atomically
obj = self.get_next_atomic()
else:
# two-step claim: fetch the next object and lock it in a separate query
obj = self.get_next_non_atomic()
return obj
def get_next_non_atomic(self) -> ModelType:
"""
Naively selects the top/first object from self.get_queue().order_by(*self.CLAIM_ORDER),
then claims it by running .update(status='started', retry_at=<now + MAX_TICK_TIME>).
Do not use this method if there is more than one Actor racing to get objects from the same queue,
it will be slow/buggy as they'll compete to lock the same object at the same time (TOCTTOU race).
"""
obj = self.get_queue().first()
if obj is None:
raise ActorQueueIsEmpty(f'No next object available in {self}.get_queue()')
locked = self.get_queue().filter(id=obj.id).update(**self.get_update_kwargs_to_claim_obj())
if not locked:
raise ActorObjectAlreadyClaimed(f'Unable to lock the next {self.Model.__name__} object from {self}.get_queue().first()')
return obj
def get_next_atomic(self) -> ModelType | None:
"""
Selects the top CLAIM_FROM_TOP_N objects from the queue and atomically claims a random one from that set.
This approach safely minimizes contention with other Actors trying to select from the same Queue.
The atomic query is roughly equivalent to the following: (all done in one SQL query to avoid a TOCTTOU race)
top_candidates are selected from: qs.order_by(*CLAIM_ORDER).only('id')[:CLAIM_FROM_TOP_N]
a single candidate is chosen using: qs.filter(id__in=top_n_candidates).order_by('?').first()
the chosen obj is claimed using: qs.filter(id=chosen_obj).update(status=ACTIVE_STATE, retry_at=<now + MAX_TICK_TIME>)
"""
# TODO: if we switch from SQLite to PostgreSQL in the future, we should change this
# to use SELECT FOR UPDATE instead of a subquery + ORDER BY RANDOM() LIMIT 1
# e.g. SELECT id FROM core_archiveresult WHERE status NOT IN (...) AND retry_at <= '...' ORDER BY retry_at ASC LIMIT 50
qs = self.get_queue()
select_top_candidates_sql, select_params = self._sql_for_select_top_n_candidates(qs=qs)
assert select_top_candidates_sql.startswith('SELECT ')
# e.g. UPDATE core_archiveresult SET status='%s', retry_at='%s' WHERE status NOT IN (...) AND retry_at <= '...'
update_claimed_obj_sql, update_params = self._sql_for_update_claimed_obj(qs=self.qs.all(), update_kwargs=self.get_update_kwargs_to_claim_obj())
assert update_claimed_obj_sql.startswith('UPDATE ') and 'WHERE' not in update_claimed_obj_sql
db_table = self.Model._meta.db_table # e.g. core_archiveresult
# subquery gets the pool of the top candidates e.g. self.get_queue().only('id')[:CLAIM_FROM_TOP_N]
# main query selects a random one from that pool, and claims it using .update(status=ACTIVE_STATE, retry_at=<now + MAX_TICK_TIME>)
# this is all done in one atomic SQL query to avoid TOCTTOU race conditions (as much as possible)
atomic_select_and_update_sql = f"""
with top_candidates AS ({select_top_candidates_sql})
{update_claimed_obj_sql}
WHERE "{db_table}"."id" IN (
SELECT id FROM top_candidates
ORDER BY RANDOM()
LIMIT 1
)
RETURNING *;
"""
# import ipdb; ipdb.set_trace()
try:
updated = qs.raw(atomic_select_and_update_sql, (*select_params, *update_params))
assert len(updated) <= 1, f'Expected to claim at most 1 object, but Django modified {len(updated)} objects!'
return updated[0]
except IndexError:
if self.get_queue().exists():
raise ActorObjectAlreadyClaimed(f'Unable to lock the next {self.Model.__name__} object from {self}.get_queue().first()')
else:
raise ActorQueueIsEmpty(f'No next object available in {self}.get_queue()')
def tick(self, obj_to_process: ModelType) -> None:
"""Call the object.sm.tick() method to process the object"""
print(f'[blue]🏃‍♂️ {self}.tick()[/blue] {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}')
# get the StateMachine instance from the object
obj_statemachine = self._get_state_machine_instance(obj_to_process)
starting_state = obj_statemachine.current_state
# trigger the event on the StateMachine instance
obj_tick_method = getattr(obj_statemachine, self.EVENT_NAME) # e.g. obj_statemachine.tick()
obj_tick_method()
ending_state = obj_statemachine.current_state
if starting_state != ending_state:
self.on_state_change(obj_to_process, starting_state, ending_state)
# save the object to persist any state changes
obj_to_process.save()
def on_startup(self) -> None:
if self.mode == 'thread':
# self.pid = get_native_id() # thread id
print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]')
raise NotImplementedError('Thread-based actors are disabled to reduce codebase complexity. Please use processes for everything')
else:
self.pid = os.getpid() # process id
print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (PROCESS)[/green]')
# abx.pm.hook.on_actor_startup(actor=self)
def on_shutdown(self, last_obj: ModelType | None=None, last_error: BaseException | None=None) -> None:
if isinstance(last_error, KeyboardInterrupt) or last_error is None:
last_error_str = '[green](CTRL-C)[/green]'
elif isinstance(last_error, ActorQueueIsEmpty):
last_error_str = '[green](queue empty)[/green]'
elif isinstance(last_error, ActorObjectAlreadyClaimed):
last_error_str = '[green](queue race)[/green]'
else:
last_error_str = f'[red]{type(last_error).__name__}: {last_error}[/red]'
print(f'[grey53]🏃‍♂️ {self}.on_shutdown() SHUTTING DOWN[/grey53] {last_error_str}')
# abx.pm.hook.on_actor_shutdown(actor=self, last_obj=last_obj, last_error=last_error)
def on_tick_start(self, obj_to_process: ModelType) -> None:
print(f'🏃‍♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}')
# abx.pm.hook.on_actor_tick_start(actor=self, obj_to_process=obj)
# self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ')
pass
def on_tick_end(self, obj_to_process: ModelType) -> None:
print(f'🏃‍♂️ {self}.on_tick_end() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}')
# abx.pm.hook.on_actor_tick_end(actor=self, obj_to_process=obj_to_process)
# self.timer.end()
pass
# import ipdb; ipdb.set_trace()
def on_tick_exception(self, obj_to_process: ModelType, error: Exception) -> None:
print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red] {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}: [red]{type(error).__name__}: {error}[/red]')
# abx.pm.hook.on_actor_tick_exception(actor=self, obj_to_process=obj_to_process, error=error)
def on_state_change(self, obj_to_process: ModelType, starting_state, ending_state) -> None:
print(f'🏃‍♂️ {self}.on_state_change() {obj_to_process.ABID} {starting_state} ➡️ {ending_state}')
# abx.pm.hook.on_actor_state_change(actor=self, obj_to_process=obj_to_process, starting_state=starting_state, ending_state=ending_state)
def compile_sql_select(queryset: QuerySet, filter_kwargs: dict[str, Any] | None=None, order_args: tuple[str, ...]=(), limit: int | None=None) -> tuple[str, tuple[Any, ...]]:
"""
Compute the SELECT query SQL for a queryset.filter(**filter_kwargs).order_by(*order_args)[:limit] call
Returns a tuple of (sql, params) where sql is a template string containing %s (unquoted) placeholders for the params
WARNING:
final_sql = sql % params DOES NOT WORK to assemble the final SQL string because the %s placeholders are not quoted/escaped
they should always be passed separately to the DB driver so it can do its own quoting/escaping to avoid SQL injection and syntax errors
"""
assert isinstance(queryset, QuerySet), f'compile_sql_select(...) first argument must be a QuerySet, got: {type(queryset).__name__} instead'
assert filter_kwargs is None or isinstance(filter_kwargs, dict), f'compile_sql_select(...) filter_kwargs argument must be a dict[str, Any], got: {type(filter_kwargs).__name__} instead'
assert isinstance(order_args, tuple) and all(isinstance(arg, str) for arg in order_args), f'compile_sql_select(...) order_args argument must be a tuple[str, ...] got: {type(order_args).__name__} instead'
assert limit is None or isinstance(limit, int), f'compile_sql_select(...) limit argument must be an int, got: {type(limit).__name__} instead'
queryset = queryset._chain() # type: ignore # copy queryset to avoid modifying the original
if filter_kwargs:
queryset = queryset.filter(**filter_kwargs)
if order_args:
queryset = queryset.order_by(*order_args)
if limit is not None:
queryset = queryset[:limit]
query = queryset.query
# e.g. SELECT id FROM core_archiveresult WHERE status NOT IN (%s, %s, %s) AND retry_at <= %s ORDER BY retry_at ASC LIMIT 50
select_sql, select_params = query.get_compiler(queryset.db).as_sql()
return select_sql, select_params
def compile_sql_update(queryset: QuerySet, update_kwargs: dict[str, Any]) -> tuple[str, tuple[Any, ...]]:
"""
Compute the UPDATE query SQL for a queryset.filter(**filter_kwargs).update(**update_kwargs) call
Returns a tuple of (sql, params) where sql is a template string containing %s (unquoted) placeholders for the params
Based on the django.db.models.QuerySet.update() source code, but modified to return the SQL instead of executing the update
https://github.com/django/django/blob/611bf6c2e2a1b4ab93273980c45150c099ab146d/django/db/models/query.py#L1217
WARNING:
final_sql = sql % params DOES NOT WORK to assemble the final SQL string because the %s placeholders are not quoted/escaped
they should always be passed separately to the DB driver so it can do its own quoting/escaping to avoid SQL injection and syntax errors
"""
assert isinstance(queryset, QuerySet), f'compile_sql_update(...) first argument must be a QuerySet, got: {type(queryset).__name__} instead'
assert isinstance(update_kwargs, dict), f'compile_sql_update(...) update_kwargs argument must be a dict[str, Any], got: {type(update_kwargs).__name__} instead'
queryset = queryset._chain().all() # type: ignore # copy queryset to avoid modifying the original and clear any filters
queryset.query.clear_ordering(force=True) # clear any ORDER BY clauses
queryset.query.clear_limits() # clear any LIMIT clauses aka slices[:n]
queryset._for_write = True # type: ignore
query = queryset.query.chain(sql.UpdateQuery) # type: ignore
query.add_update_values(update_kwargs) # type: ignore
query.annotations = {} # clear any annotations
# e.g. UPDATE core_archiveresult SET status='%s', retry_at='%s' WHERE status NOT IN (%s, %s, %s) AND retry_at <= %s
update_sql, update_params = query.get_compiler(queryset.db).as_sql()
# make sure you only pass a raw queryset with no .filter(...) clauses applied to it, the return value is designed to be used
# in a manually assembled SQL query with its own WHERE clause later on
assert 'WHERE' not in update_sql, f'compile_sql_update(...) should only contain a SET statement but it tried to return a query with a WHERE clause: {update_sql}'
# print(update_sql, update_params)
return update_sql, update_params
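# Example usage of the helpers above (a hypothetical sketch, assuming a Snapshot model with status/retry_at fields):
#
#   select_sql, select_params = compile_sql_select(Snapshot.objects.all(), order_args=('-retry_at',), limit=50)
#   update_sql, update_params = compile_sql_update(Snapshot.objects.all(), update_kwargs={'status': 'started'})
#   # always pass the params separately to the DB driver instead of interpolating them into the SQL string:
#   cursor.execute(select_sql, select_params)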


@@ -0,0 +1,26 @@
__package__ = 'archivebox.workers'
import abx
from django.contrib.auth import get_permission_codename
from huey_monitor.apps import HueyMonitorConfig
from huey_monitor.admin import TaskModel, TaskModelAdmin, SignalInfoModel, SignalInfoModelAdmin
HueyMonitorConfig.verbose_name = 'Background Workers'
class CustomTaskModelAdmin(TaskModelAdmin):
actions = ["delete_selected"]
def has_delete_permission(self, request, obj=None):
codename = get_permission_codename("delete", self.opts)
return request.user.has_perm("%s.%s" % (self.opts.app_label, codename))
@abx.hookimpl
def register_admin(admin_site):
admin_site.register(TaskModel, CustomTaskModelAdmin)
admin_site.register(SignalInfoModel, SignalInfoModelAdmin)


@@ -0,0 +1,7 @@
from django.apps import AppConfig
class WorkersConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'workers'


@@ -0,0 +1,18 @@
from django.core.management.base import BaseCommand
from workers.orchestrator import ArchivingOrchestrator
class Command(BaseCommand):
help = 'Run the archivebox orchestrator'
# def add_arguments(self, parser):
# parser.add_argument('subcommand', type=str, help='The subcommand you want to run')
# parser.add_argument('command_args', nargs='*', help='Arguments to pass to the subcommand')
def handle(self, *args, **kwargs):
orchestrator = ArchivingOrchestrator()
orchestrator.start()
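# usage: archivebox manage orchestrator
# (the same command the ORCHESTRATOR_WORKER supervisord config elsewhere in this commit runs as a subprocess)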


@@ -0,0 +1,300 @@
from typing import ClassVar, Type, Iterable
from datetime import datetime, timedelta
from statemachine.mixins import MachineMixin
from django.db import models
from django.utils import timezone
from django.utils.functional import classproperty
from statemachine import registry, StateMachine, State
from django.core import checks
class DefaultStatusChoices(models.TextChoices):
QUEUED = 'queued', 'Queued'
STARTED = 'started', 'Started'
SEALED = 'sealed', 'Sealed'
default_status_field: models.CharField = models.CharField(choices=DefaultStatusChoices.choices, max_length=15, default=DefaultStatusChoices.QUEUED, null=False, blank=False, db_index=True)
default_retry_at_field: models.DateTimeField = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True)
ObjectState = State | str
ObjectStateList = Iterable[ObjectState]
class BaseModelWithStateMachine(models.Model, MachineMixin):
id: models.UUIDField
StatusChoices: ClassVar[Type[models.TextChoices]]
# status: models.CharField
# retry_at: models.DateTimeField
state_machine_name: ClassVar[str]
state_field_name: ClassVar[str]
state_machine_attr: ClassVar[str] = 'sm'
bind_events_as_methods: ClassVar[bool] = True
active_state: ClassVar[ObjectState]
retry_at_field_name: ClassVar[str]
class Meta:
abstract = True
@classmethod
def check(cls, sender=None, **kwargs):
errors = super().check(**kwargs)
found_id_field = False
found_status_field = False
found_retry_at_field = False
for field in cls._meta.get_fields():
if getattr(field, '_is_state_field', False):
if cls.state_field_name == field.name:
found_status_field = True
if getattr(field, 'choices', None) != cls.StatusChoices.choices:
errors.append(checks.Error(
f'{cls.__name__}.{field.name} must have choices set to {cls.__name__}.StatusChoices.choices',
hint=f'{cls.__name__}.{field.name}.choices = {getattr(field, "choices", None)!r}',
obj=cls,
id='workers.E011',
))
if getattr(field, '_is_retry_at_field', False):
if cls.retry_at_field_name == field.name:
found_retry_at_field = True
if field.name == 'id' and getattr(field, 'primary_key', False):
found_id_field = True
if not found_status_field:
errors.append(checks.Error(
f'{cls.__name__}.state_field_name must be defined and point to a StatusField()',
hint=f'{cls.__name__}.state_field_name = {cls.state_field_name!r} but {cls.__name__}.{cls.state_field_name!r} was not found or does not refer to StatusField',
obj=cls,
id='workers.E012',
))
if not found_retry_at_field:
errors.append(checks.Error(
f'{cls.__name__}.retry_at_field_name must be defined and point to a RetryAtField()',
hint=f'{cls.__name__}.retry_at_field_name = {cls.retry_at_field_name!r} but {cls.__name__}.{cls.retry_at_field_name!r} was not found or does not refer to RetryAtField',
obj=cls,
id='workers.E013',
))
if not found_id_field:
errors.append(checks.Error(
f'{cls.__name__} must have an id field that is a primary key',
hint=f'{cls.__name__}.id = {cls.id!r}',
obj=cls,
id='workers.E014',
))
if not isinstance(cls.state_machine_name, str):
errors.append(checks.Error(
f'{cls.__name__}.state_machine_name must be a dotted-import path to a StateMachine class',
hint=f'{cls.__name__}.state_machine_name = {cls.state_machine_name!r}',
obj=cls,
id='workers.E015',
))
try:
cls.StateMachineClass
except Exception as err:
errors.append(checks.Error(
f'{cls.__name__}.state_machine_name must point to a valid StateMachine class, but got {type(err).__name__} {err} when trying to access {cls.__name__}.StateMachineClass',
hint=f'{cls.__name__}.state_machine_name = {cls.state_machine_name!r}',
obj=cls,
id='workers.E016',
))
if cls.INITIAL_STATE not in cls.StatusChoices.values:
errors.append(checks.Error(
f'{cls.__name__}.StateMachineClass.initial_state must be present within {cls.__name__}.StatusChoices',
hint=f'{cls.__name__}.StateMachineClass.initial_state = {cls.StateMachineClass.initial_state!r}',
obj=cls,
id='workers.E017',
))
if cls.ACTIVE_STATE not in cls.StatusChoices.values:
errors.append(checks.Error(
f'{cls.__name__}.active_state must be set to a valid State present within {cls.__name__}.StatusChoices',
hint=f'{cls.__name__}.active_state = {cls.active_state!r}',
obj=cls,
id='workers.E018',
))
for state in cls.FINAL_STATES:
if state not in cls.StatusChoices.values:
errors.append(checks.Error(
f'{cls.__name__}.StateMachineClass.final_states must all be present within {cls.__name__}.StatusChoices',
hint=f'{cls.__name__}.StateMachineClass.final_states = {cls.StateMachineClass.final_states!r}',
obj=cls,
id='workers.E019',
))
break
return errors
@staticmethod
def _state_to_str(state: ObjectState) -> str:
"""Convert a statemachine.State, models.TextChoices.choices value, or Enum value to a str"""
return str(state.value) if isinstance(state, State) else str(state)
@property
def RETRY_AT(self) -> datetime:
return getattr(self, self.retry_at_field_name)
@RETRY_AT.setter
def RETRY_AT(self, value: datetime):
setattr(self, self.retry_at_field_name, value)
@property
def STATE(self) -> str:
return getattr(self, self.state_field_name)
@STATE.setter
def STATE(self, value: str):
setattr(self, self.state_field_name, value)
def bump_retry_at(self, seconds: int = 10):
self.RETRY_AT = timezone.now() + timedelta(seconds=seconds)
@classproperty
def ACTIVE_STATE(cls) -> str:
return cls._state_to_str(cls.active_state)
@classproperty
def INITIAL_STATE(cls) -> str:
return cls._state_to_str(cls.StateMachineClass.initial_state)
@classproperty
def FINAL_STATES(cls) -> list[str]:
return [cls._state_to_str(state) for state in cls.StateMachineClass.final_states]
@classproperty
def FINAL_OR_ACTIVE_STATES(cls) -> list[str]:
return [*cls.FINAL_STATES, cls.ACTIVE_STATE]
@classmethod
def extend_choices(cls, base_choices: Type[models.TextChoices]):
"""
Decorator to extend the base choices with extra choices, e.g.:
class MyModel(ModelWithStateMachine):
@ModelWithStateMachine.extend_choices(ModelWithStateMachine.StatusChoices)
class StatusChoices(models.TextChoices):
SUCCEEDED = 'succeeded'
FAILED = 'failed'
SKIPPED = 'skipped'
"""
assert issubclass(base_choices, models.TextChoices), f'@extend_choices(base_choices) must be a TextChoices class, not {base_choices.__name__}'
def wrapper(extra_choices: Type[models.TextChoices]) -> Type[models.TextChoices]:
joined = {}
for item in base_choices.choices:
joined[item[0]] = item[1]
for item in extra_choices.choices:
joined[item[0]] = item[1]
return models.TextChoices('StatusChoices', joined)
return wrapper
@classmethod
def StatusField(cls, **kwargs) -> models.CharField:
"""
Used on subclasses to extend/modify the status field with updated kwargs. e.g.:
class MyModel(ModelWithStateMachine):
class StatusChoices(ModelWithStateMachine.StatusChoices):
QUEUED = 'queued', 'Queued'
STARTED = 'started', 'Started'
SEALED = 'sealed', 'Sealed'
BACKOFF = 'backoff', 'Backoff'
FAILED = 'failed', 'Failed'
SKIPPED = 'skipped', 'Skipped'
status = ModelWithStateMachine.StatusField(choices=StatusChoices.choices, default=StatusChoices.QUEUED)
"""
default_kwargs = default_status_field.deconstruct()[3]
updated_kwargs = {**default_kwargs, **kwargs}
field = models.CharField(**updated_kwargs)
field._is_state_field = True # type: ignore
return field
@classmethod
def RetryAtField(cls, **kwargs) -> models.DateTimeField:
"""
Used on subclasses to extend/modify the retry_at field with updated kwargs. e.g.:
class MyModel(ModelWithStateMachine):
retry_at = ModelWithStateMachine.RetryAtField(editable=False)
"""
default_kwargs = default_retry_at_field.deconstruct()[3]
updated_kwargs = {**default_kwargs, **kwargs}
field = models.DateTimeField(**updated_kwargs)
field._is_retry_at_field = True # type: ignore
return field
@classproperty
def StateMachineClass(cls) -> Type[StateMachine]:
"""Get the StateMachine class for the given django Model that inherits from MachineMixin"""
model_state_machine_name = getattr(cls, 'state_machine_name', None)
if model_state_machine_name:
StateMachineCls = registry.get_machine_cls(model_state_machine_name)
assert issubclass(StateMachineCls, StateMachine)
return StateMachineCls
raise NotImplementedError(f'ActorType[{cls.__name__}] must define .state_machine_name: str that points to a valid StateMachine')
# @classproperty
# def final_q(cls) -> Q:
# """Get the filter for objects that are in a final state"""
# return Q(**{f'{cls.state_field_name}__in': cls.final_states})
# @classproperty
# def active_q(cls) -> Q:
# """Get the filter for objects that are actively processing right now"""
# return Q(**{cls.state_field_name: cls._state_to_str(cls.active_state)}) # e.g. Q(status='started')
# @classproperty
# def stalled_q(cls) -> Q:
# """Get the filter for objects that are marked active but have timed out"""
# return cls.active_q & Q(retry_at__lte=timezone.now()) # e.g. Q(status='started') AND Q(<retry_at is in the past>)
# @classproperty
# def future_q(cls) -> Q:
# """Get the filter for objects that have a retry_at in the future"""
# return Q(retry_at__gt=timezone.now())
# @classproperty
# def pending_q(cls) -> Q:
# """Get the filter for objects that are ready for processing."""
# return ~(cls.active_q) & ~(cls.final_q) & ~(cls.future_q)
# @classmethod
# def get_queue(cls) -> QuerySet:
# """
# Get the sorted and filtered QuerySet of objects that are ready for processing.
# e.g. qs.exclude(status__in=('sealed', 'started'), retry_at__gt=timezone.now()).order_by('retry_at')
# """
# return cls.objects.filter(cls.pending_q)
class ModelWithStateMachine(BaseModelWithStateMachine):
StatusChoices: ClassVar[Type[DefaultStatusChoices]] = DefaultStatusChoices
status: models.CharField = BaseModelWithStateMachine.StatusField()
retry_at: models.DateTimeField = BaseModelWithStateMachine.RetryAtField()
state_machine_name: ClassVar[str] # e.g. 'core.statemachines.ArchiveResultMachine'
state_field_name: ClassVar[str] = 'status'
state_machine_attr: ClassVar[str] = 'sm'
bind_events_as_methods: ClassVar[bool] = True
active_state: ClassVar[str] = StatusChoices.STARTED
retry_at_field_name: ClassVar[str] = 'retry_at'
class Meta:
abstract = True
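# Example concrete model (a hypothetical sketch, not part of this commit):
#
#   class Snapshot(ModelWithStateMachine):
#       state_machine_name = 'core.statemachines.SnapshotMachine'   # dotted path to its StateMachine
#       active_state = ModelWithStateMachine.StatusChoices.STARTED
#       # status = StatusField() and retry_at = RetryAtField() are inherited from ModelWithStateMachine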


@@ -0,0 +1,203 @@
__package__ = 'archivebox.workers'
import os
import time
import itertools
from typing import Dict, Type, Literal, TYPE_CHECKING
from django.utils.functional import classproperty
from django.utils import timezone
import multiprocessing
from rich import print
# from django.db.models import QuerySet
from django.apps import apps
if TYPE_CHECKING:
from .actor import ActorType
multiprocessing.set_start_method('fork', force=True)
class Orchestrator:
pid: int
idle_count: int = 0
actor_types: Dict[str, Type['ActorType']] = {}
mode: Literal['thread', 'process'] = 'process'
exit_on_idle: bool = True
max_concurrent_actors: int = 20
def __init__(self, actor_types: Dict[str, Type['ActorType']] | None = None, mode: Literal['thread', 'process'] | None=None, exit_on_idle: bool=True, max_concurrent_actors: int=max_concurrent_actors):
self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types()
self.mode = mode or self.mode
self.exit_on_idle = exit_on_idle
self.max_concurrent_actors = max_concurrent_actors
def __repr__(self) -> str:
label = 'tid' if self.mode == 'thread' else 'pid'
return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]'
def __str__(self) -> str:
return self.__repr__()
@classproperty
def name(cls) -> str:
return cls.__name__ # type: ignore
# def _fork_as_thread(self):
# self.thread = Thread(target=self.runloop)
# self.thread.start()
# assert self.thread.native_id is not None
# return self.thread.native_id
def _fork_as_process(self):
self.process = multiprocessing.Process(target=self.runloop)
self.process.start()
assert self.process.pid is not None
return self.process.pid
def start(self) -> int:
if self.mode == 'thread':
# return self._fork_as_thread()
raise NotImplementedError('Thread-based orchestrators are disabled for now to reduce codebase complexity')
elif self.mode == 'process':
return self._fork_as_process()
raise ValueError(f'Invalid orchestrator mode: {self.mode}')
@classmethod
def autodiscover_actor_types(cls) -> Dict[str, Type['ActorType']]:
from archivebox.config.django import setup_django
setup_django()
# returns a Dict of all discovered {actor_type_id: ActorType} across the codebase
# override this method in a subclass to customize the actor types that are used
# return {'Snapshot': SnapshotWorker, 'ArchiveResult_chrome': ChromeActorType, ...}
from crawls.statemachines import CrawlWorker
from core.statemachines import SnapshotWorker, ArchiveResultWorker
return {
'CrawlWorker': CrawlWorker,
'SnapshotWorker': SnapshotWorker,
'ArchiveResultWorker': ArchiveResultWorker,
# look through all models and find all classes that inherit from ActorType
# actor_type.__name__: actor_type
# for actor_type in abx.pm.hook.get_all_ACTORS_TYPES().values()
}
@classmethod
def get_orphaned_objects(cls, all_queues) -> list:
# returns a list of stale objects: any objects whose retry_at is in the past but that are not present in any actor type's queue
all_queued_ids = itertools.chain(*[queue.values_list('id', flat=True) for queue in all_queues.values()])
orphaned_objects = []
for model in apps.get_models():
if hasattr(model, 'retry_at'):
orphaned_objects.extend(model.objects.filter(retry_at__lt=timezone.now()).exclude(id__in=all_queued_ids))
return orphaned_objects
@classmethod
def has_future_objects(cls, all_queues) -> bool:
# returns True if any queue contains objects with a retry_at scheduled in the future
return any(
queue.filter(retry_at__gte=timezone.now()).exists()
for queue in all_queues.values()
)
def on_startup(self):
if self.mode == 'thread':
# self.pid = get_native_id()
print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (THREAD)[/green]')
raise NotImplementedError('Thread-based orchestrators are disabled for now to reduce codebase complexity')
elif self.mode == 'process':
self.pid = os.getpid()
print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (PROCESS)[/green]')
# abx.pm.hook.on_orchestrator_startup(self)
def on_shutdown(self, err: BaseException | None = None):
print(f'[grey53]👨‍✈️ {self}.on_shutdown() SHUTTING DOWN[/grey53]', err or '[green](gracefully)[/green]')
# abx.pm.hook.on_orchestrator_shutdown(self)
def on_tick_started(self, all_queues):
total_pending = sum(queue.count() for queue in all_queues.values())
if total_pending:
print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}')
# abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues)
pass
def on_tick_finished(self, all_queues, all_existing_actors, all_spawned_actors):
# if all_spawned_actors:
# total_queue_length = sum(queue.count() for queue in all_queues.values())
# print(f'[grey53]👨‍✈️ {self}.on_tick_finished() queue={total_queue_length} existing_actors={len(all_existing_actors)} spawned_actors={len(all_spawned_actors)}[/grey53]')
# abx.pm.hook.on_orchestrator_tick_finished(self, actor_types, all_queues)
pass
def on_idle(self, all_queues):
print(f'👨‍✈️ {self}.on_idle()', f'idle_count={self.idle_count}')
# abx.pm.hook.on_orchestrator_idle(self)
# check for orphaned objects left behind
if self.idle_count == 60:
orphaned_objects = self.get_orphaned_objects(all_queues)
if orphaned_objects:
print('[red]👨‍✈️ WARNING: some objects may not be processed, no actor has claimed them after 30s:[/red]', orphaned_objects)
if self.idle_count > 3 and self.exit_on_idle and not self.has_future_objects(all_queues):
raise KeyboardInterrupt('✅ All tasks completed, exiting')
def runloop(self):
from archivebox.config.django import setup_django
setup_django()
self.on_startup()
try:
while True:
all_queues = {
actor_type: actor_type.get_queue()
for actor_type in self.actor_types.values()
}
if not all_queues:
raise Exception('Failed to find any actor_types to process')
self.on_tick_started(all_queues)
all_existing_actors = []
all_spawned_actors = []
for actor_type, queue in all_queues.items():
if not queue.exists():
continue
next_obj = queue.first()
print(f'🏃‍♂️ {self}.runloop() {actor_type.__name__.ljust(20)} queue={str(queue.count()).ljust(3)} next={next_obj.abid if next_obj else "None"} {next_obj.status if next_obj else "None"} {(timezone.now() - next_obj.retry_at).total_seconds() if next_obj and next_obj.retry_at else "None"}')
self.idle_count = 0
try:
existing_actors = actor_type.get_running_actors()
all_existing_actors.extend(existing_actors)
actors_to_spawn = actor_type.get_actors_to_spawn(queue, existing_actors)
can_spawn_num_remaining = self.max_concurrent_actors - len(all_existing_actors) # set max_concurrent_actors=1 to disable multitasking
for launch_kwargs in actors_to_spawn[:can_spawn_num_remaining]:
new_actor_pid = actor_type.start(mode='process', **launch_kwargs)
all_spawned_actors.append(new_actor_pid)
except Exception as err:
print(f'🏃‍♂️ ERROR: {self} Failed to get {actor_type} queue & running actors', err)
except BaseException:
raise
if not any(queue.exists() for queue in all_queues.values()):
self.on_idle(all_queues)
self.idle_count += 1
time.sleep(0.5)
else:
self.idle_count = 0
self.on_tick_finished(all_queues, all_existing_actors, all_spawned_actors)
time.sleep(1)
except BaseException as err:
if isinstance(err, KeyboardInterrupt):
print()
else:
print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err)
self.on_shutdown(err=err)


@@ -0,0 +1,103 @@
import uuid
from functools import wraps
from django.db import connection, transaction
from django.utils import timezone
from huey.exceptions import TaskLockedException
from archivebox.config import CONSTANTS
class SqliteSemaphore:
def __init__(self, db_path, table_name, name, value=1, timeout=None):
self.db_path = db_path
self.table_name = table_name
self.name = name
self.value = value
self.timeout = timeout or 86400 # Set a max age for lock holders
# Ensure the table exists
with connection.cursor() as cursor:
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
id TEXT PRIMARY KEY,
name TEXT,
timestamp DATETIME
)
""")
def acquire(self, name=None):
name = name or str(uuid.uuid4())
now = timezone.now()
expiration = now - timezone.timedelta(seconds=self.timeout)
with transaction.atomic():
# Remove expired locks
with connection.cursor() as cursor:
cursor.execute(f"""
DELETE FROM {self.table_name}
WHERE name = %s AND timestamp < %s
""", [self.name, expiration])
# Try to acquire the lock
with connection.cursor() as cursor:
cursor.execute(f"""
INSERT INTO {self.table_name} (id, name, timestamp)
SELECT %s, %s, %s
WHERE (
SELECT COUNT(*) FROM {self.table_name}
WHERE name = %s
) < %s
""", [name, self.name, now, self.name, self.value])
if cursor.rowcount > 0:
return name
# If we couldn't acquire the lock, remove our attempted entry
with connection.cursor() as cursor:
cursor.execute(f"""
DELETE FROM {self.table_name}
WHERE id = %s AND name = %s
""", [name, self.name])
return None
def release(self, name):
with connection.cursor() as cursor:
cursor.execute(f"""
DELETE FROM {self.table_name}
WHERE id = %s AND name = %s
""", [name, self.name])
return cursor.rowcount > 0
LOCKS_DB_PATH = CONSTANTS.DATABASE_FILE.parent / 'locks.sqlite3'
def lock_task_semaphore(db_path, table_name, lock_name, value=1, timeout=None):
"""
Lock which can be acquired multiple times (default = 1).
NOTE: no provisions are made for blocking, waiting, or notifying. This is
just a lock which can be acquired a configurable number of times.
Example:
# Allow up to 3 workers to run this task concurrently. If the task is
# locked, retry up to 2 times with a delay of 60s.
@huey.task(retries=2, retry_delay=60)
@lock_task_semaphore('path/to/db.sqlite3', 'semaphore_locks', 'my-lock', 3)
def my_task():
...
"""
sem = SqliteSemaphore(db_path, table_name, lock_name, value, timeout)
def decorator(fn):
@wraps(fn)
def inner(*args, **kwargs):
tid = sem.acquire()
if tid is None:
raise TaskLockedException(f'unable to acquire lock {lock_name}')
try:
return fn(*args, **kwargs)
finally:
sem.release(tid)
return inner
return decorator


@@ -0,0 +1,475 @@
__package__ = 'archivebox.workers'
import sys
import time
import signal
import psutil
import shutil
import subprocess
from typing import Dict, cast, Iterator
from pathlib import Path
from functools import cache
from rich import print
from supervisor.xmlrpc import SupervisorTransport
from xmlrpc.client import ServerProxy
from archivebox.config import CONSTANTS
from archivebox.config.paths import get_or_create_working_tmp_dir
from archivebox.config.permissions import ARCHIVEBOX_USER
from archivebox.misc.logging import STDERR
from archivebox.logging_util import pretty_path
LOG_FILE_NAME = "supervisord.log"
CONFIG_FILE_NAME = "supervisord.conf"
PID_FILE_NAME = "supervisord.pid"
WORKERS_DIR_NAME = "workers"
SCHEDULER_WORKER = {
"name": "worker_scheduler",
"command": "archivebox manage djangohuey --queue system_tasks -w 4 -k thread --disable-health-check --flush-locks",
"autostart": "true",
"autorestart": "true",
"stdout_logfile": "logs/worker_scheduler.log",
"redirect_stderr": "true",
}
COMMAND_WORKER = {
"name": "worker_commands",
"command": "archivebox manage djangohuey --queue commands -w 4 -k thread --no-periodic --disable-health-check",
"autostart": "true",
"autorestart": "true",
"stdout_logfile": "logs/worker_commands.log",
"redirect_stderr": "true",
}
ORCHESTRATOR_WORKER = {
"name": "worker_orchestrator",
"command": "archivebox manage orchestrator",
"autostart": "true",
"autorestart": "true",
"stdout_logfile": "logs/worker_orchestrator.log",
"redirect_stderr": "true",
}
SERVER_WORKER = lambda host, port: {
"name": "worker_daphne",
"command": f"daphne --bind={host} --port={port} --application-close-timeout=600 archivebox.core.asgi:application",
"autostart": "false",
"autorestart": "true",
"stdout_logfile": "logs/worker_daphne.log",
"redirect_stderr": "true",
}
@cache
def get_sock_file():
"""Get the path to the supervisord socket file, symlinking to a shorter path if needed due to unix path length limits"""
TMP_DIR = get_or_create_working_tmp_dir(autofix=True, quiet=False)
assert TMP_DIR, "Failed to find or create a writable TMP_DIR!"
socket_file = TMP_DIR / "supervisord.sock"
return socket_file
def follow(file, sleep_sec=0.1) -> Iterator[str]:
""" Yield each line from a file as they are written.
`sleep_sec` is the time to sleep after empty reads. """
line = ''
while True:
tmp = file.readline()
if tmp is not None and tmp != "":
line += tmp
if line.endswith("\n"):
yield line
line = ''
elif sleep_sec:
time.sleep(sleep_sec)
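# e.g. to tail a worker log file line-by-line (a minimal usage sketch):
#
#   with open('logs/worker_orchestrator.log') as f:
#       for line in follow(f):
#           print(line, end='')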
def create_supervisord_config():
SOCK_FILE = get_sock_file()
WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
CONFIG_FILE = SOCK_FILE.parent / CONFIG_FILE_NAME
PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
LOG_FILE = CONSTANTS.LOGS_DIR / LOG_FILE_NAME
config_content = f"""
[supervisord]
nodaemon = true
environment = IS_SUPERVISORD_PARENT="true"
pidfile = {PID_FILE}
logfile = {LOG_FILE}
childlogdir = {CONSTANTS.LOGS_DIR}
directory = {CONSTANTS.DATA_DIR}
strip_ansi = true
nocleanup = true
user = {ARCHIVEBOX_USER}
[unix_http_server]
file = {SOCK_FILE}
chmod = 0700
[supervisorctl]
serverurl = unix://{SOCK_FILE}
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[include]
files = {WORKERS_DIR}/*.conf
"""
CONFIG_FILE.write_text(config_content)
Path.mkdir(WORKERS_DIR, exist_ok=True, parents=True)
(WORKERS_DIR / 'initial_startup.conf').write_text('') # hides error about "no files found to include" when supervisord starts
def create_worker_config(daemon):
"""Create a supervisord worker config file for a given daemon"""
SOCK_FILE = get_sock_file()
WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
Path.mkdir(WORKERS_DIR, exist_ok=True, parents=True)
name = daemon['name']
worker_conf = WORKERS_DIR / f"{name}.conf"
worker_str = f"[program:{name}]\n"
for key, value in daemon.items():
if key == 'name':
continue
worker_str += f"{key}={value}\n"
worker_str += "\n"
worker_conf.write_text(worker_str)
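# e.g. create_worker_config(ORCHESTRATOR_WORKER) writes workers/worker_orchestrator.conf containing roughly:
#
#   [program:worker_orchestrator]
#   command=archivebox manage orchestrator
#   autostart=true
#   autorestart=true
#   stdout_logfile=logs/worker_orchestrator.log
#   redirect_stderr=true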
def get_existing_supervisord_process():
SOCK_FILE = get_sock_file()
try:
transport = SupervisorTransport(None, None, f"unix://{SOCK_FILE}")
server = ServerProxy("http://localhost", transport=transport) # user:pass@localhost doesn't work for some reason with unix://.sock, can't seem to silence CRIT no-auth warning
current_state = cast(Dict[str, int | str], server.supervisor.getState())
if current_state["statename"] == "RUNNING":
pid = server.supervisor.getPID()
print(f"[🦸‍♂️] Supervisord connected (pid={pid}) via unix://{pretty_path(SOCK_FILE)}.")
return server.supervisor
except FileNotFoundError:
return None
except Exception as e:
print(f"Error connecting to existing supervisord: {str(e)}")
return None
def stop_existing_supervisord_process():
SOCK_FILE = get_sock_file()
PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
try:
# if pid file exists, load PID int
try:
pid = int(PID_FILE.read_text())
except (FileNotFoundError, ValueError):
return
try:
print(f"[🦸‍♂️] Stopping supervisord process (pid={pid})...")
proc = psutil.Process(pid)
proc.terminate()
proc.wait(timeout=5)
except BaseException: # BaseException already covers BrokenPipeError, IOError, KeyboardInterrupt, etc.
pass
finally:
try:
# clear PID file and socket file
PID_FILE.unlink(missing_ok=True)
get_sock_file().unlink(missing_ok=True)
except BaseException:
pass
def start_new_supervisord_process(daemonize=False):
SOCK_FILE = get_sock_file()
WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
LOG_FILE = CONSTANTS.LOGS_DIR / LOG_FILE_NAME
CONFIG_FILE = SOCK_FILE.parent / CONFIG_FILE_NAME
PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
print(f"[🦸‍♂️] Supervisord starting{' in background' if daemonize else ''}...")
pretty_log_path = pretty_path(LOG_FILE)
print(f" > Writing supervisord logs to: {pretty_log_path}")
print(f" > Writing task worker logs to: {pretty_log_path.replace('supervisord.log', 'worker_*.log')}")
print(f' > Using supervisord config file: {pretty_path(CONFIG_FILE)}')
print(f" > Using supervisord UNIX socket: {pretty_path(SOCK_FILE)}")
print()
# clear out existing stale state files
shutil.rmtree(WORKERS_DIR, ignore_errors=True)
PID_FILE.unlink(missing_ok=True)
get_sock_file().unlink(missing_ok=True)
CONFIG_FILE.unlink(missing_ok=True)
# create the supervisord config file
create_supervisord_config()
# Start supervisord
# panel = Panel(f"Starting supervisord with config: {SUPERVISORD_CONFIG_FILE}")
# with Live(panel, refresh_per_second=1) as live:
subprocess.Popen(
f"supervisord --configuration={CONFIG_FILE}",
stdin=None,
shell=True,
start_new_session=daemonize,
)
def exit_signal_handler(signum, frame):
if signum == 2:
STDERR.print("\n[🛑] Got Ctrl+C. Terminating child processes...")
elif signum != 13:
STDERR.print(f"\n[🦸‍♂️] Supervisord got stop signal ({signal.strsignal(signum)}). Terminating child processes...")
stop_existing_supervisord_process()
raise SystemExit(0)
# Monitor for termination signals and cleanup child processes
if not daemonize:
try:
signal.signal(signal.SIGINT, exit_signal_handler)
signal.signal(signal.SIGHUP, exit_signal_handler)
signal.signal(signal.SIGPIPE, exit_signal_handler)
signal.signal(signal.SIGTERM, exit_signal_handler)
except Exception:
# signal handlers only work in main thread
pass
    # otherwise supervisord will continue in the background even after the parent process ends (aka daemon mode)
time.sleep(2)
return get_existing_supervisord_process()
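# Note: start_new_session=True in the Popen call above detaches supervisord into
# its own session, so it keeps running after the parent process exits (daemon
# mode); with daemonize=False the signal handlers registered above tear it down
# together with this process instead.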
def get_or_create_supervisord_process(daemonize=False):
SOCK_FILE = get_sock_file()
WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
supervisor = get_existing_supervisord_process()
if supervisor is None:
stop_existing_supervisord_process()
supervisor = start_new_supervisord_process(daemonize=daemonize)
time.sleep(0.5)
# wait up to 5s in case supervisord is slow to start
if not supervisor:
for _ in range(10):
if supervisor is not None:
print()
break
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(0.5)
supervisor = get_existing_supervisord_process()
else:
print()
assert supervisor, "Failed to start supervisord or connect to it!"
supervisor.getPID() # make sure it doesn't throw an exception
(WORKERS_DIR / 'initial_startup.conf').unlink(missing_ok=True)
return supervisor
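# Typical usage (sketch): call this idempotently before scheduling any work, e.g.:
#   supervisor = get_or_create_supervisord_process(daemonize=False)
#   start_worker(supervisor, COMMAND_WORKER)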
def start_worker(supervisor, daemon, lazy=False):
assert supervisor.getPID()
print(f"[🦸‍♂️] Supervisord starting new subprocess worker: {daemon['name']}...")
create_worker_config(daemon)
result = supervisor.reloadConfig()
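    # supervisor.reloadConfig() returns [[added, changed, removed]]: three lists
    # of process group names whose on-disk configs were added/changed/removed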
added, changed, removed = result[0]
# print(f"Added: {added}, Changed: {changed}, Removed: {removed}")
    for group_name in removed:
        supervisor.stopProcessGroup(group_name)
        supervisor.removeProcessGroup(group_name)
    for group_name in changed:
        supervisor.stopProcessGroup(group_name)
        supervisor.removeProcessGroup(group_name)
        supervisor.addProcessGroup(group_name)
    for group_name in added:
        supervisor.addProcessGroup(group_name)
time.sleep(1)
for _ in range(10):
procs = supervisor.getAllProcessInfo()
for proc in procs:
if proc['name'] == daemon["name"]:
# See process state diagram here: http://supervisord.org/subprocess.html
if proc['statename'] == 'RUNNING':
print(f" - Worker {daemon['name']}: already {proc['statename']} ({proc['description']})")
return proc
else:
if not lazy:
supervisor.startProcessGroup(daemon["name"], True)
proc = supervisor.getProcessInfo(daemon["name"])
print(f" - Worker {daemon['name']}: started {proc['statename']} ({proc['description']})")
return proc
        # not running yet, wait 500ms and re-check (up to 10 tries / ~5s total)
        time.sleep(0.5)
raise Exception(f"Failed to start worker {daemon['name']}! Only found: {procs}")
def get_worker(supervisor, daemon_name):
try:
return supervisor.getProcessInfo(daemon_name)
except Exception:
pass
return None
def stop_worker(supervisor, daemon_name):
proc = get_worker(supervisor, daemon_name)
for _ in range(10):
if not proc:
# worker does not exist (was never running or configured in the first place)
return True
# See process state diagram here: http://supervisord.org/subprocess.html
if proc['statename'] == 'STOPPED':
# worker was configured but has already stopped for some reason
supervisor.removeProcessGroup(daemon_name)
return True
else:
# worker was configured and is running, stop it now
supervisor.stopProcessGroup(daemon_name)
# wait 500ms and then re-check to make sure it's really stopped
time.sleep(0.5)
proc = get_worker(supervisor, daemon_name)
raise Exception(f"Failed to stop worker {daemon_name}!")
def tail_worker_logs(log_path: str):
get_or_create_supervisord_process(daemonize=False)
from rich.live import Live
from rich.table import Table
table = Table()
table.add_column("TS")
table.add_column("URL")
try:
        with Live(table, refresh_per_second=1) as live:  # refresh the display once per second
with open(log_path, 'r') as f:
for line in follow(f):
if '://' in line:
live.console.print(f"Working on: {line.strip()}")
# table.add_row("123124234", line.strip())
except (KeyboardInterrupt, BrokenPipeError, IOError):
STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
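# `follow` (not shown in this hunk) is assumed to be a tail -f style generator
# over an open file handle; a minimal sketch of the idea:
#   def follow(file, sleep_sec=0.1):
#       while True:
#           line = file.readline()
#           if line:
#               yield line
#           else:
#               time.sleep(sleep_sec)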
def watch_worker(supervisor, daemon_name, interval=5):
"""loop continuously and monitor worker's health"""
while True:
proc = get_worker(supervisor, daemon_name)
if not proc:
raise Exception("Worker dissapeared while running! " + daemon_name)
if proc['statename'] == 'STOPPED':
return proc
if proc['statename'] == 'RUNNING':
time.sleep(1)
continue
if proc['statename'] in ('STARTING', 'BACKOFF', 'FATAL', 'EXITED', 'STOPPING'):
print(f'[🦸‍♂️] WARNING: Worker {daemon_name} {proc["statename"]} {proc["description"]}')
time.sleep(interval)
continue
def start_server_workers(host='0.0.0.0', port='8000', daemonize=False):
supervisor = get_or_create_supervisord_process(daemonize=daemonize)
bg_workers = [
SCHEDULER_WORKER,
COMMAND_WORKER,
ORCHESTRATOR_WORKER,
]
print()
start_worker(supervisor, SERVER_WORKER(host=host, port=port))
print()
for worker in bg_workers:
start_worker(supervisor, worker)
print()
if not daemonize:
try:
watch_worker(supervisor, "worker_daphne")
except (KeyboardInterrupt, BrokenPipeError, IOError):
STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
except BaseException as e:
STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
raise
finally:
stop_worker(supervisor, "worker_daphne")
time.sleep(0.5)
def start_cli_workers(watch=False):
supervisor = get_or_create_supervisord_process(daemonize=False)
start_worker(supervisor, COMMAND_WORKER)
start_worker(supervisor, ORCHESTRATOR_WORKER)
if watch:
try:
watch_worker(supervisor, ORCHESTRATOR_WORKER['name'])
except (KeyboardInterrupt, BrokenPipeError, IOError):
STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
except BaseException as e:
STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
raise
finally:
stop_worker(supervisor, COMMAND_WORKER['name'])
stop_worker(supervisor, ORCHESTRATOR_WORKER['name'])
time.sleep(0.5)
return [COMMAND_WORKER, ORCHESTRATOR_WORKER]
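# Usage sketch: one-off CLI commands can call start_cli_workers() to make sure
# the command + orchestrator workers are up before enqueueing work, while
# long-running commands can pass watch=True to block and babysit the
# orchestrator until Ctrl+C.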
# def main(daemons):
# supervisor = get_or_create_supervisord_process(daemonize=False)
# worker = start_worker(supervisor, daemons["webworker"])
# pprint(worker)
# print("All processes started in background.")
# Optionally you can block the main thread until an exit signal is received:
# try:
# signal.pause()
# except KeyboardInterrupt:
# pass
# finally:
# stop_existing_supervisord_process()
# if __name__ == "__main__":
# DAEMONS = {
# "webworker": {
# "name": "webworker",
# "command": "python3 -m http.server 9000",
# "directory": str(cwd),
# "autostart": "true",
# "autorestart": "true",
# "stdout_logfile": cwd / "webworker.log",
# "stderr_logfile": cwd / "webworker_error.log",
# },
# }
# main(DAEMONS, cwd)

123
archivebox/workers/tasks.py Normal file
View file

@ -0,0 +1,123 @@
__package__ = 'archivebox.workers'
from functools import wraps
# from django.utils import timezone
from django_huey import db_task, task
from huey_monitor.models import TaskModel
from huey_monitor.tqdm import ProcessInfo
from .supervisor_util import get_or_create_supervisord_process
# @db_task(queue="commands", context=True, schedule=1)
# def scheduler_tick():
# print('SCHEDULER TICK', timezone.now().isoformat())
# # abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine())
# # abx.archivebox.events.on_scheduler_tick_start(timezone.now(), machine=Machine.objects.get_current_machine())
# scheduled_crawls = CrawlSchedule.objects.filter(is_enabled=True)
# scheduled_crawls_due = scheduled_crawls.filter(next_run_at__lte=timezone.now())
# for scheduled_crawl in scheduled_crawls_due:
# try:
# abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl)
# except Exception as e:
# abx.archivebox.events.on_crawl_schedule_failure(timezone.now(), machine=Machine.objects.get_current_machine(), error=e, schedule=scheduled_crawl)
# # abx.archivebox.events.on_scheduler_tick_end(timezone.now(), machine=Machine.objects.get_current_machine(), tasks=scheduled_tasks_due)
def db_task_with_parent(func):
"""Decorator for db_task that sets the parent task for the db_task"""
@wraps(func)
def wrapper(*args, **kwargs):
task = kwargs.get('task')
parent_task_id = kwargs.get('parent_task_id')
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
return func(*args, **kwargs)
return wrapper
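# Example usage (a sketch, for a hypothetical subtask; with context=True huey
# injects `task` into kwargs, and the caller passes parent_task_id explicitly):
#   @db_task(queue="commands", context=True)
#   @db_task_with_parent
#   def bg_example_subtask(arg, task=None, parent_task_id=None):
#       ...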
@db_task(queue="commands", context=True)
def bg_add(add_kwargs, task=None, parent_task_id=None):
get_or_create_supervisord_process(daemonize=False)
from ..main import add
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
assert add_kwargs and add_kwargs.get("urls")
rough_url_count = add_kwargs["urls"].count("://")
process_info = ProcessInfo(task, desc="add", parent_task_id=parent_task_id, total=rough_url_count)
result = add(**add_kwargs)
process_info.update(n=rough_url_count)
return result
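# Invocation sketch (hypothetical kwargs; `add` takes the same keyword
# arguments as the `archivebox add` CLI command):
#   bg_add({'urls': 'https://example.com', 'depth': 1}, parent_task_id=None)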
@task(queue="commands", context=True)
def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
get_or_create_supervisord_process(daemonize=False)
from ..extractors import archive_links
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
assert args and args[0]
kwargs = kwargs or {}
rough_count = len(args[0])
process_info = ProcessInfo(task, desc="archive_links", parent_task_id=parent_task_id, total=rough_count)
result = archive_links(*args, **kwargs)
process_info.update(n=rough_count)
return result
@task(queue="commands", context=True)
def bg_archive_link(args, kwargs=None, task=None, parent_task_id=None):
get_or_create_supervisord_process(daemonize=False)
from ..extractors import archive_link
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
assert args and args[0]
kwargs = kwargs or {}
    rough_count = 1  # archive_link processes a single link
process_info = ProcessInfo(task, desc="archive_link", parent_task_id=parent_task_id, total=rough_count)
result = archive_link(*args, **kwargs)
process_info.update(n=rough_count)
return result
@task(queue="commands", context=True)
def bg_archive_snapshot(snapshot, overwrite=False, methods=None, task=None, parent_task_id=None):
# get_or_create_supervisord_process(daemonize=False)
from ..extractors import archive_link
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
process_info = ProcessInfo(task, desc="archive_link", parent_task_id=parent_task_id, total=1)
link = snapshot.as_link_with_details()
result = archive_link(link, overwrite=overwrite, methods=methods)
process_info.update(n=1)
return result

3
archivebox/workers/tests.py Normal file
View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

20
archivebox/workers/views.py Normal file
View file

@ -0,0 +1,20 @@
from django.views.generic import TemplateView
from django.contrib.auth.mixins import UserPassesTestMixin
from django.utils import timezone
from api.auth import get_or_create_api_token
class JobsDashboardView(UserPassesTestMixin, TemplateView):
template_name = "jobs_dashboard.html"
def test_func(self):
return self.request.user and self.request.user.is_superuser
def get_context_data(self, **kwargs):
api_token = get_or_create_api_token(self.request.user)
context = super().get_context_data(**kwargs)
context['api_token'] = api_token.token if api_token else 'UNABLE TO GENERATE API TOKEN'
context['now'] = timezone.now().strftime("%H:%M:%S")
return context
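# Wiring sketch (hypothetical; the actual URL route lives in a urls.py not
# shown in this diff):
#   from django.urls import path
#   from workers.views import JobsDashboardView
#   urlpatterns = [
#       path('admin/workers/', JobsDashboardView.as_view(), name='jobs_dashboard'),
#   ]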