simplify actor and orchestrator by removing threading code, fixing bugs

This commit is contained in:
Nick Sweeting 2024-11-18 02:24:52 -08:00
parent af21c3428b
commit 9b8cf7b4f0
No known key found for this signature in database
2 changed files with 69 additions and 171 deletions

View file

@ -5,13 +5,12 @@ import time
import traceback import traceback
from typing import ClassVar, Generic, TypeVar, Any, Literal, Type, Iterable, cast, get_args from typing import ClassVar, Generic, TypeVar, Any, Literal, Type, Iterable, cast, get_args
from datetime import timedelta from datetime import timedelta
import multiprocessing
from multiprocessing import Process, cpu_count from multiprocessing import Process, cpu_count
from threading import Thread, get_native_id
import psutil import psutil
from rich import print from rich import print
from statemachine import State, StateMachine, registry from statemachine import State, StateMachine
from statemachine.mixins import MachineMixin
from django import db from django import db
from django.db.models import QuerySet, sql, Q from django.db.models import QuerySet, sql, Q
@ -19,9 +18,13 @@ from django.db.models import Model as DjangoModel
from django.utils import timezone from django.utils import timezone
from django.utils.functional import classproperty from django.utils.functional import classproperty
# from archivebox.logging_util import TimedProgress
from .models import ModelWithStateMachine from .models import ModelWithStateMachine
# from archivebox.logging_util import TimedProgress
multiprocessing.set_start_method('fork', force=True)
class ActorObjectAlreadyClaimed(Exception): class ActorObjectAlreadyClaimed(Exception):
"""Raised when the Actor tries to claim the next object from the queue but it's already been claimed by another Actor""" """Raised when the Actor tries to claim the next object from the queue but it's already been claimed by another Actor"""
@ -48,7 +51,6 @@ class ActorType(Generic[ModelType]):
Base class for all actors. Usage: Base class for all actors. Usage:
class FaviconActor(ActorType[FaviconArchiveResult]): class FaviconActor(ActorType[FaviconArchiveResult]):
FINAL_STATES: ClassVar[tuple[str, ...]] = ('succeeded', 'failed', 'skipped')
ACTIVE_STATE: ClassVar[str] = 'started' ACTIVE_STATE: ClassVar[str] = 'started'
@classmethod @classmethod
@ -60,9 +62,7 @@ class ActorType(Generic[ModelType]):
Model: Type[ModelType] Model: Type[ModelType]
StateMachineClass: Type[StateMachine] StateMachineClass: Type[StateMachine]
STATE_FIELD_NAME: ClassVar[str] = 'status'
ACTIVE_STATE: ClassVar[ObjectState] = 'started' ACTIVE_STATE: ClassVar[ObjectState] = 'started'
FINAL_STATES: ClassVar[ObjectStateList] # e.g. ['succeeded', 'failed', 'skipped'] or ['sealed']
EVENT_NAME: ClassVar[str] = 'tick' # the event name to trigger on the obj.sm: StateMachine (usually 'tick') EVENT_NAME: ClassVar[str] = 'tick' # the event name to trigger on the obj.sm: StateMachine (usually 'tick')
CLAIM_ORDER: ClassVar[tuple[str, ...]] = ('-retry_at',) # the .order(*args) to claim the queue objects in, use ('?',) for random order CLAIM_ORDER: ClassVar[tuple[str, ...]] = ('-retry_at',) # the .order(*args) to claim the queue objects in, use ('?',) for random order
@ -144,17 +144,7 @@ class ActorType(Generic[ModelType]):
assert issubclass(Model, DjangoModel), f'{cls.__name__}.Model must be a valid django Model' assert issubclass(Model, DjangoModel), f'{cls.__name__}.Model must be a valid django Model'
return cast(Type[ModelType], Model) return cast(Type[ModelType], Model)
@staticmethod
def _get_state_machine_cls(Model: Type[ModelType]) -> Type[StateMachine]:
"""Get the StateMachine class for the given django Model that inherits from MachineMixin"""
assert issubclass(Model, MachineMixin), f'{Model.__name__} must inherit from MachineMixin and define a .state_machine_name: str'
model_state_machine_name = getattr(Model, 'state_machine_name', None)
if model_state_machine_name:
StateMachine = registry.get_machine_cls(model_state_machine_name)
assert issubclass(StateMachine, StateMachine)
return StateMachine
raise NotImplementedError(f'ActorType[{Model.__name__}] must define .state_machine_name: str that points to a valid StateMachine')
@classmethod @classmethod
def _get_state_machine_instance(cls, obj: ModelType) -> StateMachine: def _get_state_machine_instance(cls, obj: ModelType) -> StateMachine:
"""Get the StateMachine instance for the given django Model instance (and check that it is a valid instance of cls.StateMachineClass)""" """Get the StateMachine instance for the given django Model instance (and check that it is a valid instance of cls.StateMachineClass)"""
@ -180,7 +170,7 @@ class ActorType(Generic[ModelType]):
raise ValueError(f'{cls.__name__}.Model must be set to the same Model as the Generic[ModelType] parameter in the class definition') raise ValueError(f'{cls.__name__}.Model must be set to the same Model as the Generic[ModelType] parameter in the class definition')
# check that Model has a valid StateMachine with the required event defined on it # check that Model has a valid StateMachine with the required event defined on it
cls.StateMachineClass = getattr(cls, 'StateMachineClass', None) or cls._get_state_machine_cls(cls.Model) cls.StateMachineClass = getattr(cls, 'StateMachineClass', None) # type: ignore
assert isinstance(cls.EVENT_NAME, str), f'{cls.__name__}.EVENT_NAME must be a str, got: {type(cls.EVENT_NAME).__name__} instead' assert isinstance(cls.EVENT_NAME, str), f'{cls.__name__}.EVENT_NAME must be a str, got: {type(cls.EVENT_NAME).__name__} instead'
assert hasattr(cls.StateMachineClass, cls.EVENT_NAME), f'StateMachine {cls.StateMachineClass.__name__} must define a {cls.EVENT_NAME} event ({cls.__name__}.EVENT_NAME = {cls.EVENT_NAME})' assert hasattr(cls.StateMachineClass, cls.EVENT_NAME), f'StateMachine {cls.StateMachineClass.__name__} must define a {cls.EVENT_NAME} event ({cls.__name__}.EVENT_NAME = {cls.EVENT_NAME})'
@ -189,26 +179,10 @@ class ActorType(Generic[ModelType]):
if primary_key_field != 'id': if primary_key_field != 'id':
raise NotImplementedError(f'Actors currently only support models that use .id as their primary key field ({cls.__name__} uses {cls.__name__}.{primary_key_field} as primary key)') raise NotImplementedError(f'Actors currently only support models that use .id as their primary key field ({cls.__name__} uses {cls.__name__}.{primary_key_field} as primary key)')
# check for STATE_FIELD_NAME classvar or set it from the model's state_field_name attr # check that ACTIVE_STATE is defined and that it exists on the StateMachineClass
if not getattr(cls, 'STATE_FIELD_NAME', None):
if hasattr(cls.Model, 'state_field_name'):
cls.STATE_FIELD_NAME = getattr(cls.Model, 'state_field_name')
else:
raise NotImplementedError(f'{cls.__name__} must define a STATE_FIELD_NAME: ClassVar[str] (e.g. "status") or have a .state_field_name attr on its Model')
assert isinstance(cls.STATE_FIELD_NAME, str), f'{cls.__name__}.STATE_FIELD_NAME must be a str, got: {type(cls.STATE_FIELD_NAME).__name__} instead'
# check for FINAL_STATES classvar or set it from the model's final_states attr
if not getattr(cls, 'FINAL_STATES', None):
cls.FINAL_STATES = cls.StateMachineClass.final_states
if not cls.FINAL_STATES:
raise NotImplementedError(f'{cls.__name__} must define a non-empty FINAL_STATES: ClassVar[list[str]] (e.g. ["sealed"]) or have a {cls.Model.__name__}.state_machine_name pointing to a StateMachine that provides .final_states')
cls.FINAL_STATES = [cls._state_to_str(state) for state in cls.FINAL_STATES]
assert all(isinstance(state, str) for state in cls.FINAL_STATES), f'{cls.__name__}.FINAL_STATES must be a list[str], got: {type(cls.FINAL_STATES).__name__} instead'
# check for ACTIVE_STATE classvar or set it from the model's active_state attr
if not getattr(cls, 'ACTIVE_STATE', None): if not getattr(cls, 'ACTIVE_STATE', None):
raise NotImplementedError(f'{cls.__name__} must define an ACTIVE_STATE: ClassVar[State] (e.g. SnapshotMachine.started) ({cls.Model.__name__}.{cls.STATE_FIELD_NAME} gets set to this value to mark objects as actively processing)') raise NotImplementedError(f'{cls.__name__} must define an ACTIVE_STATE: ClassVar[State] (e.g. SnapshotMachine.started) ({cls.Model.__name__}.{cls.Model.state_field_name} gets set to this value to mark objects as actively processing)')
assert isinstance(cls.ACTIVE_STATE, (State, str)), f'{cls.__name__}.ACTIVE_STATE must be a statemachine.State | str, got: {type(cls.ACTIVE_STATE).__name__} instead' assert isinstance(cls.ACTIVE_STATE, (State, str)) and hasattr(cls.StateMachineClass, cls._state_to_str(cls.ACTIVE_STATE)), f'{cls.__name__}.ACTIVE_STATE must be a statemachine.State | str that exists on {cls.StateMachineClass.__name__}, got: {type(cls.ACTIVE_STATE).__name__} instead'
# check the other ClassVar attributes for valid values # check the other ClassVar attributes for valid values
assert cls.CLAIM_ORDER and isinstance(cls.CLAIM_ORDER, tuple) and all(isinstance(order, str) for order in cls.CLAIM_ORDER), f'{cls.__name__}.CLAIM_ORDER must be a non-empty tuple[str, ...], got: {type(cls.CLAIM_ORDER).__name__} instead' assert cls.CLAIM_ORDER and isinstance(cls.CLAIM_ORDER, tuple) and all(isinstance(order, str) for order in cls.CLAIM_ORDER), f'{cls.__name__}.CLAIM_ORDER must be a non-empty tuple[str, ...], got: {type(cls.CLAIM_ORDER).__name__} instead'
@ -217,14 +191,14 @@ class ActorType(Generic[ModelType]):
assert cls.MAX_CONCURRENT_ACTORS >= 1, f'{cls.__name__}.MAX_CONCURRENT_ACTORS must be a positive int >=1, got: {cls.MAX_CONCURRENT_ACTORS} instead' assert cls.MAX_CONCURRENT_ACTORS >= 1, f'{cls.__name__}.MAX_CONCURRENT_ACTORS must be a positive int >=1, got: {cls.MAX_CONCURRENT_ACTORS} instead'
assert isinstance(cls.CLAIM_ATOMIC, bool), f'{cls.__name__}.CLAIM_ATOMIC must be a bool, got: {cls.CLAIM_ATOMIC} instead' assert isinstance(cls.CLAIM_ATOMIC, bool), f'{cls.__name__}.CLAIM_ATOMIC must be a bool, got: {cls.CLAIM_ATOMIC} instead'
@classmethod # @classmethod
def _fork_actor_as_thread(cls, **launch_kwargs: LaunchKwargs) -> int: # def _fork_actor_as_thread(cls, **launch_kwargs: LaunchKwargs) -> int:
"""Spawn a new background thread running the actor's runloop""" # """Spawn a new background thread running the actor's runloop"""
actor = cls(mode='thread', **launch_kwargs) # actor = cls(mode='thread', **launch_kwargs)
bg_actor_thread = Thread(target=actor.runloop) # bg_actor_thread = Thread(target=actor.runloop)
bg_actor_thread.start() # bg_actor_thread.start()
assert bg_actor_thread.native_id is not None # assert bg_actor_thread.native_id is not None
return bg_actor_thread.native_id # return bg_actor_thread.native_id
@classmethod @classmethod
def _fork_actor_as_process(cls, **launch_kwargs: LaunchKwargs) -> int: def _fork_actor_as_process(cls, **launch_kwargs: LaunchKwargs) -> int:
@ -278,7 +252,8 @@ class ActorType(Generic[ModelType]):
@classmethod @classmethod
def start(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int: def start(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int:
if mode == 'thread': if mode == 'thread':
return cls._fork_actor_as_thread(**launch_kwargs) raise NotImplementedError('Thread-based actors are disabled to reduce codebase complexity. Please use processes for everything')
# return cls._fork_actor_as_thread(**launch_kwargs)
elif mode == 'process': elif mode == 'process':
return cls._fork_actor_as_process(**launch_kwargs) return cls._fork_actor_as_process(**launch_kwargs)
raise ValueError(f'Invalid actor mode: {mode} must be "thread" or "process"') raise ValueError(f'Invalid actor mode: {mode} must be "thread" or "process"')
@ -295,12 +270,12 @@ class ActorType(Generic[ModelType]):
@classproperty @classproperty
def final_q(cls) -> Q: def final_q(cls) -> Q:
"""Get the filter for objects that are already completed / in a final state""" """Get the filter for objects that are already completed / in a final state"""
return Q(**{f'{cls.STATE_FIELD_NAME}__in': [cls._state_to_str(s) for s in cls.FINAL_STATES]}) return Q(**{f'{cls.Model.state_field_name}__in': [cls._state_to_str(s) for s in cls.StateMachineClass.final_states]})
@classproperty @classproperty
def active_q(cls) -> Q: def active_q(cls) -> Q:
"""Get the filter for objects that are actively processing right now""" """Get the filter for objects that are actively processing right now"""
return Q(**{cls.STATE_FIELD_NAME: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started') return Q(**{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started')
@classproperty @classproperty
def stalled_q(cls) -> Q: def stalled_q(cls) -> Q:
@ -346,7 +321,7 @@ class ActorType(Generic[ModelType]):
if obj_to_process: if obj_to_process:
self.idle_count = 0 # reset idle count if we got an object self.idle_count = 0 # reset idle count if we got an object
else: else:
if self.idle_count >= 30: if self.idle_count >= 3:
break # stop looping and exit if queue is empty and we have idled for 30sec break # stop looping and exit if queue is empty and we have idled for 30sec
else: else:
# print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...')
@ -387,7 +362,7 @@ class ActorType(Generic[ModelType]):
Can be a defined as a normal method (instead of classmethod) on subclasses if it needs to access instance vars. Can be a defined as a normal method (instead of classmethod) on subclasses if it needs to access instance vars.
""" """
return { return {
cls.STATE_FIELD_NAME: cls._state_to_str(cls.ACTIVE_STATE), cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE),
'retry_at': timezone.now() + timedelta(seconds=cls.MAX_TICK_TIME), 'retry_at': timezone.now() + timedelta(seconds=cls.MAX_TICK_TIME),
} }
@ -474,18 +449,24 @@ class ActorType(Generic[ModelType]):
# get the StateMachine instance from the object # get the StateMachine instance from the object
obj_statemachine = self._get_state_machine_instance(obj_to_process) obj_statemachine = self._get_state_machine_instance(obj_to_process)
starting_state = obj_statemachine.current_state
# trigger the event on the StateMachine instance # trigger the event on the StateMachine instance
obj_tick_method = getattr(obj_statemachine, self.EVENT_NAME) # e.g. obj_statemachine.tick() obj_tick_method = getattr(obj_statemachine, self.EVENT_NAME) # e.g. obj_statemachine.tick()
obj_tick_method() obj_tick_method()
ending_state = obj_statemachine.current_state
if starting_state != ending_state:
self.on_state_change(obj_to_process, starting_state, ending_state)
# save the object to persist any state changes # save the object to persist any state changes
obj_to_process.save() obj_to_process.save()
def on_startup(self) -> None: def on_startup(self) -> None:
if self.mode == 'thread': if self.mode == 'thread':
self.pid = get_native_id() # thread id # self.pid = get_native_id() # thread id
print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]') # print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]')
raise NotImplementedError('Thread-based actors are disabled to reduce codebase complexity. Please use processes for everything')
else: else:
self.pid = os.getpid() # process id self.pid = os.getpid() # process id
print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (PROCESS)[/green]') print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (PROCESS)[/green]')
@ -505,13 +486,13 @@ class ActorType(Generic[ModelType]):
# abx.pm.hook.on_actor_shutdown(actor=self, last_obj=last_obj, last_error=last_error) # abx.pm.hook.on_actor_shutdown(actor=self, last_obj=last_obj, last_error=last_error)
def on_tick_start(self, obj_to_process: ModelType) -> None: def on_tick_start(self, obj_to_process: ModelType) -> None:
print(f'🏃‍♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') # print(f'🏃‍♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}')
# abx.pm.hook.on_actor_tick_start(actor=self, obj_to_process=obj) # abx.pm.hook.on_actor_tick_start(actor=self, obj_to_process=obj)
# self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') # self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ')
pass pass
def on_tick_end(self, obj_to_process: ModelType) -> None: def on_tick_end(self, obj_to_process: ModelType) -> None:
print(f'🏃‍♂️ {self}.on_tick_end() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') # print(f'🏃‍♂️ {self}.on_tick_end() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}')
# abx.pm.hook.on_actor_tick_end(actor=self, obj_to_process=obj_to_process) # abx.pm.hook.on_actor_tick_end(actor=self, obj_to_process=obj_to_process)
# self.timer.end() # self.timer.end()
pass pass
@ -523,6 +504,9 @@ class ActorType(Generic[ModelType]):
print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red] {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}: [red]{type(error).__name__}: {error}[/red]') print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red] {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}: [red]{type(error).__name__}: {error}[/red]')
# abx.pm.hook.on_actor_tick_exception(actor=self, obj_to_process=obj_to_process, error=error) # abx.pm.hook.on_actor_tick_exception(actor=self, obj_to_process=obj_to_process, error=error)
def on_state_change(self, obj_to_process: ModelType, starting_state, ending_state) -> None:
print(f'🏃‍♂️ {self}.on_state_change() {obj_to_process.ABID} {starting_state} ➡️ {ending_state}')
# abx.pm.hook.on_actor_state_change(actor=self, obj_to_process=obj_to_process, starting_state=starting_state, ending_state=ending_state)
def compile_sql_select(queryset: QuerySet, filter_kwargs: dict[str, Any] | None=None, order_args: tuple[str, ...]=(), limit: int | None=None) -> tuple[str, tuple[Any, ...]]: def compile_sql_select(queryset: QuerySet, filter_kwargs: dict[str, Any] | None=None, order_args: tuple[str, ...]=(), limit: int | None=None) -> tuple[str, tuple[Any, ...]]:

View file

@ -10,8 +10,6 @@ from django.utils import timezone
import multiprocessing import multiprocessing
from threading import Thread, get_native_id
from rich import print from rich import print
@ -32,11 +30,13 @@ class Orchestrator:
actor_types: Dict[str, Type['ActorType']] = {} actor_types: Dict[str, Type['ActorType']] = {}
mode: Literal['thread', 'process'] = 'process' mode: Literal['thread', 'process'] = 'process'
exit_on_idle: bool = True exit_on_idle: bool = True
max_concurrent_actors: int = 20
def __init__(self, actor_types: Dict[str, Type['ActorType']] | None = None, mode: Literal['thread', 'process'] | None=None, exit_on_idle: bool=True): def __init__(self, actor_types: Dict[str, Type['ActorType']] | None = None, mode: Literal['thread', 'process'] | None=None, exit_on_idle: bool=True, max_concurrent_actors: int=max_concurrent_actors):
self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types() self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types()
self.mode = mode or self.mode self.mode = mode or self.mode
self.exit_on_idle = exit_on_idle self.exit_on_idle = exit_on_idle
self.max_concurrent_actors = max_concurrent_actors
def __repr__(self) -> str: def __repr__(self) -> str:
label = 'tid' if self.mode == 'thread' else 'pid' label = 'tid' if self.mode == 'thread' else 'pid'
@ -49,13 +49,13 @@ class Orchestrator:
def name(cls) -> str: def name(cls) -> str:
return cls.__name__ # type: ignore return cls.__name__ # type: ignore
def fork_as_thread(self): # def _fork_as_thread(self):
self.thread = Thread(target=self.runloop) # self.thread = Thread(target=self.runloop)
self.thread.start() # self.thread.start()
assert self.thread.native_id is not None # assert self.thread.native_id is not None
return self.thread.native_id # return self.thread.native_id
def fork_as_process(self): def _fork_as_process(self):
self.process = multiprocessing.Process(target=self.runloop) self.process = multiprocessing.Process(target=self.runloop)
self.process.start() self.process.start()
assert self.process.pid is not None assert self.process.pid is not None
@ -63,9 +63,10 @@ class Orchestrator:
def start(self) -> int: def start(self) -> int:
if self.mode == 'thread': if self.mode == 'thread':
return self.fork_as_thread() # return self._fork_as_thread()
raise NotImplementedError('Thread-based orchestrators are disabled for now to reduce codebase complexity')
elif self.mode == 'process': elif self.mode == 'process':
return self.fork_as_process() return self._fork_as_process()
raise ValueError(f'Invalid orchestrator mode: {self.mode}') raise ValueError(f'Invalid orchestrator mode: {self.mode}')
@classmethod @classmethod
@ -108,8 +109,9 @@ class Orchestrator:
def on_startup(self): def on_startup(self):
if self.mode == 'thread': if self.mode == 'thread':
self.pid = get_native_id() # self.pid = get_native_id()
print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (THREAD)[/green]') # print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (THREAD)[/green]')
raise NotImplementedError('Thread-based orchestrators are disabled for now to reduce codebase complexity')
elif self.mode == 'process': elif self.mode == 'process':
self.pid = os.getpid() self.pid = os.getpid()
print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (PROCESS)[/green]') print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (PROCESS)[/green]')
@ -120,16 +122,18 @@ class Orchestrator:
# abx.pm.hook.on_orchestrator_shutdown(self) # abx.pm.hook.on_orchestrator_shutdown(self)
def on_tick_started(self, all_queues): def on_tick_started(self, all_queues):
total_pending = sum(queue.count() for queue in all_queues.values()) # total_pending = sum(queue.count() for queue in all_queues.values())
print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}') # if total_pending:
# print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}')
# abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues) # abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues)
pass pass
def on_tick_finished(self, all_queues, all_existing_actors, all_spawned_actors): def on_tick_finished(self, all_queues, all_existing_actors, all_spawned_actors):
if all_spawned_actors: # if all_spawned_actors:
total_queue_length = sum(queue.count() for queue in all_queues.values()) # total_queue_length = sum(queue.count() for queue in all_queues.values())
print(f'[grey53]👨‍✈️ {self}.on_tick_finished() queue={total_queue_length} existing_actors={len(all_existing_actors)} spawned_actors={len(all_spawned_actors)}[/grey53]') # print(f'[grey53]👨‍✈️ {self}.on_tick_finished() queue={total_queue_length} existing_actors={len(all_existing_actors)} spawned_actors={len(all_spawned_actors)}[/grey53]')
# abx.pm.hook.on_orchestrator_tick_finished(self, actor_types, all_queues) # abx.pm.hook.on_orchestrator_tick_finished(self, actor_types, all_queues)
pass
def on_idle(self, all_queues): def on_idle(self, all_queues):
print(f'👨‍✈️ {self}.on_idle()', f'idle_count={self.idle_count}') print(f'👨‍✈️ {self}.on_idle()', f'idle_count={self.idle_count}')
@ -162,13 +166,18 @@ class Orchestrator:
all_spawned_actors = [] all_spawned_actors = []
for actor_type, queue in all_queues.items(): for actor_type, queue in all_queues.items():
next_obj = queue.first() if not queue.exists():
print(f'🏃‍♂️ {self}.runloop() {actor_type.__name__.ljust(20)} queue={str(queue.count()).ljust(3)} next={next_obj.abid if next_obj else "None"} {next_obj.status if next_obj else "None"} {(timezone.now() - next_obj.retry_at).total_seconds() if next_obj and next_obj.retry_at else "None"}') continue
# next_obj = queue.first()
# print(f'🏃‍♂️ {self}.runloop() {actor_type.__name__.ljust(20)} queue={str(queue.count()).ljust(3)} next={next_obj.abid if next_obj else "None"} {next_obj.status if next_obj else "None"} {(timezone.now() - next_obj.retry_at).total_seconds() if next_obj and next_obj.retry_at else "None"}')
self.idle_count = 0
try: try:
existing_actors = actor_type.get_running_actors() existing_actors = actor_type.get_running_actors()
all_existing_actors.extend(existing_actors) all_existing_actors.extend(existing_actors)
actors_to_spawn = actor_type.get_actors_to_spawn(queue, existing_actors) actors_to_spawn = actor_type.get_actors_to_spawn(queue, existing_actors)
for launch_kwargs in actors_to_spawn: can_spawn_num_remaining = self.max_concurrent_actors - len(all_existing_actors) # set max_concurrent_actors=1 to disable multitasking
for launch_kwargs in actors_to_spawn[:can_spawn_num_remaining]:
new_actor_pid = actor_type.start(mode='process', **launch_kwargs) new_actor_pid = actor_type.start(mode='process', **launch_kwargs)
all_spawned_actors.append(new_actor_pid) all_spawned_actors.append(new_actor_pid)
except Exception as err: except Exception as err:
@ -192,98 +201,3 @@ class Orchestrator:
else: else:
print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err) print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err)
self.on_shutdown(err=err) self.on_shutdown(err=err)
# from archivebox.config.django import setup_django
# setup_django()
# from core.models import ArchiveResult, Snapshot
# from django.utils import timezone
# from django import db
# from django.db import connection
# from crawls.actors import CrawlActor
# from core.actors import SnapshotActor, ArchiveResultActor
# class ArchivingOrchestrator(Orchestrator):
# actor_types = {
# 'CrawlActor': CrawlActor,
# 'SnapshotActor': SnapshotActor,
# 'ArchiveResultActor': ArchiveResultActor,
# # 'FaviconActor': FaviconActor,
# # 'SinglefileActor': SinglefileActor,
# }
# from abx_plugin_singlefile.actors import SinglefileActor
# class FaviconActor(ActorType[ArchiveResult]):
# CLAIM_ORDER: ClassVar[str] = 'created_at DESC'
# CLAIM_WHERE: ClassVar[str] = 'status = "queued" AND extractor = "favicon"'
# CLAIM_SET: ClassVar[str] = 'status = "started"'
# @classproperty
# def QUERYSET(cls) -> QuerySet:
# return ArchiveResult.objects.filter(status='failed', extractor='favicon')
# def tick(self, obj: ArchiveResult):
# print(f'[grey53]{self}.tick({obj.abid or obj.id}, status={obj.status}) remaining:[/grey53]', self.get_queue().count())
# updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1
# if not updated:
# raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object')
# obj.refresh_from_db()
# obj.save()
# class ArchivingOrchestrator(Orchestrator):
# actor_types = {
# 'CrawlActor': CrawlActor,
# 'SnapshotActor': SnapshotActor,
# 'ArchiveResultActor': ArchiveResultActor,
# # 'FaviconActor': FaviconActor,
# # 'SinglefileActor': SinglefileActor,
# }
# if __name__ == '__main__':
# orchestrator = ExtractorsOrchestrator()
# orchestrator.start()
# snap = Snapshot.objects.last()
# assert snap is not None
# created = 0
# while True:
# time.sleep(0.05)
# # try:
# # ArchiveResult.objects.bulk_create([
# # ArchiveResult(
# # id=uuid.uuid4(),
# # snapshot=snap,
# # status='failed',
# # extractor='favicon',
# # cmd=['echo', '"hello"'],
# # cmd_version='1.0',
# # pwd='.',
# # start_ts=timezone.now(),
# # end_ts=timezone.now(),
# # created_at=timezone.now(),
# # modified_at=timezone.now(),
# # created_by_id=1,
# # )
# # for _ in range(100)
# # ])
# # created += 100
# # if created % 1000 == 0:
# # print(f'[blue]Created {created} ArchiveResults...[/blue]')
# # time.sleep(25)
# # except Exception as err:
# # print(err)
# # db.connections.close_all()
# # except BaseException as err:
# # print(err)
# # break