diff --git a/archivebox/actors/actor.py b/archivebox/actors/actor.py
index 97316405..98fdd4cb 100644
--- a/archivebox/actors/actor.py
+++ b/archivebox/actors/actor.py
@@ -2,10 +2,11 @@ __package__ = 'archivebox.actors'
 
 import os
 import time
-import psutil
 from typing import ClassVar, Generic, TypeVar, Any, cast, Literal, Type
+from django.utils.functional import classproperty
 
 from rich import print
+import psutil
 
 from django import db
 from django.db import models
@@ -37,11 +38,15 @@ class ActorType(Generic[ModelType]):
 
     def __repr__(self) -> str:
         label = 'pid' if self.mode == 'process' else 'tid'
-        return f'[underline]{self.__class__.__name__}[/underline]\\[{label}={self.pid}]'
+        return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]'
 
     def __str__(self) -> str:
         return self.__repr__()
 
+    @classproperty
+    def name(cls) -> str:
+        return cls.__name__  # type: ignore
+
     @classmethod
     def get_running_actors(cls) -> list[int]:
         """returns a list of pids of all running actors of this type"""
@@ -89,7 +94,35 @@ class ActorType(Generic[ModelType]):
         return cls.get_queue().last()
 
     @classmethod
-    def get_next_atomic(cls, model: Type, filter=('status', 'queued'), update=('status', 'started'), sort='created_at', order='DESC', choose_from_top=50) -> ModelType | None:
+    def get_random(cls, model: Type[ModelType], where='status = "queued"', set='status = "started"', choose_from_top=50) -> ModelType | None:
+        app_label = model._meta.app_label
+        model_name = model._meta.model_name
+
+        with db.connection.cursor() as cursor:
+            # subquery picks a row at a random offset within the first {choose_from_top} rows matching WHERE
+            # main query atomically claims that row by applying SET
+            cursor.execute(f"""
+                UPDATE {app_label}_{model_name}
+                SET {set}
+                WHERE {where} and id = (
+                    SELECT id FROM {app_label}_{model_name}
+                    WHERE {where}
+                    LIMIT 1
+                    OFFSET ABS(RANDOM()) % {choose_from_top}
+                )
+                RETURNING id;
+            """)
+            result = cursor.fetchone()
+
+            # If no rows were claimed, return None
+            if result is None:
+                return None
+
+            return model.objects.get(id=result[0])
+
+
+    @classmethod
+    def get_next_atomic(cls, model: Type[ModelType], where='status = "queued"', set='status = "started"', order_by='created_at DESC', choose_from_top=50) -> ModelType | None:
         """
         atomically claim a random object from the top n=50 objects in the queue by updating status=queued->started
         optimized for minimizing contention on the queue with other actors selecting from the same list
@@ -102,18 +135,18 @@ class ActorType(Generic[ModelType]):
             # main query selects a random one from that pool
             cursor.execute(f"""
                 UPDATE {app_label}_{model_name}
-                SET {update[0]} = '{update[1]}'
-                WHERE {filter[0]} = '{filter[1]}' and id = (
+                SET {set}
+                WHERE {where} and id = (
                     SELECT id FROM (
                         SELECT id FROM {app_label}_{model_name}
-                        WHERE {filter[0]} = '{filter[1]}'
-                        ORDER BY {sort} {order}
+                        WHERE {where}
+                        ORDER BY {order_by}
                         LIMIT {choose_from_top}
                     ) candidates
                     ORDER BY RANDOM()
                     LIMIT 1
                 )
-                RETURNING *;
+                RETURNING id;
             """)
             result = cursor.fetchone()
 
@@ -121,9 +154,7 @@ class ActorType(Generic[ModelType]):
 
             if result is None:
                 return None
-            # reconstruct model instance from the row tuple
-            columns = [col[0] for col in cursor.description]
-            return model(**dict(zip(columns, result)))
+            return model.objects.get(id=result[0])
 
     @classmethod
     def get_actors_to_spawn(cls, queue, running_actors) -> list[LaunchKwargs]:
@@ -159,19 +190,19 @@ class ActorType(Generic[ModelType]):
         # abx.pm.hook.on_actor_shutdown(self)
 
     def on_tick_start(self, obj: ModelType):
-        # print(f'🏃‍♂️ {self}.on_tick_start()', getattr(obj, 'abid', obj.id))
+        # print(f'🏃‍♂️ {self}.on_tick_start()', obj.abid or obj.id)
         # abx.pm.hook.on_actor_tick_start(self, obj_to_process)
         # self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ')
         pass
 
     def on_tick_end(self, obj: ModelType):
-        # print(f'🏃‍♂️ {self}.on_tick_end()', getattr(obj, 'abid', obj.id))
+        # print(f'🏃‍♂️ {self}.on_tick_end()', obj.abid or obj.id)
         # abx.pm.hook.on_actor_tick_end(self, obj_to_process)
         # self.timer.end()
         pass
 
     def on_tick_exception(self, obj: ModelType, err: BaseException):
-        print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red]', getattr(obj, 'abid', obj.id), err)
+        print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red]', obj.abid or obj.id, err)
         # abx.pm.hook.on_actor_tick_exception(self, obj_to_process, err)
 
     def runloop(self):
@@ -220,10 +251,10 @@ class ActorType(Generic[ModelType]):
         self.on_shutdown(err=err)
 
     def tick(self, obj: ModelType) -> None:
-        print(f'[blue]🏃‍♂️ {self}.tick()[/blue]', getattr(obj, 'abid', obj.id))
+        print(f'[blue]🏃‍♂️ {self}.tick()[/blue]', obj.abid or obj.id)
 
     def lock(self, obj: ModelType) -> bool:
-        print(f'[blue]🏃‍♂️ {self}.lock()[/blue]', getattr(obj, 'abid', obj.id))
+        print(f'[blue]🏃‍♂️ {self}.lock()[/blue]', obj.abid or obj.id)
         return True
 
 
diff --git a/archivebox/actors/orchestrator.py b/archivebox/actors/orchestrator.py
index 1ca90148..ff33ec3e 100644
--- a/archivebox/actors/orchestrator.py
+++ b/archivebox/actors/orchestrator.py
@@ -4,9 +4,12 @@
 import os
 import time
 import itertools
 import uuid
-from typing import Dict, Type
+from typing import Dict, Type, Literal
+from django.utils.functional import classproperty
 
 from multiprocessing import Process, cpu_count
+from threading import Thread, get_native_id
+
 from rich import print
 
@@ -19,21 +22,41 @@ class Orchestrator:
     pid: int
     idle_count: int = 0
     actor_types: Dict[str, Type[ActorType]]
+    mode: Literal['thread', 'process'] = 'process'
 
-    def __init__(self, actor_types: Dict[str, Type[ActorType]] | None = None):
+    def __init__(self, actor_types: Dict[str, Type[ActorType]] | None = None, mode: Literal['thread', 'process'] | None=None):
         self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types()
+        self.mode = mode or self.mode
 
     def __repr__(self) -> str:
-        return f'[underline]{self.__class__.__name__}[/underline]\\[pid={self.pid}]'
+        label = 'tid' if self.mode == 'thread' else 'pid'
+        return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]'
 
     def __str__(self) -> str:
         return self.__repr__()
+
+    @classproperty
+    def name(cls) -> str:
+        return cls.__name__  # type: ignore
+
+    def fork_as_thread(self):
+        self.thread = Thread(target=self.runloop)
+        self.thread.start()
+        assert self.thread.native_id is not None
+        return self.thread.native_id
+
+    def fork_as_process(self):
+        self.process = Process(target=self.runloop)
+        self.process.start()
+        assert self.process.pid is not None
+        return self.process.pid
 
     def start(self) -> int:
-        orchestrator_bg_proc = Process(target=self.runloop)
-        orchestrator_bg_proc.start()
-        assert orchestrator_bg_proc.pid is not None
-        return orchestrator_bg_proc.pid
+        if self.mode == 'thread':
+            return self.fork_as_thread()
+        elif self.mode == 'process':
+            return self.fork_as_process()
+        raise ValueError(f'Invalid orchestrator mode: {self.mode}')
 
     @classmethod
     def autodiscover_actor_types(cls) -> Dict[str, Type[ActorType]]:
@@ -42,7 +65,8 @@ class Orchestrator:
         # return {'Snapshot': SnapshotActorType, 'ArchiveResult_chrome': ChromeActorType, ...}
         return {
             # look through all models and find all classes that inherit from ActorType
-            # ...
+            # actor_type.__name__: actor_type
+            # for actor_type in abx.pm.hook.get_all_ACTORS_TYPES().values()
         }
 
     @classmethod
@@ -56,8 +80,12 @@ class Orchestrator:
         return orphaned_objects
 
     def on_startup(self):
-        self.pid = os.getpid()
-        print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (PROCESS)[/green]')
+        if self.mode == 'thread':
+            self.pid = get_native_id()
+            print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (THREAD)[/green]')
+        elif self.mode == 'process':
+            self.pid = os.getpid()
+            print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (PROCESS)[/green]')
         # abx.pm.hook.on_orchestrator_startup(self)
 
     def on_shutdown(self, err: BaseException | None = None):
@@ -109,8 +137,10 @@ class Orchestrator:
                 for launch_kwargs in actors_to_spawn:
                     new_actor_pid = actor_type.start(mode='process', **launch_kwargs)
                     all_spawned_actors.append(new_actor_pid)
-            except BaseException as err:
+            except Exception as err:
                 print(f'🏃‍♂️ ERROR: {self} Failed to get {actor_type} queue & running actors', err)
+            except BaseException:
+                raise
 
             if not any(queue.exists() for queue in all_queues.values()):
                 self.on_idle(all_queues)
@@ -152,30 +182,36 @@
 class FaviconActor(ActorType[ArchiveResult]):
     @classmethod
     def get_next(cls) -> ArchiveResult | None:
-        return cls.get_next_atomic(
+        # return cls.get_next_atomic(
+        #     model=ArchiveResult,
+        #     where='status = "failed"',
+        #     set='status = "started"',
+        #     order_by='created_at DESC',
+        #     choose_from_top=cpu_count() * 10,
+        # )
+        return cls.get_random(
             model=ArchiveResult,
-            filter=('status', 'failed'),
-            update=('status', 'started'),
-            sort='created_at',
-            order='DESC',
-            choose_from_top=cpu_count() * 10
+            where='status = "failed"',
+            set='status = "queued"',
+            choose_from_top=cls.get_queue().count(),
         )
 
     def tick(self, obj: ArchiveResult):
-        print(f'[grey53]{self}.tick({obj.id}) remaining:[/grey53]', self.get_queue().count())
+        print(f'[grey53]{self}.tick({obj.abid or obj.id}) remaining:[/grey53]', self.get_queue().count())
         updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1
         if not updated:
-            raise Exception(f'Failed to update {obj.abid}, interrupted by another actor writing to the same object')
+            raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object')
 
     def lock(self, obj: ArchiveResult) -> bool:
         """As an alternative to self.get_next_atomic(), we can use select_for_update() or manually update a semaphore field here"""
-        # locked = ArchiveResult.objects.select_for_update(skip_locked=True).filter(id=obj.id, status='pending').update(status='started') == 1
-        # if locked:
-        #     print(f'FaviconActor[{self.pid}] lock({obj.id}) 🔒')
-        # else:
-        #     print(f'FaviconActor[{self.pid}] lock({obj.id}) X')
-        return True
+        locked = ArchiveResult.objects.filter(id=obj.id, status='queued').update(status='started') == 1
+        if locked:
+            # print(f'FaviconActor[{self.pid}] lock({obj.id}) 🔒')
+            pass
+        else:
+            print(f'FaviconActor[{self.pid}] lock({obj.id}) X')
+        return locked
 
 
 class ExtractorsOrchestrator(Orchestrator):
@@ -192,32 +228,32 @@ if __name__ == '__main__':
     assert snap is not None
     created = 0
     while True:
-        time.sleep(0.005)
-        try:
-            ArchiveResult.objects.bulk_create([
-                ArchiveResult(
-                    id=uuid.uuid4(),
-                    snapshot=snap,
-                    status='failed',
-                    extractor='favicon',
-                    cmd=['echo', '"hello"'],
-                    cmd_version='1.0',
-                    pwd='.',
-                    start_ts=timezone.now(),
-                    end_ts=timezone.now(),
-                    created_at=timezone.now(),
-                    modified_at=timezone.now(),
-                    created_by_id=1,
-                )
-                for _ in range(100)
-            ])
-            created += 100
-            if created % 1000 == 0:
-                print(f'[blue]Created {created} ArchiveResults...[/blue]')
-                time.sleep(25)
-        except Exception as err:
-            print(err)
-            db.connections.close_all()
-        except BaseException as err:
-            print(err)
-            break
+        time.sleep(0.05)
+        # try:
+        #     ArchiveResult.objects.bulk_create([
+        #         ArchiveResult(
+        #             id=uuid.uuid4(),
+        #             snapshot=snap,
+        #             status='failed',
+        #             extractor='favicon',
+        #             cmd=['echo', '"hello"'],
+        #             cmd_version='1.0',
+        #             pwd='.',
+        #             start_ts=timezone.now(),
+        #             end_ts=timezone.now(),
+        #             created_at=timezone.now(),
+        #             modified_at=timezone.now(),
+        #             created_by_id=1,
+        #         )
+        #         for _ in range(100)
+        #     ])
+        #     created += 100
+        #     if created % 1000 == 0:
+        #         print(f'[blue]Created {created} ArchiveResults...[/blue]')
+        #         time.sleep(25)
+        # except Exception as err:
+        #     print(err)
+        #     db.connections.close_all()
+        # except BaseException as err:
+        #     print(err)
+        #     break
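
Usage sketch (reviewer note, not part of the patch): a minimal illustration of how the new claim-and-run APIs introduced above might be exercised, assuming the FaviconActor and ExtractorsOrchestrator classes from this diff and ArchiveBox's ArchiveResult model (the import path and object names here are assumptions, not taken from the diff):

    # illustrative only -- import path and names below are assumptions
    from core.models import ArchiveResult

    # atomically claim one random queued ArchiveResult via the new get_random() classmethod
    obj = FaviconActor.get_random(
        model=ArchiveResult,
        where='status = "queued"',
        set='status = "started"',
        choose_from_top=50,
    )
    if obj is not None:
        print('claimed', obj.id)

    # run the orchestrator as a background thread in the current process instead of forking
    orchestrator = ExtractorsOrchestrator(mode='thread')
    orchestrator.start()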