diff --git a/archivebox/builtin_plugins/singlefile/apps.py b/archivebox/builtin_plugins/singlefile/apps.py
index 2eb0de05..e226cc66 100644
--- a/archivebox/builtin_plugins/singlefile/apps.py
+++ b/archivebox/builtin_plugins/singlefile/apps.py
@@ -12,6 +12,7 @@ from plugantic.base_plugin import BasePlugin
 from plugantic.base_configset import BaseConfigSet, ConfigSectionName
 from plugantic.base_binary import BaseBinary, env
 from plugantic.base_extractor import BaseExtractor
+from plugantic.base_queue import BaseQueue
 from plugantic.base_hook import BaseHook

 # Depends on Other Plugins:
@@ -95,6 +96,13 @@ class SinglefileExtractor(BaseExtractor):
 SINGLEFILE_BINARY = SinglefileBinary()
 SINGLEFILE_EXTRACTOR = SinglefileExtractor()

+class SinglefileQueue(BaseQueue):
+    name: str = 'singlefile'
+
+    binaries: List[InstanceOf[BaseBinary]] = [SINGLEFILE_BINARY]
+
+SINGLEFILE_QUEUE = SinglefileQueue()
+
 class SinglefilePlugin(BasePlugin):
     app_label: str ='singlefile'
     verbose_name: str = 'SingleFile'
diff --git a/archivebox/builtin_plugins/singlefile/tasks.py b/archivebox/builtin_plugins/singlefile/tasks.py
new file mode 100644
index 00000000..8ab2bd95
--- /dev/null
+++ b/archivebox/builtin_plugins/singlefile/tasks.py
@@ -0,0 +1,40 @@
+__package__ = 'archivebox.queues'
+
+import time
+
+from django.core.cache import cache
+
+from huey import crontab
+from django_huey import db_task, on_startup, db_periodic_task
+from huey_monitor.models import TaskModel
+from huey_monitor.tqdm import ProcessInfo
+
+@db_task(queue="singlefile", context=True)
+def extract(url, out_dir, config, task=None, parent_task_id=None):
+    if task and parent_task_id:
+        TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
+
+    process_info = ProcessInfo(task, desc="extract_singlefile", parent_task_id=parent_task_id, total=1)
+
+    time.sleep(5)
+
+    process_info.update(n=1)
+    return {'output': 'singlefile.html', 'status': 'succeeded'}
+
+
+# @on_startup(queue='singlefile')
+# def start_singlefile_queue():
+#     print("[+] Starting singlefile worker...")
+#     update_version.call_local()
+
+
+# @db_periodic_task(crontab(minute='*/5'), queue='singlefile')
+# def update_version():
+#     print('[*] Updating singlefile version (on 5 minute interval)...')
+#     from django.conf import settings
+
+#     bin = settings.BINARIES.SinglefileBinary.load()
+#     if bin.version:
+#         cache.set(f"bin:abspath:{bin.name}", bin.abspath)
+#         cache.set(f"bin:version:{bin.name}:{bin.abspath}", bin.version)
+#     print('[√] Updated singlefile version:', bin.version, bin.abspath)
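For context: because `extract` is declared with `@db_task(queue="singlefile")`, calling it enqueues a job and returns a huey Result handle instead of running inline. A rough usage sketch (the URL and out_dir values are placeholders, and it assumes a consumer is running via `archivebox manage djangohuey --queue singlefile`):

    from builtin_plugins.singlefile.tasks import extract

    result = extract('https://example.com', out_dir='data/archive/1234', config={})  # enqueues, returns a huey Result
    print(result.id)                   # huey task id, also tracked by huey_monitor's TaskModel
    print(result.get(blocking=True))   # block until the worker finishes
    # -> {'output': 'singlefile.html', 'status': 'succeeded'}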
diff --git a/archivebox/cli/__init__.py b/archivebox/cli/__init__.py
index 6a0106a0..2b59dcba 100644
--- a/archivebox/cli/__init__.py
+++ b/archivebox/cli/__init__.py
@@ -37,7 +37,7 @@ is_valid_cli_module = lambda module, subcommand: (
 )


-IGNORED_BG_THREADS = ('MainThread', 'ThreadPoolExecutor', 'IPythonHistorySavingThread')  # threads we dont have to wait for before exiting
+IGNORED_BG_THREADS = ('MainThread', 'ThreadPoolExecutor', 'IPythonHistorySavingThread', 'Scheduler')  # threads we don't have to wait for before exiting


 def wait_for_bg_threads_to_exit(thread_names: Iterable[str]=(), ignore_names: Iterable[str]=IGNORED_BG_THREADS, timeout: int=60) -> int:
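The new 'Scheduler' entry presumably covers the scheduler thread that huey's in-process consumer leaves running, which never exits on its own, so the CLI should not block on it before exiting. A quick way to see the names this check compares against (sketch):

    import threading
    print([t.name for t in threading.enumerate()])
    # e.g. ['MainThread', 'Scheduler', ...] -- matched against IGNORED_BG_THREADS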
diff --git a/archivebox/core/admin.py b/archivebox/core/admin.py
index df625e89..ccaa675b 100644
--- a/archivebox/core/admin.py
+++ b/archivebox/core/admin.py
@@ -30,6 +30,7 @@ from core.models import Snapshot, ArchiveResult, Tag
 from core.mixins import SearchResultsAdminMixin
 from api.models import APIToken
 from abid_utils.admin import ABIDModelAdmin
+from queues.tasks import bg_archive_links, bg_add

 from index.html import snapshot_icons
 from logging_util import printable_filesize
@@ -137,6 +138,8 @@ class CustomUserAdmin(UserAdmin):
         ) + f'{total_count} total records...')


+
+
 archivebox_admin = ArchiveBoxAdmin()
 archivebox_admin.register(get_user_model(), CustomUserAdmin)
 archivebox_admin.disable_action('delete_selected')
@@ -155,6 +158,28 @@ archivebox_admin.get_admin_data_urls = get_admin_data_urls.__get__(archivebox_admin, ArchiveBoxAdmin)
 archivebox_admin.get_urls = get_urls(archivebox_admin.get_urls).__get__(archivebox_admin, ArchiveBoxAdmin)


+from huey_monitor.apps import HueyMonitorConfig
+HueyMonitorConfig.verbose_name = 'Background Workers'
+
+from huey_monitor.admin import TaskModel, TaskModelAdmin, SignalInfoModel, SignalInfoModelAdmin
+archivebox_admin.register(SignalInfoModel, SignalInfoModelAdmin)
+
+
+class CustomTaskModelAdmin(TaskModelAdmin):
+    actions = ["delete_selected"]
+
+    def has_delete_permission(self, request, obj=None):
+        codename = get_permission_codename("delete", self.opts)
+        return request.user.has_perm("%s.%s" % (self.opts.app_label, codename))
+
+
+archivebox_admin.register(TaskModel, CustomTaskModelAdmin)
+
+def result_url(result: TaskModel) -> str:
+    url = reverse("admin:huey_monitor_taskmodel_change", args=[str(result.id)])
+    return format_html('<a href="{url}" class="fade-in-progress-url">See progress...</a>'.format(url=url))
+
+
 class AccelleratedPaginator(Paginator):
     """
     Accellerated Pagniator ignores DISTINCT when counting total number of rows.
@@ -515,65 +540,53 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
             archive_links(links, overwrite=True, methods=('title','favicon'), out_dir=CONFIG.OUTPUT_DIR)
             messages.success(request, f"Title and favicon have been fetched and saved for {len(links)} URLs.")
         else:
-            # otherwise run in a bg thread
-            bg_thread = threading.Thread(
-                target=archive_links,
-                args=(links,),
-                kwargs={"overwrite": True, "methods": ['title', 'favicon'], "out_dir": CONFIG.OUTPUT_DIR},
+            # otherwise run in a background worker
+            result = bg_archive_links((links,), kwargs={"overwrite": True, "methods": ["title", "favicon"], "out_dir": CONFIG.OUTPUT_DIR})
+            messages.success(
+                request,
+                mark_safe(f"Title and favicon are updating in the background for {len(links)} URLs. {result_url(result)}"),
             )
-            bg_thread.setDaemon(True)
-            bg_thread.start()
-            messages.success(request, f"Title and favicon are updating in the background for {len(links)} URLs. (refresh in a few minutes to see results)")

     @admin.action(
         description="⬇️ Get Missing"
     )
     def update_snapshots(self, request, queryset):
         links = [snapshot.as_link() for snapshot in queryset]
-        bg_thread = threading.Thread(
-            target=archive_links,
-            args=(links,),
-            kwargs={"overwrite": False, "out_dir": CONFIG.OUTPUT_DIR},
-        )
-        bg_thread.setDaemon(True)
-        bg_thread.start()
+
+        result = bg_archive_links((links,), kwargs={"overwrite": False, "out_dir": CONFIG.OUTPUT_DIR})
+
         messages.success(
-            request, f"Re-trying any previously failed methods for {len(links)} URLs in the background. (refresh in a few minutes to see results)"
+            request,
+            mark_safe(f"Re-trying any previously failed methods for {len(links)} URLs in the background. {result_url(result)}"),
         )

     @admin.action(
-        description="📑 Archive again"
+        description="🆕 Archive Again"
     )
     def resnapshot_snapshot(self, request, queryset):
         for snapshot in queryset:
             timestamp = timezone.now().isoformat('T', 'seconds')
             new_url = snapshot.url.split('#')[0] + f'#{timestamp}'
-            bg_thread = threading.Thread(target=add, args=(new_url,), kwargs={'tag': snapshot.tags_str()})
-            bg_thread.setDaemon(True)
-            bg_thread.start()
+            result = bg_add({'urls': new_url, 'tag': snapshot.tags_str()})

         messages.success(
             request,
-            f"Creating new fresh snapshots for {len(queryset.count())} URLs in the background. (refresh in a few minutes to see results)",
+            mark_safe(f"Creating new fresh snapshots for {queryset.count()} URLs in the background. {result_url(result)}"),
         )

     @admin.action(
-        description="♲ Redo"
+        description="🔄 Redo"
     )
     def overwrite_snapshots(self, request, queryset):
         links = [snapshot.as_link() for snapshot in queryset]
-        bg_thread = threading.Thread(
-            target=archive_links,
-            args=(links,),
-            kwargs={"overwrite": True, "out_dir": CONFIG.OUTPUT_DIR},
-        )
-        bg_thread.setDaemon(True)
-        bg_thread.start()
+
+        result = bg_archive_links((links,), kwargs={"overwrite": True, "out_dir": CONFIG.OUTPUT_DIR})
+
         messages.success(
             request,
-            f"Clearing all previous results and re-downloading {len(links)} URLs in the background. (refresh in a few minutes to see results)",
+            mark_safe(f"Clearing all previous results and re-downloading {len(links)} URLs in the background. {result_url(result)}"),
         )

     @admin.action(
@@ -583,7 +596,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
         remove(snapshots=queryset, yes=True, delete=True, out_dir=CONFIG.OUTPUT_DIR)
         messages.success(
             request,
-            f"Succesfully deleted {len(queryset.count())} Snapshots. Don't forget to scrub URLs from import logs (data/sources) and error logs (data/logs) if needed.",
+            mark_safe(f"Successfully deleted {queryset.count()} Snapshots. Don't forget to scrub URLs from import logs (data/sources) and error logs (data/logs) if needed."),
         )

@@ -597,7 +610,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
             obj.tags.add(*tags)
         messages.success(
             request,
-            f"Added {len(tags)} tags to {len(queryset.count())} Snapshots.",
+            f"Added {len(tags)} tags to {queryset.count()} Snapshots.",
         )

@@ -611,7 +624,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
             obj.tags.remove(*tags)
         messages.success(
             request,
-            f"Removed {len(tags)} tags from {len(queryset.count())} Snapshots.",
+            f"Removed {len(tags)} tags from {queryset.count()} Snapshots.",
         )

@@ -727,7 +740,6 @@ class ArchiveResultAdmin(ABIDModelAdmin):
         else:
             root_dir = str(snapshot_dir)

-        # print(root_dir, str(list(os.walk(root_dir))))

         for root, dirs, files in os.walk(root_dir):
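All of the admin actions above follow the same pattern: enqueue a bg_* task, then link the user to its huey_monitor progress page via result_url(). A sketch of the underlying relationship (assuming a worker is running; huey_monitor's TaskModel uses the huey task id as its primary key):

    result = bg_add({'urls': 'https://example.com', 'tag': ''})  # returns a huey Result
    task_row = TaskModel.objects.get(task_id=result.id)          # huey_monitor's record of the same task
    link_html = result_url(result)
    # -> '<a href="/admin/huey_monitor/taskmodel/<task_id>/change/" class="fade-in-progress-url">See progress...</a>'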
diff --git a/archivebox/core/settings.py b/archivebox/core/settings.py
index f988673d..c434f8e6 100644
--- a/archivebox/core/settings.py
+++ b/archivebox/core/settings.py
@@ -87,6 +87,7 @@ INSTALLED_APPS = [
     'django_object_actions',  # provides easy Django Admin action buttons on change views https://github.com/crccheck/django-object-actions

     # Our ArchiveBox-provided apps
+    'queues',                 # handles starting and managing background workers and processes
     'abid_utils',             # handles ABID ID creation, handling, and models
     'plugantic',              # ArchiveBox plugin API definition + finding/registering/calling interface
     'core',                   # core django model with Snapshot, ArchiveResult, etc.
@@ -98,6 +99,9 @@ INSTALLED_APPS = [
     # 3rd-party apps from PyPI that need to be loaded last
     'admin_data_views',       # handles rendering some convenient automatic read-only views of data in Django admin
     'django_extensions',      # provides Django Debug Toolbar (and other non-debug helpers)
+    'django_huey',            # provides multi-queue support for django huey https://github.com/gaiacoop/django-huey
+    'bx_django_utils',        # needed for huey_monitor https://github.com/boxine/bx_django_utils
+    'huey_monitor',           # adds an admin UI for monitoring background huey tasks https://github.com/boxine/django-huey-monitor
 ]

@@ -212,17 +216,28 @@ CACHE_DB_TABLE = 'django_cache'

 DATABASE_FILE = Path(CONFIG.OUTPUT_DIR) / CONFIG.SQL_INDEX_FILENAME
 DATABASE_NAME = os.environ.get("ARCHIVEBOX_DATABASE_NAME", str(DATABASE_FILE))

+QUEUE_DATABASE_NAME = DATABASE_NAME.replace('index.sqlite3', 'queue.sqlite3')
+
 DATABASES = {
-    'default': {
-        'ENGINE': 'django.db.backends.sqlite3',
-        'NAME': DATABASE_NAME,
-        'OPTIONS': {
-            'timeout': 60,
-            'check_same_thread': False,
+    "default": {
+        "ENGINE": "django.db.backends.sqlite3",
+        "NAME": DATABASE_NAME,
+        "OPTIONS": {
+            "timeout": 60,
+            "check_same_thread": False,
         },
-        'TIME_ZONE': CONFIG.TIMEZONE,
+        "TIME_ZONE": CONFIG.TIMEZONE,
         # DB setup is sometimes modified at runtime by setup_django() in config.py
     },
+    "queue": {
+        "ENGINE": "django.db.backends.sqlite3",
+        "NAME": QUEUE_DATABASE_NAME,
+        "OPTIONS": {
+            "timeout": 60,
+            "check_same_thread": False,
+        },
+        "TIME_ZONE": CONFIG.TIMEZONE,
+    },
     # 'cache': {
     #     'ENGINE': 'django.db.backends.sqlite3',
     #     'NAME': CACHE_DB_PATH,
@@ -239,6 +254,64 @@ MIGRATION_MODULES = {'signal_webhooks': None}

 DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'

+HUEY = {
+    "huey_class": "huey.SqliteHuey",
+    "filename": QUEUE_DATABASE_NAME,
+    "name": "system_tasks",
+    "results": True,
+    "store_none": True,
+    "immediate": False,
+    "utc": True,
+    "consumer": {
+        "workers": 1,
+        "worker_type": "thread",
+        "initial_delay": 0.1,         # Smallest polling interval, same as -d.
+        "backoff": 1.15,              # Exponential backoff using this rate, -b.
+        "max_delay": 10.0,            # Max possible polling interval, -m.
+        "scheduler_interval": 1,      # Check schedule every second, -s.
+        "periodic": True,             # Enable crontab feature.
+        "check_worker_health": True,  # Enable worker health checks.
+        "health_check_interval": 1,   # Check worker health every second.
+    },
+}
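With this config, django_huey exposes one named queue per entry in the DJANGO_HUEY["queues"] dict registered just below. A minimal sketch of defining and consuming a task on the default system_tasks queue (the function name here is purely illustrative):

    from django_huey import db_task

    @db_task(queue="system_tasks")  # stored in queue.sqlite3 via huey.SqliteHuey
    def hello(name):
        return f"hello {name}"

    hello("world")  # enqueues; a consumer must be running to execute it:
    # $ archivebox manage djangohuey --queue system_tasks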
+
+# https://huey.readthedocs.io/en/latest/contrib.html#setting-things-up
+# https://github.com/gaiacoop/django-huey
+DJANGO_HUEY = {
+    "default": "system_tasks",
+    "queues": {
+        HUEY["name"]: HUEY.copy(),
+        # more registered here at plugin import-time by BaseQueue.register()
+    },
+}
+
+class HueyDBRouter:
+    """A router to store all the Huey Monitor models in the queue.sqlite3 database."""
+
+    route_app_labels = {"huey_monitor", "django_huey", "djhuey"}
+
+    def db_for_read(self, model, **hints):
+        if model._meta.app_label in self.route_app_labels:
+            return "queue"
+        return 'default'
+
+    def db_for_write(self, model, **hints):
+        if model._meta.app_label in self.route_app_labels:
+            return "queue"
+        return 'default'
+
+    def allow_relation(self, obj1, obj2, **hints):
+        if obj1._meta.app_label in self.route_app_labels or obj2._meta.app_label in self.route_app_labels:
+            return obj1._meta.app_label == obj2._meta.app_label
+        return None
+
+    def allow_migrate(self, db, app_label, model_name=None, **hints):
+        if app_label in self.route_app_labels:
+            return db == "queue"
+        return db == "default"
+
+DATABASE_ROUTERS = ['core.settings.HueyDBRouter']
+
 CACHES = {
     'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'},
     # 'sqlite': {'BACKEND': 'django.core.cache.backends.db.DatabaseCache', 'LOCATION': 'cache'},
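The router means ORM calls never need to name a database explicitly; a sketch of the effective behavior:

    from huey_monitor.models import TaskModel
    from core.models import Snapshot

    TaskModel.objects.count()   # transparently routed to the "queue" db (queue.sqlite3)
    Snapshot.objects.count()    # routed to "default" (index.sqlite3)

    # allow_migrate() keeps the schemas separate as well, which is why
    # apply_migrations() below runs two separate migrate commands:
    #   manage.py migrate                                -> non-huey apps into index.sqlite3
    #   manage.py migrate huey_monitor --database=queue  -> huey_monitor into queue.sqlite3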
diff --git a/archivebox/core/views.py b/archivebox/core/views.py
index 260a6f70..0964696e 100644
--- a/archivebox/core/views.py
+++ b/archivebox/core/views.py
@@ -23,6 +23,9 @@ from admin_data_views.utils import render_with_table_view, render_with_item_view

 from core.models import Snapshot
 from core.forms import AddLinkForm
+from core.admin import result_url
+
+from queues.tasks import bg_add

 from ..config import (
     OUTPUT_DIR,
@@ -478,15 +481,14 @@ class AddView(UserPassesTestMixin, FormView):
         if extractors:
             input_kwargs.update({"extractors": extractors})

-        bg_thread = threading.Thread(target=add, kwargs=input_kwargs)
-        bg_thread.setDaemon(True)
-        bg_thread.start()
+        result = bg_add(input_kwargs, parent_task_id=None)
+        print('Started background add job:', result)

         rough_url_count = url.count('://')

         messages.success(
             self.request,
-            f"Adding {rough_url_count} URLs in the background. (refresh in a few minutes to see results)",
+            mark_safe(f"Adding {rough_url_count} URLs in the background. (refresh in a few minutes to see results) {result_url(result)}"),
         )

         return redirect("/admin/core/snapshot/")
diff --git a/archivebox/index/sql.py b/archivebox/index/sql.py
index 97058590..0071f60b 100644
--- a/archivebox/index/sql.py
+++ b/archivebox/index/sql.py
@@ -148,17 +148,16 @@ def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
 @enforce_types
 def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
     from django.core.management import call_command
-    null, out = StringIO(), StringIO()
-    try:
-        call_command("makemigrations", interactive=False, stdout=null)
-    except Exception as e:
-        print('[!] Failed to create some migrations. Please open an issue and copy paste this output for help: {}'.format(e))
-        print()
+    out1, out2 = StringIO(), StringIO()

-    call_command("migrate", interactive=False, stdout=out)
-    out.seek(0)
+    call_command("migrate", interactive=False, database='default', stdout=out1)
+    out1.seek(0)
+    call_command("migrate", "huey_monitor", interactive=False, database='queue', stdout=out2)
+    out2.seek(0)

-    return [line.strip() for line in out.readlines() if line.strip()]
+    return [
+        line.strip() for line in out1.readlines() + out2.readlines() if line.strip()
+    ]

 @enforce_types
 def get_admins(out_dir: Path=OUTPUT_DIR) -> List[str]:
diff --git a/archivebox/main.py b/archivebox/main.py
index ce553bbf..bf21bdb3 100755
--- a/archivebox/main.py
+++ b/archivebox/main.py
@@ -1,8 +1,10 @@
 __package__ = 'archivebox'

 import os
+import time
 import sys
 import shutil
+import signal
 import platform
 import subprocess

@@ -1352,6 +1354,7 @@ def server(runserver_args: Optional[List[str]]=None,
     if reload or debug:
         call_command("runserver", *runserver_args)
     else:
+
         host = '127.0.0.1'
         port = '8000'

@@ -1367,12 +1370,52 @@ def server(runserver_args: Optional[List[str]]=None,
         except IndexError:
             pass

+        from queues.supervisor_util import get_or_create_supervisord_process, start_worker, stop_worker, watch_worker
+
+        print()
+        supervisor = get_or_create_supervisord_process(daemonize=False)
+
+        bg_workers = [
+            {
+                "name": "worker_system_tasks",
+                "command": "archivebox manage djangohuey --queue system_tasks",
+                "autostart": "true",
+                "autorestart": "true",
+                "stdout_logfile": "logs/worker_system_tasks.log",
+                "redirect_stderr": "true",
+            },
+        ]
+        fg_worker = {
+            "name": "worker_daphne",
+            "command": f"daphne --bind={host} --port={port} --application-close-timeout=600 archivebox.core.asgi:application",
+            "autostart": "false",
+            "autorestart": "true",
+            "stdout_logfile": "logs/worker_daphne.log",
+            "redirect_stderr": "true",
+        }
+
+        print()
+        for worker in bg_workers:
+            start_worker(supervisor, worker)
+
+        print()
+        start_worker(supervisor, fg_worker)
+        print()
+
         try:
-            subprocess.run(['daphne', '--bind', host, '--port', port, 'archivebox.core.asgi:application'])
-        except (SystemExit, KeyboardInterrupt):
+            watch_worker(supervisor, "worker_daphne")
+        except KeyboardInterrupt:
+            print("\n[🛑] Got Ctrl+C, stopping gracefully...")
+        except SystemExit:
             pass
-        except Exception as e:
-            print(e)
+        except BaseException as e:
+            print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
+            raise
+        finally:
+            stop_worker(supervisor, "worker_daphne")
+            time.sleep(0.5)
+
+        print("\n[🟩] ArchiveBox server shut down gracefully.")


 @enforce_types
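Because supervisord is reached over its unix-socket XML-RPC interface, the workers started by `archivebox server` can also be inspected from any other process using the same few calls that supervisor_util relies on. A sketch (the socket path assumes the default data/tmp layout defined in queues/settings.py):

    from xmlrpc.client import ServerProxy
    from supervisor.xmlrpc import SupervisorTransport

    transport = SupervisorTransport(None, None, "unix:///path/to/data/tmp/supervisord.sock")
    server = ServerProxy("http://localhost", transport=transport)
    for proc in server.supervisor.getAllProcessInfo():
        print(proc["name"], proc["statename"])   # e.g. worker_daphne RUNNING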
diff --git a/archivebox/plugantic/base_binary.py b/archivebox/plugantic/base_binary.py
index 1bff0019..c38a2ad1 100644
--- a/archivebox/plugantic/base_binary.py
+++ b/archivebox/plugantic/base_binary.py
@@ -12,13 +12,13 @@ from ..config_stubs import AttrDict

 class BaseBinProvider(BaseHook, BinProvider):
     hook_type: HookType = 'BINPROVIDER'
-    
+
     # def on_get_abspath(self, bin_name: BinName, **context) -> Optional[HostBinPath]:
     #     Class = super()
     #     get_abspath_func = lambda: Class.on_get_abspath(bin_name, **context)
     #     # return cache.get_or_set(f'bin:abspath:{bin_name}', get_abspath_func)
     #     return get_abspath_func()
-    
+
     # def on_get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, **context) -> SemVer | None:
     #     Class = super()
     #     get_version_func = lambda: Class.on_get_version(bin_name, abspath, **context)
diff --git a/archivebox/plugantic/base_queue.py b/archivebox/plugantic/base_queue.py
new file mode 100644
index 00000000..39593297
--- /dev/null
+++ b/archivebox/plugantic/base_queue.py
@@ -0,0 +1,143 @@
+__package__ = 'archivebox.plugantic'
+
+import importlib
+
+from typing import Dict, List, TYPE_CHECKING
+from pydantic import Field, InstanceOf
+
+if TYPE_CHECKING:
+    from huey.api import TaskWrapper
+
+from .base_hook import BaseHook, HookType
+from .base_binary import BaseBinary
+from ..config_stubs import AttrDict
+
+
+
+class BaseQueue(BaseHook):
+    hook_type: HookType = 'QUEUE'
+
+    name: str = Field()   # e.g. 'singlefile'
+
+    binaries: List[InstanceOf[BaseBinary]] = Field()
+
+    @property
+    def tasks(self) -> Dict[str, 'TaskWrapper']:
+        """Return an AttrDict of all the background worker tasks defined in the plugin's tasks.py file."""
+        tasks = importlib.import_module(f"{self.plugin_module}.tasks")
+
+        all_tasks = {}
+
+        for task_name, task in tasks.__dict__.items():
+            # if attr is a Huey task and its queue_name matches our hook's queue name
+            if hasattr(task, "task_class") and task.huey.name == self.name:
+                all_tasks[task_name] = task
+
+        return AttrDict(all_tasks)
+
+    def get_huey_config(self, settings) -> dict:
+        return {
+            "huey_class": "huey.SqliteHuey",
+            "filename": settings.QUEUE_DATABASE_NAME,
+            "name": self.name,
+            "results": True,
+            "store_none": True,
+            "immediate": False,
+            "utc": True,
+            "consumer": {
+                "workers": 1,
+                "worker_type": "thread",
+                "initial_delay": 0.1,         # Smallest polling interval, same as -d.
+                "backoff": 1.15,              # Exponential backoff using this rate, -b.
+                "max_delay": 10.0,            # Max possible polling interval, -m.
+                "scheduler_interval": 1,      # Check schedule every second, -s.
+                "periodic": True,             # Enable crontab feature.
+                "check_worker_health": True,  # Enable worker health checks.
+                "health_check_interval": 1,   # Check worker health every second.
+            },
+        }
+
+    def get_supervisor_config(self, settings) -> dict:
+        return {
+            "name": f"worker_{self.name}",
+            "command": f"archivebox manage djangohuey --queue {self.name}",
+            "stdout_logfile": f"logs/worker_{self.name}.log",
+            "redirect_stderr": "true",
+            "autorestart": "true",
+            "autostart": "false",
+        }
+
+    def start_supervisord_worker(self, settings, lazy=True):
+        from queues.supervisor_util import get_or_create_supervisord_process, start_worker
+        print()
+        try:
+            supervisor = get_or_create_supervisord_process(daemonize=False)
+        except Exception as e:
+            print(f"Error starting worker for queue {self.name}: {e}")
+            return None
+        print()
+        worker = start_worker(supervisor, self.get_supervisor_config(settings), lazy=lazy)
+        return worker
+
+    def register(self, settings, parent_plugin=None):
+        # self._plugin = parent_plugin  # for debugging only, never rely on this!
+
+        # Side effect: register queue with django-huey multiqueue dict
+        settings.DJANGO_HUEY = getattr(settings, "DJANGO_HUEY", None) or AttrDict({"queues": {}})
+        settings.DJANGO_HUEY["queues"][self.name] = self.get_huey_config(settings)
+
+        # Side effect: register some extra tasks with huey
+        # on_startup(queue=self.name)(self.on_startup_task)
+        # db_periodic_task(crontab(minute='*/5'))(self.on_periodic_task)
+
+        # Side effect: start consumer worker process under supervisord
+        settings.WORKERS = getattr(settings, "WORKERS", None) or AttrDict({})
+        settings.WORKERS[self.id] = self.start_supervisord_worker(settings, lazy=True)
+
+        # Install queue into settings.QUEUES
+        settings.QUEUES = getattr(settings, "QUEUES", None) or AttrDict({})
+        settings.QUEUES[self.id] = self
+
+        # Record installed hook into settings.HOOKS
+        super().register(settings, parent_plugin=parent_plugin)
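In effect, registering a queue hook folds its config into the same settings structures the core settings.py defines. A rough sketch of the state after a plugin like SinglefilePlugin has loaded (attribute names mirror the register() side effects above; exact hook ids depend on the plugin):

    from django.conf import settings

    settings.DJANGO_HUEY["queues"].keys()  # e.g. dict_keys(['system_tasks', 'singlefile'])
    settings.QUEUES                        # {'<hook id>': SinglefileQueue(...), ...}
    settings.WORKERS                       # {'<hook id>': <supervisord process info or None>, ...}
    settings.QUEUES['<hook id>'].tasks     # AttrDict of the plugin's tasks.py huey tasks, e.g. {'extract': ...}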
+
+
+# class WgetToggleConfig(ConfigSet):
+#     section: ConfigSectionName = 'ARCHIVE_METHOD_TOGGLES'
+
+#     SAVE_WGET: bool = True
+#     SAVE_WARC: bool = True
+
+# class WgetDependencyConfig(ConfigSet):
+#     section: ConfigSectionName = 'DEPENDENCY_CONFIG'
+
+#     WGET_BINARY: str = Field(default='wget')
+#     WGET_ARGS: Optional[List[str]] = Field(default=None)
+#     WGET_EXTRA_ARGS: List[str] = []
+#     WGET_DEFAULT_ARGS: List[str] = ['--timeout={TIMEOUT-10}']
+
+# class WgetOptionsConfig(ConfigSet):
+#     section: ConfigSectionName = 'ARCHIVE_METHOD_OPTIONS'
+
+#     # loaded from shared config
+#     WGET_AUTO_COMPRESSION: bool = Field(default=True)
+#     SAVE_WGET_REQUISITES: bool = Field(default=True)
+#     WGET_USER_AGENT: str = Field(default='', alias='USER_AGENT')
+#     WGET_TIMEOUT: int = Field(default=60, alias='TIMEOUT')
+#     WGET_CHECK_SSL_VALIDITY: bool = Field(default=True, alias='CHECK_SSL_VALIDITY')
+#     WGET_RESTRICT_FILE_NAMES: str = Field(default='windows', alias='RESTRICT_FILE_NAMES')
+#     WGET_COOKIES_FILE: Optional[Path] = Field(default=None, alias='COOKIES_FILE')
+
+
+# CONFIG = {
+#     'CHECK_SSL_VALIDITY': False,
+#     'SAVE_WARC': False,
+#     'TIMEOUT': 999,
+# }
+
+
+# WGET_CONFIG = [
+#     WgetToggleConfig(**CONFIG),
+#     WgetDependencyConfig(**CONFIG),
+#     WgetOptionsConfig(**CONFIG),
+# ]
diff --git a/archivebox/queues/__init__.py b/archivebox/queues/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/archivebox/queues/apps.py b/archivebox/queues/apps.py
new file mode 100644
index 00000000..1555e810
--- /dev/null
+++ b/archivebox/queues/apps.py
@@ -0,0 +1,6 @@
+from django.apps import AppConfig
+
+
+class QueuesConfig(AppConfig):
+    default_auto_field = 'django.db.models.BigAutoField'
+    name = 'queues'
diff --git a/archivebox/queues/migrations/__init__.py b/archivebox/queues/migrations/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/archivebox/queues/settings.py b/archivebox/queues/settings.py
new file mode 100644
index 00000000..50df38e7
--- /dev/null
+++ b/archivebox/queues/settings.py
@@ -0,0 +1,18 @@
+from pathlib import Path
+
+from django.conf import settings
+
+
+OUTPUT_DIR = settings.CONFIG.OUTPUT_DIR
+LOGS_DIR = settings.CONFIG.LOGS_DIR
+
+TMP_DIR = OUTPUT_DIR / "tmp"
+
+Path.mkdir(TMP_DIR, exist_ok=True)
+
+
+CONFIG_FILE = TMP_DIR / "supervisord.conf"
+PID_FILE = TMP_DIR / "supervisord.pid"
+SOCK_FILE = TMP_DIR / "supervisord.sock"
+LOG_FILE = TMP_DIR / "supervisord.log"
+WORKER_DIR = TMP_DIR / "workers"
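For reference, a sketch of what the config-writing helpers in supervisor_util.py (below) produce on disk, assuming OUTPUT_DIR is ./data (the worker dict shown is a hypothetical example):

    create_supervisord_config()  # writes data/tmp/supervisord.conf, roughly:
    #   [supervisord]
    #   pidfile = %(here)s/supervisord.pid
    #   ...
    #   [include]
    #   files = %(here)s/workers/*.conf

    create_worker_config({"name": "worker_singlefile",
                          "command": "archivebox manage djangohuey --queue singlefile"})
    # writes data/tmp/workers/worker_singlefile.conf:
    #   [program:worker_singlefile]
    #   command=archivebox manage djangohuey --queue singlefile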
diff --git a/archivebox/queues/supervisor_util.py b/archivebox/queues/supervisor_util.py
new file mode 100644
index 00000000..56e74d2e
--- /dev/null
+++ b/archivebox/queues/supervisor_util.py
@@ -0,0 +1,261 @@
+__package__ = 'archivebox.queues'
+
+import sys
+import time
+import signal
+import psutil
+import subprocess
+from pathlib import Path
+from rich.pretty import pprint
+
+from typing import Dict, cast
+
+from supervisor.xmlrpc import SupervisorTransport
+from xmlrpc.client import ServerProxy
+
+from .settings import CONFIG_FILE, PID_FILE, SOCK_FILE, LOG_FILE, WORKER_DIR, TMP_DIR, LOGS_DIR
+
+
+def create_supervisord_config():
+    config_content = f"""
+[supervisord]
+nodaemon = true
+environment = IS_SUPERVISORD_PARENT="true"
+pidfile = %(here)s/{PID_FILE.name}
+logfile = %(here)s/../{LOGS_DIR.name}/{LOG_FILE.name}
+childlogdir = %(here)s/../{LOGS_DIR.name}
+directory = %(here)s/..
+strip_ansi = true
+nocleanup = true
+
+[unix_http_server]
+file = %(here)s/{SOCK_FILE.name}
+chmod = 0700
+
+[supervisorctl]
+serverurl = unix://%(here)s/{SOCK_FILE.name}
+
+[rpcinterface:supervisor]
+supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
+
+[include]
+files = %(here)s/{WORKER_DIR.name}/*.conf
+
+"""
+    with open(CONFIG_FILE, "w") as f:
+        f.write(config_content)
+
+def create_worker_config(daemon):
+    Path.mkdir(WORKER_DIR, exist_ok=True)
+
+    name = daemon['name']
+    configfile = WORKER_DIR / f"{name}.conf"
+
+    config_content = f"[program:{name}]\n"
+    for key, value in daemon.items():
+        if key == 'name': continue
+        config_content += f"{key}={value}\n"
+    config_content += "\n"
+
+    with open(configfile, "w") as f:
+        f.write(config_content)
+
+
+def get_existing_supervisord_process():
+    try:
+        transport = SupervisorTransport(None, None, f"unix://{SOCK_FILE}")
+        server = ServerProxy("http://localhost", transport=transport)
+        current_state = cast(Dict[str, int | str], server.supervisor.getState())
+        if current_state["statename"] == "RUNNING":
+            pid = server.supervisor.getPID()
+            print(f"[🦸‍♂️] Supervisord connected (pid={pid}) via unix://{str(SOCK_FILE).replace(str(TMP_DIR), 'tmp')}.")
+            return server.supervisor
+    except FileNotFoundError:
+        return None
+    except Exception as e:
+        print(f"Error connecting to existing supervisord: {str(e)}")
+        return None
+
+def stop_existing_supervisord_process():
+    try:
+        pid = int(PID_FILE.read_text())
+    except FileNotFoundError:
+        return
+    except ValueError:
+        PID_FILE.unlink()
+        return
+
+    try:
+        print(f"[🦸‍♂️] Stopping supervisord process (pid={pid})...")
+        proc = psutil.Process(pid)
+        proc.terminate()
+        proc.wait()
+    except Exception:
+        raise
+    try:
+        PID_FILE.unlink()
+    except FileNotFoundError:
+        pass
+def start_new_supervisord_process(daemonize=True):
+    print(f"[🦸‍♂️] Supervisord starting{' in background' if daemonize else ''}...")
+    # Create a config file in the current working directory
+    create_supervisord_config()
+
+    # Start supervisord
+    subprocess.Popen(
+        f"supervisord --configuration={CONFIG_FILE}",
+        stdin=None,
+        shell=True,
+        start_new_session=daemonize,
+    )
+
+    def exit_signal_handler(signum, frame):
+        if signum != 13:
+            print(f"\n[🦸‍♂️] Supervisord got stop signal ({signal.strsignal(signum)}). Terminating child processes...")
+        stop_existing_supervisord_process()
+        raise SystemExit(0)
+
+    # Monitor for termination signals and cleanup child processes
+    if not daemonize:
+        signal.signal(signal.SIGINT, exit_signal_handler)
+        signal.signal(signal.SIGHUP, exit_signal_handler)
+        signal.signal(signal.SIGPIPE, exit_signal_handler)
+        signal.signal(signal.SIGTERM, exit_signal_handler)
+    # otherwise supervisord will continue running in the background even if the parent process ends (i.e. daemon mode)
+
+    time.sleep(2)
+
+    return get_existing_supervisord_process()
+
+def get_or_create_supervisord_process(daemonize=True):
+    supervisor = get_existing_supervisord_process()
+    if supervisor is None:
+        stop_existing_supervisord_process()
+        supervisor = start_new_supervisord_process(daemonize=daemonize)
+
+    assert supervisor and supervisor.getPID(), "Failed to start supervisord or connect to it!"
+    return supervisor
+
+def start_worker(supervisor, daemon, lazy=False):
+    assert supervisor.getPID()
+
+    print(f"[🦸‍♂️] Supervisord starting new subprocess worker: {daemon['name']}...")
+    create_worker_config(daemon)
+
+    result = supervisor.reloadConfig()
+    added, changed, removed = result[0]
+    # print(f"Added: {added}, Changed: {changed}, Removed: {removed}")
+    for removed in removed:
+        supervisor.stopProcessGroup(removed)
+        supervisor.removeProcessGroup(removed)
+    for changed in changed:
+        supervisor.stopProcessGroup(changed)
+        supervisor.removeProcessGroup(changed)
+        supervisor.addProcessGroup(changed)
+    for added in added:
+        supervisor.addProcessGroup(added)
+
+    time.sleep(1)
+
+    for _ in range(10):
+        procs = supervisor.getAllProcessInfo()
+        for proc in procs:
+            if proc['name'] == daemon["name"]:
+                # See the process state diagram here: http://supervisord.org/subprocess.html
+                if proc['statename'] == 'RUNNING':
+                    print(f"   - Worker {daemon['name']}: already {proc['statename']} ({proc['description']})")
+                    return proc
+                else:
+                    if not lazy:
+                        supervisor.startProcessGroup(daemon["name"], True)
+                    proc = supervisor.getProcessInfo(daemon["name"])
+                    print(f"   - Worker {daemon['name']}: started {proc['statename']} ({proc['description']})")
+                    return proc
+
+        # wait and retry in case the worker is slow to launch
+        time.sleep(0.5)
+
+    raise Exception(f"Failed to start worker {daemon['name']}! Only found: {procs}")
" + daemon_name) + + if proc['statename'] == 'STOPPED': + return proc + + if proc['statename'] == 'RUNNING': + time.sleep(1) + continue + + if proc['statename'] in ('STARTING', 'BACKOFF', 'FATAL', 'EXITED', 'STOPPING'): + print(f'[πŸ¦Έβ€β™‚οΈ] WARNING: Worker {daemon_name} {proc["statename"]} {proc["description"]}') + time.sleep(interval) + continue + + +def get_worker(supervisor, daemon_name): + try: + return supervisor.getProcessInfo(daemon_name) + except Exception: + pass + return None + +def stop_worker(supervisor, daemon_name): + proc = get_worker(supervisor, daemon_name) + + for _ in range(10): + if not proc: + # worker does not exist (was never running or configured in the first place) + return True + + # See process state diagram here: http://supervisord.org/subprocess.html + if proc['statename'] == 'STOPPED': + # worker was configured but has already stopped for some reason + supervisor.removeProcessGroup(daemon_name) + return True + else: + # worker was configured and is running, stop it now + supervisor.stopProcessGroup(daemon_name) + + # wait 500ms and then re-check to make sure it's really stopped + time.sleep(0.5) + proc = get_worker(supervisor, daemon_name) + + raise Exception(f"Failed to stop worker {daemon_name}!") + +def main(daemons): + supervisor = get_or_create_supervisord_process(daemonize=True) + + worker = start_worker(supervisor, daemons["webworker"]) + pprint(worker) + + print("All processes started in background.") + + # Optionally you can block the main thread until an exit signal is received: + # try: + # signal.pause() + # except KeyboardInterrupt: + # pass + # finally: + # stop_existing_supervisord_process() + +# if __name__ == "__main__": + +# DAEMONS = { +# "webworker": { +# "name": "webworker", +# "command": "python3 -m http.server 9000", +# "directory": str(cwd), +# "autostart": "true", +# "autorestart": "true", +# "stdout_logfile": cwd / "webworker.log", +# "stderr_logfile": cwd / "webworker_error.log", +# }, +# } +# main(DAEMONS, cwd) diff --git a/archivebox/queues/tasks.py b/archivebox/queues/tasks.py new file mode 100644 index 00000000..5bc09bc9 --- /dev/null +++ b/archivebox/queues/tasks.py @@ -0,0 +1,41 @@ +__package__ = 'archivebox.queues' + +from django_huey import db_task, task + +from huey_monitor.models import TaskModel +from huey_monitor.tqdm import ProcessInfo + +@db_task(queue="system_tasks", context=True) +def bg_add(add_kwargs, task=None, parent_task_id=None): + from ..main import add + + if task and parent_task_id: + TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id) + + assert add_kwargs and add_kwargs.get("urls") + rough_url_count = add_kwargs["urls"].count("://") + + process_info = ProcessInfo(task, desc="add", parent_task_id=parent_task_id, total=rough_url_count) + + result = add(**add_kwargs) + process_info.update(n=rough_url_count) + return result + + +@task(queue="system_tasks", context=True) +def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None): + from ..extractors import archive_links + + if task and parent_task_id: + TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id) + + assert args and args[0] + kwargs = kwargs or {} + + rough_count = len(args[0]) + + process_info = ProcessInfo(task, desc="archive_links", parent_task_id=parent_task_id, total=rough_count) + + result = archive_links(*args, **kwargs) + process_info.update(n=rough_count) + return result diff --git a/archivebox/templates/static/admin.css b/archivebox/templates/static/admin.css 
diff --git a/archivebox/templates/static/admin.css b/archivebox/templates/static/admin.css
index 395e5d86..fe3c90d7 100644
--- a/archivebox/templates/static/admin.css
+++ b/archivebox/templates/static/admin.css
@@ -329,3 +329,15 @@ tbody .output-link {
     box-shadow: 4px 4px 4px rgba(0,0,0,0.1);
 }
 tbody .output-link:hover {opacity: 1;}
+
+
+
+@keyframes fadeIn {
+    0% { opacity: 0; }
+    20% { opacity: 0; }
+    100% { opacity: 1; }
+}
+
+.fade-in-progress-url {
+    animation: fadeIn 8s;
+}
diff --git a/pdm.lock b/pdm.lock
index 6bcfc5e9..2ecc2d00 100644
--- a/pdm.lock
+++ b/pdm.lock
@@ -5,7 +5,7 @@ groups = ["default", "ldap", "sonic"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"

-content_hash = "sha256:c890335ff9967151514ff57e709d8b39c19f51edce5d15fb1b15c0a276a573f9"
+content_hash = "sha256:ec23de8c5caf198c09f35e79411990eba9ed095da475f694d2a837c9a93d9bb1"

 [[metadata.targets]]
 requires_python = "==3.11.*"
@@ -176,6 +176,34 @@ files = [
     {file = "brotlicffi-1.1.0.0.tar.gz", hash = "sha256:b77827a689905143f87915310b93b273ab17888fd43ef350d4832c4a71083c13"},
 ]

+[[package]]
+name = "bx-django-utils"
+version = "79"
+summary = "Various Django utility functions"
+groups = ["default"]
+marker = "python_version == \"3.11\""
+dependencies = [
+    "bx-py-utils>=92",
+    "django>=4.2",
+    "python-stdnum",
+]
+files = [
+    {file = "bx_django_utils-79-py3-none-any.whl", hash = "sha256:d50b10ace24b0b363574542faecf04a81029e2fec6d6e6525fe063ed06238e04"},
+    {file = "bx_django_utils-79.tar.gz", hash = "sha256:cb66087d4e9396281acf5a4394b749cff3062b66082d5726f6a8a342fdd35d0e"},
+]
+
+[[package]]
+name = "bx-py-utils"
+version = "101"
+requires_python = "<4,>=3.10"
+summary = "Various Python utility functions"
+groups = ["default"]
+marker = "python_version == \"3.11\""
+files = [
+    {file = "bx_py_utils-101-py3-none-any.whl", hash = "sha256:eece1f0b1e3c091d38f3013984056b05f43c6a0fd716489cf337d89df802ab59"},
+    {file = "bx_py_utils-101.tar.gz", hash = "sha256:2aa295cde55da99b77f5f2f8b5bf8c0bec7e0046511832989ecbb1a43183cf75"},
+]
+
 [[package]]
 name = "certifi"
 version = "2024.8.30"
@@ -424,6 +452,40 @@ files = [
     {file = "django_extensions-3.2.3-py3-none-any.whl", hash = "sha256:9600b7562f79a92cbf1fde6403c04fee314608fefbb595502e34383ae8203401"},
 ]

+[[package]]
+name = "django-huey"
+version = "1.2.1"
+requires_python = ">=3.8"
+summary = "An extension for django and huey that supports multi queue management"
+groups = ["default"]
+marker = "python_version == \"3.11\""
+dependencies = [
+    "django>=3.2",
+    "huey>=2.0",
+]
+files = [
+    {file = "django_huey-1.2.1-py3-none-any.whl", hash = "sha256:59c82b72fd4b6e60c219bd1fbab78acfe68a1c8d3efb1d3e42798a67d01a4aa2"},
+    {file = "django_huey-1.2.1.tar.gz", hash = "sha256:634abf1e707acef90dd00df4267458486f89a3117419000ec5584b1c4129701a"},
+]
+
+[[package]]
+name = "django-huey-monitor"
+version = "0.9.0"
+requires_python = ">=3.10"
+summary = "Django based tool for monitoring huey task queue: https://github.com/coleifer/huey"
+groups = ["default"]
+marker = "python_version == \"3.11\""
+dependencies = [
+    "bx-django-utils",
+    "bx-py-utils",
+    "django",
+    "huey",
+]
+files = [
+    {file = "django-huey-monitor-0.9.0.tar.gz", hash = "sha256:03366d98579c07e132672aa760373949fecec108a0e91229e870bb21453c800b"},
+    {file = "django_huey_monitor-0.9.0-py3-none-any.whl", hash = "sha256:1d5922d182e138e288f99d6cdb326cbed20c831d4c906c96cba148b0979e648a"},
+]
+
 [[package]]
 name = "django-jsonform"
 version = "2.22.0"
@@ -643,6 +705,16 @@ files = [
     {file = "httpx-0.27.2.tar.gz", hash = "sha256:f7c2be1d2f3c3c3160d441802406b206c2b76f5947b11115e6df10c6c65e66c2"},
 ]

+[[package]]
+name = "huey"
+version = "2.5.1" +summary = "huey, a little task queue" +groups = ["default"] +marker = "python_version == \"3.11\"" +files = [ + {file = "huey-2.5.1.tar.gz", hash = "sha256:8a323783ab434a095a4e72b8c48c5b8f957f9031fa860474a390a0927e957112"}, +] + [[package]] name = "hyperlink" version = "21.0.0" @@ -832,19 +904,6 @@ dependencies = [ "requests", ] -[[package]] -name = "pocket" -version = "0.3.7" -git = "https://github.com/tapanpandita/pocket.git" -ref = "v0.3.7" -revision = "5a144438cc89bfc0ec94db960718ccf1f76468c1" -summary = "api wrapper for getpocket.com" -groups = ["default"] -marker = "python_version == \"3.11\"" -dependencies = [ - "requests", -] - [[package]] name = "prompt-toolkit" version = "3.0.47" @@ -860,6 +919,19 @@ files = [ {file = "prompt_toolkit-3.0.47.tar.gz", hash = "sha256:1e1b29cb58080b1e69f207c893a1a7bf16d127a5c30c9d17a25a5d77792e5360"}, ] +[[package]] +name = "psutil" +version = "6.0.0" +requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" +summary = "Cross-platform lib for process and system monitoring in Python." +groups = ["default"] +marker = "python_version == \"3.11\"" +files = [ + {file = "psutil-6.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5fd9a97c8e94059b0ef54a7d4baf13b405011176c3b6ff257c247cae0d560ecd"}, + {file = "psutil-6.0.0-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:ffe7fc9b6b36beadc8c322f84e1caff51e8703b88eee1da46d1e3a6ae11b4fd0"}, + {file = "psutil-6.0.0.tar.gz", hash = "sha256:8faae4f310b6d969fa26ca0545338b21f73c6b15db7c4a8d934a5482faa818f2"}, +] + [[package]] name = "ptyprocess" version = "0.7.0" @@ -1057,6 +1129,17 @@ files = [ {file = "python-ldap-3.4.4.tar.gz", hash = "sha256:7edb0accec4e037797705f3a05cbf36a9fde50d08c8f67f2aef99a2628fab828"}, ] +[[package]] +name = "python-stdnum" +version = "1.20" +summary = "Python module to handle standardized numbers and codes" +groups = ["default"] +marker = "python_version == \"3.11\"" +files = [ + {file = "python-stdnum-1.20.tar.gz", hash = "sha256:ad2a2cf2eb025de408210235f36b4ae31252de3186240ccaa8126e117cb82690"}, + {file = "python_stdnum-1.20-py2.py3-none-any.whl", hash = "sha256:111008e10391d54fb2afad2a10df70d5cb0c6c0a7ec82fec6f022cb8712961d3"}, +] + [[package]] name = "pytz" version = "2024.1" @@ -1220,6 +1303,20 @@ files = [ {file = "stack_data-0.6.3.tar.gz", hash = "sha256:836a778de4fec4dcd1dcd89ed8abff8a221f58308462e1c4aa2a3cf30148f0b9"}, ] +[[package]] +name = "supervisor" +version = "4.2.5" +summary = "A system for controlling process state under UNIX" +groups = ["default"] +marker = "python_version == \"3.11\"" +dependencies = [ + "setuptools", +] +files = [ + {file = "supervisor-4.2.5-py2.py3-none-any.whl", hash = "sha256:2ecaede32fc25af814696374b79e42644ecaba5c09494c51016ffda9602d0f08"}, + {file = "supervisor-4.2.5.tar.gz", hash = "sha256:34761bae1a23c58192281a5115fb07fbf22c9b0133c08166beffc70fed3ebc12"}, +] + [[package]] name = "traitlets" version = "5.14.3" diff --git a/pyproject.toml b/pyproject.toml index 538d33d4..546f8453 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,27 +13,13 @@ readme = "README.md" # pdm update --unconstrained dependencies = [ # Last Bumped: 2024-08-20 - # Base Framework and Language Dependencies + ############# Django / Core Libraries ############# "setuptools>=69.5.1", "django>=5.0.4,<6.0", "django-ninja>=1.1.0", "django-extensions>=3.2.3", "mypy-extensions>=1.0.0", - # Python Helper Libraries - "requests>=2.31.0", - "dateparser>=1.0.0", - 
"feedparser>=6.0.11", - "w3lib>=2.1.2", - "rich>=13.8.0", - "ulid-py>=1.1.0", - "typeid-python>=0.3.0", - # Feature-Specific Dependencies - "python-crontab>=3.0.0", # for: archivebox schedule - "croniter>=2.0.5", # for: archivebox schedule - "ipython>=8.23.0", # for: archivebox shell - # Extractor Dependencies - "yt-dlp>=2024.8.6", # for: media - # "playwright>=1.43.0; platform_machine != 'armv7l'", # WARNING: playwright doesn't have any sdist, causes trouble on build systems that refuse to install wheel-only packages + "channels[daphne]>=4.1.0", "django-signal-webhooks>=0.3.0", "django-admin-data-views>=0.3.1", "django-object-actions>=4.2.0", @@ -41,6 +27,22 @@ dependencies = [ "django-pydantic-field>=0.3.9", "django-jsonform>=2.22.0", "django-stubs>=5.0.2", + "django-huey>=1.2.1", + "django-huey-monitor>=0.9.0", + ############# Python Helper Libraries ############ + "requests>=2.31.0", + "dateparser>=1.0.0", + "feedparser>=6.0.11", + "w3lib>=2.1.2", + "rich>=13.8.0", + "ulid-py>=1.1.0", + "typeid-python>=0.3.0", + "psutil>=6.0.0", + "supervisor>=4.2.5", + "python-crontab>=3.0.0", # for: archivebox schedule + "croniter>=2.0.5", # for: archivebox schedule + "ipython>=8.23.0", # for: archivebox shell + ############# VENDORED LIBS ###################### # these can be safely omitted when installation subsystem does not provide these as packages (e.g. apt/debian) # archivebox will automatically load fallback vendored copies bundled via archivebox/vendor/__init__.py "pydantic-pkgr>=0.1.4", @@ -48,7 +50,8 @@ dependencies = [ "pocket@git+https://github.com/tapanpandita/pocket.git@v0.3.7", "django-taggit==1.3.0", "base32-crockford==0.3.0", - "channels[daphne]>=4.1.0", + ############# Extractor Dependencies ############# + "yt-dlp>=2024.8.6", # for: media ] homepage = "https://github.com/ArchiveBox/ArchiveBox"