From 17faa5a507ffb5ac1068fa38d9a38fc56de9c5b9 Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Thu, 31 Oct 2024 07:10:43 -0700 Subject: [PATCH] improvements to new actor and orchestrators --- archivebox/actors/actor.py | 243 ++++++++++++++++++---------- archivebox/actors/orchestrator.py | 252 ++++++++++++++++-------------- 2 files changed, 298 insertions(+), 197 deletions(-) diff --git a/archivebox/actors/actor.py b/archivebox/actors/actor.py index 6e9d523b..97316405 100644 --- a/archivebox/actors/actor.py +++ b/archivebox/actors/actor.py @@ -3,8 +3,12 @@ __package__ = 'archivebox.actors' import os import time import psutil -from typing import ClassVar, Generic, TypeVar, Any, cast, Literal +from typing import ClassVar, Generic, TypeVar, Any, cast, Literal, Type +from rich import print + +from django import db +from django.db import models from django.db.models import QuerySet from multiprocessing import Process, cpu_count from threading import Thread, get_native_id @@ -16,129 +20,210 @@ ALL_SPAWNED_ACTORS: list[psutil.Process] = [] LaunchKwargs = dict[str, Any] -ObjectType = TypeVar('ObjectType') +ModelType = TypeVar('ModelType', bound=models.Model) -class ActorType(Generic[ObjectType]): +class ActorType(Generic[ModelType]): pid: int + idle_count: int = 0 + launch_kwargs: LaunchKwargs = {} - MAX_CONCURRENT_ACTORS: ClassVar[int] = min(max(2, int(cpu_count() * 0.7)), 8) # min 2, max 8 + # model_type: Type[ModelType] + MAX_CONCURRENT_ACTORS: ClassVar[int] = min(max(2, int(cpu_count() * 0.6)), 8) # min 2, max 8 MAX_TICK_TIME: ClassVar[int] = 60 def __init__(self, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs): self.mode = mode - self.launch_kwargs = launch_kwargs + self.launch_kwargs = launch_kwargs or dict(self.launch_kwargs) + + def __repr__(self) -> str: + label = 'pid' if self.mode == 'process' else 'tid' + return f'[underline]{self.__class__.__name__}[/underline]\\[{label}={self.pid}]' + + def __str__(self) -> str: + return self.__repr__() @classmethod def get_running_actors(cls) -> list[int]: - # returns a list of pids of all running actors of this type + """returns a list of pids of all running actors of this type""" + # WARNING: only works for process actors, not thread actors return [ proc.pid for proc in ALL_SPAWNED_ACTORS if proc.is_running() and proc.status() != 'zombie' ] + + @classmethod + def fork_actor_as_thread(cls, **launch_kwargs: LaunchKwargs) -> int: + actor = cls(mode='thread', **launch_kwargs) + bg_actor_thread = Thread(target=actor.runloop) + bg_actor_thread.start() + assert bg_actor_thread.native_id is not None + return bg_actor_thread.native_id @classmethod - def spawn_actor(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int: - actor = cls(mode=mode, **launch_kwargs) - # bg_actor_proccess = Process(target=actor.runloop) + def fork_actor_as_process(cls, **launch_kwargs: LaunchKwargs) -> int: + actor = cls(mode='process', **launch_kwargs) + bg_actor_process = Process(target=actor.runloop) + bg_actor_process.start() + assert bg_actor_process.pid is not None + ALL_SPAWNED_ACTORS.append(psutil.Process(pid=bg_actor_process.pid)) + return bg_actor_process.pid + + @classmethod + def start(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int: if mode == 'thread': - bg_actor_thread = Thread(target=actor.runloop) - bg_actor_thread.start() - assert bg_actor_thread.native_id is not None - return bg_actor_thread.native_id - else: - bg_actor_process = Process(target=actor.runloop) - bg_actor_process.start() - assert bg_actor_process.pid is not None - ALL_SPAWNED_ACTORS.append(psutil.Process(pid=bg_actor_process.pid)) - return bg_actor_process.pid + return cls.fork_actor_as_thread(**launch_kwargs) + elif mode == 'process': + return cls.fork_actor_as_process(**launch_kwargs) + raise ValueError(f'Invalid actor mode: {mode}') @classmethod def get_queue(cls) -> QuerySet: + """override this to provide your queryset as the queue""" # return ArchiveResult.objects.filter(status='queued', extractor__in=('pdf', 'dom', 'screenshot')) raise NotImplementedError @classmethod - def get_next(cls) -> ObjectType | None: + def get_next(cls, atomic: bool=True) -> ModelType | None: + if atomic: + return cls.get_next_atomic(model=cls.get_queue().model) return cls.get_queue().last() + @classmethod + def get_next_atomic(cls, model: Type, filter=('status', 'queued'), update=('status', 'started'), sort='created_at', order='DESC', choose_from_top=50) -> ModelType | None: + """ + atomically claim a random object from the top n=50 objects in the queue by updating status=queued->started + optimized for minimizing contention on the queue with other actors selecting from the same list + """ + app_label = model._meta.app_label + model_name = model._meta.model_name + + with db.connection.cursor() as cursor: + # subquery gets the pool of the top 50 candidates sorted by sort and order + # main query selects a random one from that pool + cursor.execute(f""" + UPDATE {app_label}_{model_name} + SET {update[0]} = '{update[1]}' + WHERE {filter[0]} = '{filter[1]}' and id = ( + SELECT id FROM ( + SELECT id FROM {app_label}_{model_name} + WHERE {filter[0]} = '{filter[1]}' + ORDER BY {sort} {order} + LIMIT {choose_from_top} + ) candidates + ORDER BY RANDOM() + LIMIT 1 + ) + RETURNING *; + """) + result = cursor.fetchone() + + # If no rows were claimed, return None + if result is None: + return None + + # reconstruct model instance from the row tuple + columns = [col[0] for col in cursor.description] + return model(**dict(zip(columns, result))) + @classmethod def get_actors_to_spawn(cls, queue, running_actors) -> list[LaunchKwargs]: + """Get a list of launch kwargs for the number of actors to spawn based on the queue and currently running actors""" actors_to_spawn: list[LaunchKwargs] = [] max_spawnable = cls.MAX_CONCURRENT_ACTORS - len(running_actors) queue_length = queue.count() - if not queue_length: # queue is empty, spawn 0 actors + # spawning new actors is expensive, avoid spawning all the actors at once. To stagger them, + # let the next orchestrator tick handle starting another 2 on the next tick() + # if queue_length > 10: # queue is long, spawn as many as possible + # actors_to_spawn += max_spawnable * [{}] + + if not queue_length: # queue is empty, spawn 0 actors return actors_to_spawn - elif queue_length > 10: # queue is long, spawn as many as possible - actors_to_spawn += max_spawnable * [{}] - elif queue_length > 5: # queue is medium, spawn 1 or 2 actors - actors_to_spawn += min(2, max_spawnable) * [{}] - else: # queue is short, spawn 1 actor - actors_to_spawn += min(1, max_spawnable) * [{}] + elif queue_length > 4: # queue is medium, spawn 1 or 2 actors + actors_to_spawn += min(2, max_spawnable) * [{**cls.launch_kwargs}] + else: # queue is short, spawn 1 actor + actors_to_spawn += min(1, max_spawnable) * [{**cls.launch_kwargs}] return actors_to_spawn - + def on_startup(self): if self.mode == 'thread': - self.pid = get_native_id() + self.pid = get_native_id() # thread id + print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]') else: - self.pid = os.getpid() - print('Actor on_startup()', f'pid={self.pid}') + self.pid = os.getpid() # process id + print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (PROCESS)[/green]') # abx.pm.hook.on_actor_startup(self) - def on_shutdown(self): - print('Actor on_shutdown()', f'pid={self.pid}') + def on_shutdown(self, err: BaseException | None=None): + print(f'[grey53]🏃‍♂️ {self}.on_shutdown() SHUTTING DOWN[/grey53]', err or '[green](gracefully)[/green]') # abx.pm.hook.on_actor_shutdown(self) + + def on_tick_start(self, obj: ModelType): + # print(f'🏃‍♂️ {self}.on_tick_start()', getattr(obj, 'abid', obj.id)) + # abx.pm.hook.on_actor_tick_start(self, obj_to_process) + # self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') + pass + + def on_tick_end(self, obj: ModelType): + # print(f'🏃‍♂️ {self}.on_tick_end()', getattr(obj, 'abid', obj.id)) + # abx.pm.hook.on_actor_tick_end(self, obj_to_process) + # self.timer.end() + pass + + def on_tick_exception(self, obj: ModelType, err: BaseException): + print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red]', getattr(obj, 'abid', obj.id), err) + # abx.pm.hook.on_actor_tick_exception(self, obj_to_process, err) def runloop(self): self.on_startup() - - rechecks = 30 - - while True: - obj_to_process: ObjectType | None = None - try: - obj_to_process = cast(ObjectType, self.get_next()) - except Exception: - pass - - if obj_to_process: - rechecks = 30 - else: - if rechecks == 0: - break # stop looping and exit if queue is empty - else: - # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') - rechecks -= 1 - time.sleep(1) - continue - - if not self.lock(obj_to_process): - continue - - # abx.pm.hook.on_actor_tick_start(self, obj_to_process) - try: - # timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') + try: + while True: + obj_to_process: ModelType | None = None + try: + obj_to_process = cast(ModelType, self.get_next()) + except Exception: + pass - # run the tick function on the object - self.tick(obj_to_process) - except Exception as err: - # abx.pm.hook.on_actor_tick_exception(self, obj_to_process, err) - print('ERROR: actor tick failed', err) - # refresh the db connection - from django import db - db.connections.close_all() - finally: - # timer.end() - pass - # abx.pm.hook.on_actor_tick_end(self, obj_to_process) + if obj_to_process: + self.idle_count = 0 + else: + if self.idle_count >= 30: + break # stop looping and exit if queue is empty and we have rechecked it 30 times + else: + # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') + self.idle_count += 1 + time.sleep(1) + continue + + if not self.lock(obj_to_process): + # we are unable to lock the object, some other actor got it first. skip it and get the next object + continue + + self.on_tick_start(obj_to_process) + + try: + # run the tick function on the object + self.tick(obj_to_process) + except Exception as err: + print(f'[red]🏃‍♂️ ERROR: {self}.tick()[/red]', err) + db.connections.close_all() + self.on_tick_exception(obj_to_process, err) + finally: + self.on_tick_end(obj_to_process) + + self.on_shutdown(err=None) + except BaseException as err: + if isinstance(err, KeyboardInterrupt): + print() + else: + print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err) + self.on_shutdown(err=err) + + def tick(self, obj: ModelType) -> None: + print(f'[blue]🏃‍♂️ {self}.tick()[/blue]', getattr(obj, 'abid', obj.id)) - self.on_shutdown() - - def tick(self, obj: ObjectType) -> None: - print('Actor Processing tick()', obj) - - def lock(self, obj: ObjectType) -> bool: - print('Actor lock()', obj) + def lock(self, obj: ModelType) -> bool: + print(f'[blue]🏃‍♂️ {self}.lock()[/blue]', getattr(obj, 'abid', obj.id)) return True diff --git a/archivebox/actors/orchestrator.py b/archivebox/actors/orchestrator.py index 5a404850..1ca90148 100644 --- a/archivebox/actors/orchestrator.py +++ b/archivebox/actors/orchestrator.py @@ -2,104 +2,132 @@ __package__ = 'archivebox.actors' import os import time -from typing import Dict +import itertools +import uuid +from typing import Dict, Type -from multiprocessing import Process +from multiprocessing import Process, cpu_count + +from rich import print from django.db.models import QuerySet +from django.apps import apps from .actor import ActorType class Orchestrator: pid: int + idle_count: int = 0 + actor_types: Dict[str, Type[ActorType]] - @classmethod - def spawn_orchestrator(cls) -> int: - orchestrator = cls() - orchestrator_bg_proc = Process(target=orchestrator.runloop) + def __init__(self, actor_types: Dict[str, Type[ActorType]] | None = None): + self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types() + + def __repr__(self) -> str: + return f'[underline]{self.__class__.__name__}[/underline]\\[pid={self.pid}]' + + def __str__(self) -> str: + return self.__repr__() + + def start(self) -> int: + orchestrator_bg_proc = Process(target=self.runloop) orchestrator_bg_proc.start() assert orchestrator_bg_proc.pid is not None return orchestrator_bg_proc.pid @classmethod - def get_all_actor_types(cls) -> Dict[str, ActorType]: - # returns a Dict of all discovered {actor_type_id: ActorType} ... + def autodiscover_actor_types(cls) -> Dict[str, Type[ActorType]]: + # returns a Dict of all discovered {actor_type_id: ActorType} across the codebase + # override this method in a subclass to customize the actor types that are used # return {'Snapshot': SnapshotActorType, 'ArchiveResult_chrome': ChromeActorType, ...} return { - 'TestActor': TestActor(), + # look through all models and find all classes that inherit from ActorType + # ... } @classmethod def get_orphaned_objects(cls, all_queues) -> list: # returns a list of objects that are in the queues of all actor types but not in the queues of any other actor types - return [] + all_queued_ids = itertools.chain(*[queue.values('id', flat=True) for queue in all_queues.values()]) + orphaned_objects = [] + for model in apps.get_models(): + if hasattr(model, 'retry_at'): + orphaned_objects.extend(model.objects.filter(retry_at__lt=timezone.now()).exclude(id__in=all_queued_ids)) + return orphaned_objects def on_startup(self): self.pid = os.getpid() - print('Orchestrator startup', self.pid) + print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (PROCESS)[/green]') # abx.pm.hook.on_orchestrator_startup(self) def on_shutdown(self, err: BaseException | None = None): - print('Orchestrator shutdown', self.pid, err) + print(f'[grey53]👨‍✈️ {self}.on_shutdown() SHUTTING DOWN[/grey53]', err or '[green](gracefully)[/green]') # abx.pm.hook.on_orchestrator_shutdown(self) - def on_tick_started(self, actor_types, all_queues): - total_pending = sum(queue.count() for queue in all_queues.values()) - print('Orchestrator tick +', self.pid, f'total_pending={total_pending}') + def on_tick_started(self, all_queues): + # total_pending = sum(queue.count() for queue in all_queues.values()) + # print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}') # abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues) + pass - def on_tick_finished(self, actor_types, all_queues): - # print('Orchestrator tick √', self.pid) + def on_tick_finished(self, all_queues, all_existing_actors, all_spawned_actors): + if all_spawned_actors: + total_queue_length = sum(queue.count() for queue in all_queues.values()) + print(f'[grey53]👨‍✈️ {self}.on_tick_finished() queue={total_queue_length} existing_actors={len(all_existing_actors)} spawned_actors={len(all_spawned_actors)}[/grey53]') # abx.pm.hook.on_orchestrator_tick_finished(self, actor_types, all_queues) - pass - - def on_idle(self): - # print('Orchestrator idle', self.pid) + + def on_idle(self, all_queues): + # print(f'👨‍✈️ {self}.on_idle()') # abx.pm.hook.on_orchestrator_idle(self) - pass - + # check for orphaned objects left behind + if self.idle_count == 60: + orphaned_objects = self.get_orphaned_objects(all_queues) + if orphaned_objects: + print('[red]👨‍✈️ WARNING: some objects may not be processed, no actor has claimed them after 60s:[/red]', orphaned_objects) + def runloop(self): - self.pid = os.getpid() - + self.on_startup() try: while True: - actor_types = self.get_all_actor_types() all_queues = { actor_type: actor_type.get_queue() - for actor_type in actor_types.values() + for actor_type in self.actor_types.values() } - self.on_tick_started(actor_types, all_queues) + if not all_queues: + raise Exception('Failed to find any actor_types to process') + + self.on_tick_started(all_queues) all_existing_actors = [] all_spawned_actors = [] for actor_type, queue in all_queues.items(): - existing_actors = actor_type.get_running_actors() - all_existing_actors.extend(existing_actors) - actors_to_spawn = actor_type.get_actors_to_spawn(queue, existing_actors) - for launch_kwargs in actors_to_spawn: - all_spawned_actors.append(actor_type.spawn_actor(**launch_kwargs)) - - if all_spawned_actors: - print(f'Found {len(all_existing_actors)} existing actors, Spawned {len(all_spawned_actors)} new actors') - else: - # print(f'No actors to spawn, currently_running: {len(all_existing_actors)}') - time.sleep(1) - - orphaned_objects = self.get_orphaned_objects(all_queues) - if orphaned_objects: - print('WARNING: some objects may will not be processed', orphaned_objects) + try: + existing_actors = actor_type.get_running_actors() + all_existing_actors.extend(existing_actors) + actors_to_spawn = actor_type.get_actors_to_spawn(queue, existing_actors) + for launch_kwargs in actors_to_spawn: + new_actor_pid = actor_type.start(mode='process', **launch_kwargs) + all_spawned_actors.append(new_actor_pid) + except BaseException as err: + print(f'🏃‍♂️ ERROR: {self} Failed to get {actor_type} queue & running actors', err) if not any(queue.exists() for queue in all_queues.values()): - # we are idle - self.on_idle() - # time.sleep(0.250) - time.sleep(2) + self.on_idle(all_queues) + self.idle_count += 1 + time.sleep(1) + else: + self.idle_count = 0 - self.on_tick_finished(actor_types, all_queues) + self.on_tick_finished(all_queues, all_existing_actors, all_spawned_actors) + time.sleep(1) - except (KeyboardInterrupt, SystemExit) as err: - self.on_shutdown(err) + except BaseException as err: + if isinstance(err, KeyboardInterrupt): + print() + else: + print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err) + self.on_shutdown(err=err) @@ -114,94 +142,82 @@ from django.utils import timezone from django import db from django.db import connection -def get_next_archiveresult_atomically() -> ArchiveResult | None: - with connection.cursor() as cursor: - # select a random archiveresult out of the next 50 pending ones - # (to avoid clashing with another actor thats also selecting from the same list) - cursor.execute(""" - UPDATE core_archiveresult - SET status = 'started' - WHERE status = 'failed' and id = ( - SELECT id FROM ( - SELECT id FROM core_archiveresult - WHERE status = 'failed' - ORDER BY start_ts DESC - LIMIT 50 - ) candidates - ORDER BY RANDOM() - LIMIT 1 - ) - RETURNING *; - """) - result = cursor.fetchone() - - # If no rows were updated, return None - if result is None: - return None - - # Convert the row tuple into a dict matching column names - columns = [col[0] for col in cursor.description] - return ArchiveResult(**dict(zip(columns, result))) -class TestActor(ActorType[ArchiveResult]): + +class FaviconActor(ActorType[ArchiveResult]): @classmethod def get_queue(cls) -> QuerySet[ArchiveResult]: return ArchiveResult.objects.filter(status='failed', extractor='favicon') @classmethod def get_next(cls) -> ArchiveResult | None: - return get_next_archiveresult_atomically() - # return cls.get_queue().last() + return cls.get_next_atomic( + model=ArchiveResult, + filter=('status', 'failed'), + update=('status', 'started'), + sort='created_at', + order='DESC', + choose_from_top=cpu_count() * 10 + ) def tick(self, obj: ArchiveResult): - # print(f'TestActor[{self.pid}] tick({obj.id})', 'remaining:', self.get_queue().count()) - updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') + print(f'[grey53]{self}.tick({obj.id}) remaining:[/grey53]', self.get_queue().count()) + updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1 if not updated: - raise Exception('Failed to update object status, likely being processed by another actor') + raise Exception(f'Failed to update {obj.abid}, interrupted by another actor writing to the same object') def lock(self, obj: ArchiveResult) -> bool: - locked = True + """As an alternative to self.get_next_atomic(), we can use select_for_update() or manually update a semaphore field here""" + # locked = ArchiveResult.objects.select_for_update(skip_locked=True).filter(id=obj.id, status='pending').update(status='started') == 1 # if locked: - # print(f'TestActor[{self.pid}] lock({obj.id}) 🔒') + # print(f'FaviconActor[{self.pid}] lock({obj.id}) 🔒') # else: - # print(f'TestActor[{self.pid}] lock({obj.id}) X') - return locked - + # print(f'FaviconActor[{self.pid}] lock({obj.id}) X') + return True + + +class ExtractorsOrchestrator(Orchestrator): + actor_types = { + 'FaviconActor': FaviconActor, + } + + if __name__ == '__main__': + orchestrator = ExtractorsOrchestrator() + orchestrator.start() + snap = Snapshot.objects.last() assert snap is not None - - orchestrator = Orchestrator() - orchestrator.spawn_orchestrator() - - for _ in range(50_000): + created = 0 + while True: + time.sleep(0.005) try: - ar = ArchiveResult.objects.create( - snapshot=snap, - status='failed', - extractor='favicon', - cmd=['echo', '"hello"'], - cmd_version='1.0', - pwd='.', - start_ts=timezone.now(), - end_ts=timezone.now(), - ) + ArchiveResult.objects.bulk_create([ + ArchiveResult( + id=uuid.uuid4(), + snapshot=snap, + status='failed', + extractor='favicon', + cmd=['echo', '"hello"'], + cmd_version='1.0', + pwd='.', + start_ts=timezone.now(), + end_ts=timezone.now(), + created_at=timezone.now(), + modified_at=timezone.now(), + created_by_id=1, + ) + for _ in range(100) + ]) + created += 100 + if created % 1000 == 0: + print(f'[blue]Created {created} ArchiveResults...[/blue]') + time.sleep(25) except Exception as err: print(err) db.connections.close_all() - if _ % 1000 == 0: - print('Created', _, 'snapshots...') - time.sleep(0.001) - # time.sleep(3) - - # test_queue = TestActor.get_queue() - # thread_actors = [] - # print('Actor queue:', test_queue) - # actors_to_spawn = TestActor.get_actors_to_spawn(test_queue, thread_actors) - # print('Actors to spawn:', actors_to_spawn) - # # thread_actors = [TestActor.spawn_actor(mode='thread') for _ in actors_to_spawn] - # # print('Thread Actors spawned:', thread_actors) - # process_actors = [TestActor.spawn_actor(mode='process') for _ in actors_to_spawn] - # print('Process Actors spawned:', process_actors) + except BaseException as err: + print(err) + break