better actor atomic claim

This commit is contained in:
Nick Sweeting 2024-11-02 19:54:25 -07:00
parent 9b24fe7390
commit 2337f874ad
No known key found for this signature in database
2 changed files with 215 additions and 159 deletions

View file

@ -2,6 +2,7 @@ __package__ = 'archivebox.actors'
import os import os
import time import time
from abc import ABC, abstractmethod
from typing import ClassVar, Generic, TypeVar, Any, cast, Literal, Type from typing import ClassVar, Generic, TypeVar, Any, cast, Literal, Type
from django.utils.functional import classproperty from django.utils.functional import classproperty
@ -16,148 +17,77 @@ from threading import Thread, get_native_id
# from archivebox.logging_util import TimedProgress # from archivebox.logging_util import TimedProgress
ALL_SPAWNED_ACTORS: list[psutil.Process] = []
LaunchKwargs = dict[str, Any] LaunchKwargs = dict[str, Any]
ModelType = TypeVar('ModelType', bound=models.Model) ModelType = TypeVar('ModelType', bound=models.Model)
class ActorType(Generic[ModelType]): class ActorType(ABC, Generic[ModelType]):
"""
Base class for all actors. Usage:
class FaviconActor(ActorType[ArchiveResult]):
QUERYSET: ClassVar[QuerySet] = ArchiveResult.objects.filter(status='queued', extractor='favicon')
CLAIM_WHERE: ClassVar[str] = 'status = "queued" AND extractor = "favicon"'
CLAIM_ORDER: ClassVar[str] = 'created_at DESC'
ATOMIC: ClassVar[bool] = True
def claim_sql_set(self) -> str:
# SQL fields to update atomically while claiming an object from the queue
retry_at = datetime.now() + timedelta(seconds=self.MAX_TICK_TIME)
return f"status = 'started', locked_by = {self.pid}, retry_at = '{retry_at}'"
def tick(self, obj: ArchiveResult) -> None:
run_favicon_extractor(obj)
ArchiveResult.objects.filter(pk=obj.pk, status='started').update(status='success')
"""
pid: int pid: int
idle_count: int = 0 idle_count: int = 0
launch_kwargs: LaunchKwargs = {} launch_kwargs: LaunchKwargs = {}
mode: Literal['thread', 'process'] = 'process'
QUERYSET: ClassVar[QuerySet] # the QuerySet to claim objects from
CLAIM_WHERE: ClassVar[str] = 'status = "queued"' # the WHERE clause to filter the objects when atomically getting the next object from the queue
CLAIM_SET: ClassVar[str] = 'status = "started"' # the SET clause to claim the object when atomically getting the next object from the queue
CLAIM_ORDER: ClassVar[str] = 'created_at DESC' # the ORDER BY clause to sort the objects with when atomically getting the next object from the queue
CLAIM_FROM_TOP: ClassVar[int] = 50 # the number of objects to consider when atomically getting the next object from the queue
ATOMIC: ClassVar[bool] = True # whether to atomically fetch+claim the nextobject in one step, or fetch and lock it in two steps
# model_type: Type[ModelType] # model_type: Type[ModelType]
MAX_CONCURRENT_ACTORS: ClassVar[int] = min(max(2, int(cpu_count() * 0.6)), 8) # min 2, max 8 MAX_CONCURRENT_ACTORS: ClassVar[int] = min(max(2, int(cpu_count() * 0.6)), 8) # min 2, max 8, up to 60% of available cpu cores
MAX_TICK_TIME: ClassVar[int] = 60 MAX_TICK_TIME: ClassVar[int] = 60 # maximum duration in seconds to process a single object
def __init__(self, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs): _SPAWNED_ACTOR_PIDS: ClassVar[list[psutil.Process]] = [] # record all the pids of Actors spawned by this class
self.mode = mode
def __init__(self, mode: Literal['thread', 'process']|None=None, **launch_kwargs: LaunchKwargs):
self.mode = mode or self.mode
self.launch_kwargs = launch_kwargs or dict(self.launch_kwargs) self.launch_kwargs = launch_kwargs or dict(self.launch_kwargs)
def __repr__(self) -> str:
label = 'pid' if self.mode == 'process' else 'tid'
return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]'
def __str__(self) -> str:
return self.__repr__()
@classproperty @classproperty
def name(cls) -> str: def name(cls) -> str:
return cls.__name__ # type: ignore return cls.__name__ # type: ignore
def __str__(self) -> str:
return self.__repr__()
def __repr__(self) -> str:
"""FaviconActor[pid=1234]"""
label = 'pid' if self.mode == 'process' else 'tid'
return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]'
### Class Methods: Called by Orchestrator on ActorType class before it has been spawned
@classmethod @classmethod
def get_running_actors(cls) -> list[int]: def get_running_actors(cls) -> list[int]:
"""returns a list of pids of all running actors of this type""" """returns a list of pids of all running actors of this type"""
# WARNING: only works for process actors, not thread actors # WARNING: only works for process actors, not thread actors
if cls.mode == 'thread':
raise NotImplementedError('get_running_actors() is not implemented for thread actors')
return [ return [
proc.pid for proc in ALL_SPAWNED_ACTORS proc.pid for proc in cls._SPAWNED_ACTOR_PIDS
if proc.is_running() and proc.status() != 'zombie' if proc.is_running() and proc.status() != 'zombie'
] ]
@classmethod @classmethod
def fork_actor_as_thread(cls, **launch_kwargs: LaunchKwargs) -> int: def get_actors_to_spawn(cls, queue: QuerySet, running_actors: list[int]) -> list[LaunchKwargs]:
actor = cls(mode='thread', **launch_kwargs)
bg_actor_thread = Thread(target=actor.runloop)
bg_actor_thread.start()
assert bg_actor_thread.native_id is not None
return bg_actor_thread.native_id
@classmethod
def fork_actor_as_process(cls, **launch_kwargs: LaunchKwargs) -> int:
actor = cls(mode='process', **launch_kwargs)
bg_actor_process = Process(target=actor.runloop)
bg_actor_process.start()
assert bg_actor_process.pid is not None
ALL_SPAWNED_ACTORS.append(psutil.Process(pid=bg_actor_process.pid))
return bg_actor_process.pid
@classmethod
def start(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int:
if mode == 'thread':
return cls.fork_actor_as_thread(**launch_kwargs)
elif mode == 'process':
return cls.fork_actor_as_process(**launch_kwargs)
raise ValueError(f'Invalid actor mode: {mode}')
@classmethod
def get_queue(cls) -> QuerySet:
"""override this to provide your queryset as the queue"""
# return ArchiveResult.objects.filter(status='queued', extractor__in=('pdf', 'dom', 'screenshot'))
raise NotImplementedError
@classmethod
def get_next(cls, atomic: bool=True) -> ModelType | None:
if atomic:
return cls.get_next_atomic(model=cls.get_queue().model)
return cls.get_queue().last()
@classmethod
def get_random(cls, model: Type[ModelType], where='status = "queued"', set='status = "started"', choose_from_top=50) -> ModelType | None:
app_label = model._meta.app_label
model_name = model._meta.model_name
with db.connection.cursor() as cursor:
# subquery gets the pool of the top 50 candidates sorted by sort and order
# main query selects a random one from that pool
cursor.execute(f"""
UPDATE {app_label}_{model_name}
SET {set}
WHERE {where} and id = (
SELECT id FROM {app_label}_{model_name}
WHERE {where}
LIMIT 1
OFFSET ABS(RANDOM()) % {choose_from_top}
)
RETURNING id;
""")
result = cursor.fetchone()
# If no rows were claimed, return None
if result is None:
return None
return model.objects.get(id=result[0])
@classmethod
def get_next_atomic(cls, model: Type[ModelType], where='status = "queued"', set='status = "started"', order_by='created_at DESC', choose_from_top=50) -> ModelType | None:
"""
atomically claim a random object from the top n=50 objects in the queue by updating status=queued->started
optimized for minimizing contention on the queue with other actors selecting from the same list
"""
app_label = model._meta.app_label
model_name = model._meta.model_name
with db.connection.cursor() as cursor:
# subquery gets the pool of the top 50 candidates sorted by sort and order
# main query selects a random one from that pool
cursor.execute(f"""
UPDATE {app_label}_{model_name}
SET {set}
WHERE {where} and id = (
SELECT id FROM (
SELECT id FROM {app_label}_{model_name}
WHERE {where}
ORDER BY {order_by}
LIMIT {choose_from_top}
) candidates
ORDER BY RANDOM()
LIMIT 1
)
RETURNING id;
""")
result = cursor.fetchone()
# If no rows were claimed, return None
if result is None:
return None
return model.objects.get(id=result[0])
@classmethod
def get_actors_to_spawn(cls, queue, running_actors) -> list[LaunchKwargs]:
"""Get a list of launch kwargs for the number of actors to spawn based on the queue and currently running actors""" """Get a list of launch kwargs for the number of actors to spawn based on the queue and currently running actors"""
actors_to_spawn: list[LaunchKwargs] = [] actors_to_spawn: list[LaunchKwargs] = []
max_spawnable = cls.MAX_CONCURRENT_ACTORS - len(running_actors) max_spawnable = cls.MAX_CONCURRENT_ACTORS - len(running_actors)
@ -175,69 +105,78 @@ class ActorType(Generic[ModelType]):
else: # queue is short, spawn 1 actor else: # queue is short, spawn 1 actor
actors_to_spawn += min(1, max_spawnable) * [{**cls.launch_kwargs}] actors_to_spawn += min(1, max_spawnable) * [{**cls.launch_kwargs}]
return actors_to_spawn return actors_to_spawn
def on_startup(self):
if self.mode == 'thread':
self.pid = get_native_id() # thread id
print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]')
else:
self.pid = os.getpid() # process id
print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (PROCESS)[/green]')
# abx.pm.hook.on_actor_startup(self)
def on_shutdown(self, err: BaseException | None=None): @classmethod
print(f'[grey53]🏃‍♂️ {self}.on_shutdown() SHUTTING DOWN[/grey53]', err or '[green](gracefully)[/green]') def start(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int:
# abx.pm.hook.on_actor_shutdown(self) if mode == 'thread':
return cls.fork_actor_as_thread(**launch_kwargs)
elif mode == 'process':
return cls.fork_actor_as_process(**launch_kwargs)
raise ValueError(f'Invalid actor mode: {mode} must be "thread" or "process"')
def on_tick_start(self, obj: ModelType): @classmethod
# print(f'🏃‍♂️ {self}.on_tick_start()', obj.abid or obj.id) def fork_actor_as_thread(cls, **launch_kwargs: LaunchKwargs) -> int:
# abx.pm.hook.on_actor_tick_start(self, obj_to_process) """Spawn a new background thread running the actor's runloop"""
# self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') actor = cls(mode='thread', **launch_kwargs)
pass bg_actor_thread = Thread(target=actor.runloop)
bg_actor_thread.start()
assert bg_actor_thread.native_id is not None
return bg_actor_thread.native_id
def on_tick_end(self, obj: ModelType): @classmethod
# print(f'🏃‍♂️ {self}.on_tick_end()', obj.abid or obj.id) def fork_actor_as_process(cls, **launch_kwargs: LaunchKwargs) -> int:
# abx.pm.hook.on_actor_tick_end(self, obj_to_process) """Spawn a new background process running the actor's runloop"""
# self.timer.end() actor = cls(mode='process', **launch_kwargs)
pass bg_actor_process = Process(target=actor.runloop)
bg_actor_process.start()
assert bg_actor_process.pid is not None
cls._SPAWNED_ACTOR_PIDS.append(psutil.Process(pid=bg_actor_process.pid))
return bg_actor_process.pid
def on_tick_exception(self, obj: ModelType, err: BaseException): @classmethod
print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red]', obj.abid or obj.id, err) def get_model(cls) -> Type[ModelType]:
# abx.pm.hook.on_actor_tick_exception(self, obj_to_process, err) # wish this was a @classproperty but Generic[ModelType] return type cant be statically inferred for @classproperty
return cls.QUERYSET.model
@classmethod
def get_queue(cls) -> QuerySet:
"""override this to provide your queryset as the queue"""
# return ArchiveResult.objects.filter(status='queued', extractor__in=('pdf', 'dom', 'screenshot'))
return cls.QUERYSET
### Instance Methods: Called by Actor after it has been spawned (i.e. forked as a thread or process)
def runloop(self): def runloop(self):
"""The main runloop that starts running when the actor is spawned (as subprocess or thread) and exits when the queue is empty"""
self.on_startup() self.on_startup()
try: try:
while True: while True:
obj_to_process: ModelType | None = None obj_to_process: ModelType | None = None
try: try:
obj_to_process = cast(ModelType, self.get_next()) obj_to_process = cast(ModelType, self.get_next(atomic=self.atomic))
except Exception: except Exception:
pass pass
if obj_to_process: if obj_to_process:
self.idle_count = 0 self.idle_count = 0 # reset idle count if we got an object
else: else:
if self.idle_count >= 30: if self.idle_count >= 30:
break # stop looping and exit if queue is empty and we have rechecked it 30 times break # stop looping and exit if queue is empty and we have idled for 30sec
else: else:
# print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...')
self.idle_count += 1 self.idle_count += 1
time.sleep(1) time.sleep(1)
continue continue
if not self.lock(obj_to_process):
# we are unable to lock the object, some other actor got it first. skip it and get the next object
continue
self.on_tick_start(obj_to_process) self.on_tick_start(obj_to_process)
# Process the object
try: try:
# run the tick function on the object
self.tick(obj_to_process) self.tick(obj_to_process)
except Exception as err: except Exception as err:
print(f'[red]🏃‍♂️ ERROR: {self}.tick()[/red]', err) print(f'[red]🏃‍♂️ ERROR: {self}.tick()[/red]', err)
db.connections.close_all() db.connections.close_all() # always reset the db connection after an exception to clear any pending transactions
self.on_tick_exception(obj_to_process, err) self.on_tick_exception(obj_to_process, err)
finally: finally:
self.on_tick_end(obj_to_process) self.on_tick_end(obj_to_process)
@ -249,12 +188,125 @@ class ActorType(Generic[ModelType]):
else: else:
print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err) print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err)
self.on_shutdown(err=err) self.on_shutdown(err=err)
def get_next(self, atomic: bool | None=None) -> ModelType | None:
"""get the next object from the queue, atomically locking it if self.atomic=True"""
if atomic is None:
atomic = self.ATOMIC
def tick(self, obj: ModelType) -> None: if atomic:
print(f'[blue]🏃‍♂️ {self}.tick()[/blue]', obj.abid or obj.id) # fetch and claim the next object from in the queue in one go atomically
obj = self.get_next_atomic()
def lock(self, obj: ModelType) -> bool: else:
print(f'[blue]🏃‍♂️ {self}.lock()[/blue]', obj.abid or obj.id) # two-step claim: fetch the next object and lock it in a separate query
obj = self.get_queue().last()
assert obj and self.lock_next(obj), f'Unable to fetch+lock the next {self.get_model().__name__} ojbect from {self}.QUEUE'
return obj
def lock_next(self, obj: ModelType) -> bool:
"""override this to implement a custom two-step (non-atomic)lock mechanism"""
# For example:
# assert obj._model.objects.filter(pk=obj.pk, status='queued').update(status='started', locked_by=self.pid)
# Not needed if using get_next_atomic() to claim the object atomically
# print(f'[blue]🏃‍♂️ {self}.lock()[/blue]', obj.abid or obj.id)
return True return True
def claim_sql_where(self) -> str:
"""
Return the SQL WHERE clause that selects claimable rows in the atomic claim query.

Defaults to the class-level CLAIM_WHERE. Override to customize it,
e.g. "status = 'queued' AND locked_by IS NULL"
(NULL must be tested with IS NULL; "locked_by = NULL" never matches any row in SQL).
The returned text is interpolated verbatim into the query built by get_next_atomic().
"""
return self.CLAIM_WHERE
def claim_sql_set(self) -> str:
"""
Return the SQL SET clause applied to the row being claimed in the atomic claim query.

Defaults to the class-level CLAIM_SET. Override to customize it; multiple assignments
are separated by commas (not AND), e.g. "status = 'started', locked_by = {self.pid}".
The returned text is interpolated verbatim into the query built by get_next_atomic().
"""
return self.CLAIM_SET
def claim_sql_order(self) -> str:
"""
Return the SQL ORDER BY expression used to rank candidate rows in the atomic claim query.

Defaults to the class-level CLAIM_ORDER. Override to customize it, e.g. "created_at DESC".
The returned text is interpolated verbatim into the query built by get_next_atomic().
"""
return self.CLAIM_ORDER
def claim_from_top(self) -> int:
"""
Return how many of the top-ranked candidate rows get_next_atomic() picks randomly from.

Defaults to the class-level CLAIM_FROM_TOP. The value is used as the LIMIT of the
candidate subquery in the atomic claim query.
"""
return self.CLAIM_FROM_TOP
def get_next_atomic(self, shallow: bool=True) -> ModelType | None:
    """
    Atomically claim one random object from the top-N candidates in the queue.

    Runs a single UPDATE ... RETURNING statement: the inner subquery collects the
    top claim_from_top() rows matching claim_sql_where() ordered by claim_sql_order(),
    one of them is chosen at random, and claim_sql_set() is applied to it.
    Picking randomly from a pool (rather than always the first row) reduces contention
    with other actors claiming from the same queue.

    The WHERE / SET / ORDER BY fragments are interpolated verbatim into the SQL,
    so they must only ever come from trusted code (class constants or overrides),
    never from user input.

    Args:
        shallow: if True (default), build the instance directly from the columns
            returned by the query. Only `id` is returned, so every other field on
            the instance holds its model default, NOT the value stored in the DB.
            If False, do one extra query to load the full row.

    Returns:
        The claimed model instance, or None if no row could be claimed.
    """
    Model = self.get_model()                                      # e.g. ArchiveResult
    # use the real table name so models with a custom Meta.db_table also work
    table = Model._meta.db_table                                  # e.g. core_archiveresult

    where_sql = self.claim_sql_where()
    set_sql = self.claim_sql_set()
    order_by_sql = self.claim_sql_order()
    choose_from_top = self.claim_from_top()

    with db.connection.cursor() as cursor:
        # subquery gets the pool of the top N candidates matching the filter, sorted by the claim order
        # main query selects a random one from that pool and claims it
        # NOTE: assumes the primary key column is named "id"
        cursor.execute(f"""
            UPDATE {table}
            SET {set_sql}
            WHERE {where_sql} and id = (
                SELECT id FROM (
                    SELECT id FROM {table}
                    WHERE {where_sql}
                    ORDER BY {order_by_sql}
                    LIMIT {choose_from_top}
                ) candidates
                ORDER BY RANDOM()
                LIMIT 1
            )
            RETURNING id;
        """)
        result = cursor.fetchone()
        # read the column names while the cursor is still open; fall back to the single
        # RETURNING column if the driver did not populate cursor.description
        columns = [col[0] for col in cursor.description] if cursor.description else ['id']

    if result is None:
        return None                                               # no rows were claimed

    if shallow:
        # shallow: faster, but the instance only carries the returned columns (id)
        return Model(**dict(zip(columns, result)))

    # not shallow: one extra query to load the complete object from scratch
    return Model.objects.get(id=result[0])
@abstractmethod
def tick(self, obj: ModelType) -> None:
"""
Process a single claimed object; concrete Actor subclasses must override this.

Called by runloop() once per object fetched from the queue. The base implementation
only prints a debug line and then raises NotImplementedError, so invoking it
(e.g. via super()) always fails.
"""
print(f'[blue]🏃‍♂️ {self}.tick()[/blue]', obj.abid or obj.id)
# For example:
# do_some_task(obj)
# do_something_else(obj)
# obj._model.objects.filter(pk=obj.pk, status='started').update(status='success')
raise NotImplementedError('tick() must be implemented by the Actor subclass')
def on_startup(self) -> None:
"""
Hook run once at the very start of runloop().

Stores this actor's identifier in self.pid (the native thread id in 'thread' mode,
the OS process id otherwise) and prints a startup banner.
"""
if self.mode == 'thread':
self.pid = get_native_id() # thread id
print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]')
else:
self.pid = os.getpid() # process id
print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (PROCESS)[/green]')
# abx.pm.hook.on_actor_startup(self)
def on_shutdown(self, err: BaseException | None=None) -> None:
"""
Hook run by runloop() when the actor is exiting.

Prints a shutdown banner followed by `err` if one was passed,
or a "(gracefully)" marker when err is None.
"""
print(f'[grey53]🏃‍♂️ {self}.on_shutdown() SHUTTING DOWN[/grey53]', err or '[green](gracefully)[/green]')
# abx.pm.hook.on_actor_shutdown(self)
def on_tick_start(self, obj: ModelType) -> None:
"""
Hook run by runloop() right before tick() is called on `obj`.

Currently a no-op; override it to add per-object setup (timers, logging, plugin hooks).
"""
# print(f'🏃‍♂️ {self}.on_tick_start()', obj.abid or obj.id)
# abx.pm.hook.on_actor_tick_start(self, obj_to_process)
# self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ')
pass
def on_tick_end(self, obj: ModelType) -> None:
"""
Hook run by runloop() after tick() finishes for `obj`, whether it succeeded or raised
(runloop() calls it from a `finally` clause).

Currently a no-op; override it to add per-object teardown.
"""
# print(f'🏃‍♂️ {self}.on_tick_end()', obj.abid or obj.id)
# abx.pm.hook.on_actor_tick_end(self, obj_to_process)
# self.timer.end()
pass
def on_tick_exception(self, obj: ModelType, err: BaseException) -> None:
"""
Hook run by runloop() when tick() raises an exception while processing `obj`.

The default implementation prints the object's abid (or id) together with the error.
"""
print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red]', obj.abid or obj.id, err)
# abx.pm.hook.on_actor_tick_exception(self, obj_to_process, err)

View file

@ -191,22 +191,26 @@ class FaviconActor(ActorType[ArchiveResult]):
# ) # )
return cls.get_random( return cls.get_random(
model=ArchiveResult, model=ArchiveResult,
where='status = "failed"', where='status = "failed" AND extractor = "favicon"',
set='status = "queued"', set='status = "queued"',
choose_from_top=cls.get_queue().count(), choose_from_top=50,
) )
def tick(self, obj: ArchiveResult): def tick(self, obj: ArchiveResult):
print(f'[grey53]{self}.tick({obj.abid or obj.id}) remaining:[/grey53]', self.get_queue().count()) print(f'[grey53]{self}.tick({obj.abid or obj.id}, status={obj.status}) remaining:[/grey53]', self.get_queue().count())
updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1 updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1
if not updated: if not updated:
raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object') raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object')
# obj.refresh_from_db()
obj.status = 'success'
def lock(self, obj: ArchiveResult) -> bool: def lock(self, obj: ArchiveResult) -> bool:
"""As an alternative to self.get_next_atomic(), we can use select_for_update() or manually update a semaphore field here""" """As an alternative to self.get_next_atomic(), we can use select_for_update() or manually update a semaphore field here"""
locked = ArchiveResult.objects.filter(id=obj.id, status='queued').update(status='started') == 1 locked = ArchiveResult.objects.filter(id=obj.id, status='queued').update(status='started') == 1
if locked: if locked:
# obj.refresh_from_db()
obj.status = 'started'
# print(f'FaviconActor[{self.pid}] lock({obj.id}) 🔒') # print(f'FaviconActor[{self.pid}] lock({obj.id}) 🔒')
pass pass
else: else: