diff --git a/archivebox/builtin_plugins/chrome/apps.py b/archivebox/builtin_plugins/chrome/apps.py index 1153d9aa..f69967b5 100644 --- a/archivebox/builtin_plugins/chrome/apps.py +++ b/archivebox/builtin_plugins/chrome/apps.py @@ -86,23 +86,23 @@ class ChromeDependencyConfigs(BaseConfigSet): CHROME_EXTRA_ARGS: List[str] = [] CHROME_DEFAULT_ARGS: List[str] = ['--timeout={TIMEOUT-10}'] - def load(self) -> Self: - # for each field in the model, load its value - # load from each source in order of precedence (lowest to highest): - # - schema default - # - ArchiveBox.conf INI file - # - environment variables - # - command-line arguments + # def load(self) -> Self: + # # for each field in the model, load its value + # # load from each source in order of precedence (lowest to highest): + # # - schema default + # # - ArchiveBox.conf INI file + # # - environment variables + # # - command-line arguments - LOADED_VALUES: Dict[str, Any] = {} + # LOADED_VALUES: Dict[str, Any] = {} - for field_name, field in self.__fields__.items(): - def_value = field.default_factory() if field.default_factory else field.default - ini_value = settings.INI_CONFIG.get_value(field_name) - env_value = settings.ENV_CONFIG.get_value(field_name) - cli_value = settings.CLI_CONFIG.get_value(field_name) - run_value = settings.RUN_CONFIG.get_value(field_name) - value = run_value or cli_value or env_value or ini_value or def_value + # for field_name, field in self.__fields__.items(): + # def_value = field.default_factory() if field.default_factory else field.default + # ini_value = settings.INI_CONFIG.get_value(field_name) + # env_value = settings.ENV_CONFIG.get_value(field_name) + # cli_value = settings.CLI_CONFIG.get_value(field_name) + # run_value = settings.RUN_CONFIG.get_value(field_name) + # value = run_value or cli_value or env_value or ini_value or def_value class ChromeConfigs(ChromeDependencyConfigs): # section: ConfigSectionName = 'ALL_CONFIGS' diff --git a/archivebox/plugantic/base_queue.py b/archivebox/plugantic/base_queue.py index a3409b6e..7e2b06c6 100644 --- a/archivebox/plugantic/base_queue.py +++ b/archivebox/plugantic/base_queue.py @@ -36,6 +36,7 @@ class BaseQueue(BaseHook): return AttrDict(all_tasks) def get_huey_config(self, settings) -> dict: + """Get the config dict to insert into django.conf.settings.DJANGO_HUEY['queues'].""" return { "huey_class": "huey.SqliteHuey", "filename": settings.QUEUE_DATABASE_NAME, @@ -58,6 +59,7 @@ class BaseQueue(BaseHook): } def get_supervisor_config(self, settings) -> dict: + """Ge the config dict used to tell sueprvisord to start a huey consumer for this queue.""" return { "name": f"worker_{self.name}", "command": f"archivebox manage djangohuey --queue {self.name}", diff --git a/archivebox/plugantic/ini_to_toml.py b/archivebox/plugantic/ini_to_toml.py index eec21f74..9f45013f 100644 --- a/archivebox/plugantic/ini_to_toml.py +++ b/archivebox/plugantic/ini_to_toml.py @@ -63,7 +63,7 @@ def convert(ini_str: str) -> str: ### Basic Assertions -test_input = """ +test_input = r""" [SERVER_CONFIG] IS_TTY=False USE_COLOR=False @@ -225,7 +225,7 @@ NODE_VERSION=v21.7.3 """ -expected_output = '''[SERVER_CONFIG] +expected_output = r'''[SERVER_CONFIG] IS_TTY = false USE_COLOR = false SHOW_PROGRESS = false diff --git a/archivebox/queues/semaphores.py b/archivebox/queues/semaphores.py new file mode 100644 index 00000000..e798e59c --- /dev/null +++ b/archivebox/queues/semaphores.py @@ -0,0 +1,101 @@ +import time +import uuid +from functools import wraps +from django.db import connection, transaction +from django.utils import timezone +from huey.exceptions import TaskLockedException + +class SqliteSemaphore: + def __init__(self, db_path, table_name, name, value=1, timeout=None): + self.db_path = db_path + self.table_name = table_name + self.name = name + self.value = value + self.timeout = timeout or 86400 # Set a max age for lock holders + + # Ensure the table exists + with connection.cursor() as cursor: + cursor.execute(f""" + CREATE TABLE IF NOT EXISTS {self.table_name} ( + id TEXT PRIMARY KEY, + name TEXT, + timestamp DATETIME + ) + """) + + def acquire(self, name=None): + name = name or str(uuid.uuid4()) + now = timezone.now() + expiration = now - timezone.timedelta(seconds=self.timeout) + + with transaction.atomic(): + # Remove expired locks + with connection.cursor() as cursor: + cursor.execute(f""" + DELETE FROM {self.table_name} + WHERE name = %s AND timestamp < %s + """, [self.name, expiration]) + + # Try to acquire the lock + with connection.cursor() as cursor: + cursor.execute(f""" + INSERT INTO {self.table_name} (id, name, timestamp) + SELECT %s, %s, %s + WHERE ( + SELECT COUNT(*) FROM {self.table_name} + WHERE name = %s + ) < %s + """, [name, self.name, now, self.name, self.value]) + + if cursor.rowcount > 0: + return name + + # If we couldn't acquire the lock, remove our attempted entry + with connection.cursor() as cursor: + cursor.execute(f""" + DELETE FROM {self.table_name} + WHERE id = %s AND name = %s + """, [name, self.name]) + + return None + + def release(self, name): + with connection.cursor() as cursor: + cursor.execute(f""" + DELETE FROM {self.table_name} + WHERE id = %s AND name = %s + """, [name, self.name]) + return cursor.rowcount > 0 + + +LOCKS_DB_PATH = settings.CONFIG.OUTPUT_DIR / 'locks.sqlite3' + +def lock_task_semaphore(db_path, table_name, lock_name, value=1, timeout=None): + """ + Lock which can be acquired multiple times (default = 1). + + NOTE: no provisions are made for blocking, waiting, or notifying. This is + just a lock which can be acquired a configurable number of times. + + Example: + + # Allow up to 3 workers to run this task concurrently. If the task is + # locked, retry up to 2 times with a delay of 60s. + @huey.task(retries=2, retry_delay=60) + @lock_task_semaphore('path/to/db.sqlite3', 'semaphore_locks', 'my-lock', 3) + def my_task(): + ... + """ + sem = SqliteSemaphore(db_path, table_name, lock_name, value, timeout) + def decorator(fn): + @wraps(fn) + def inner(*args, **kwargs): + tid = sem.acquire() + if tid is None: + raise TaskLockedException(f'unable to acquire lock {lock_name}') + try: + return fn(*args, **kwargs) + finally: + sem.release(tid) + return inner + return decorator