diff --git a/archivebox/main.py b/archivebox/main.py
index a7c52705..72130d67 100755
--- a/archivebox/main.py
+++ b/archivebox/main.py
@@ -598,6 +598,9 @@ def add(urls: Union[str, List[str]],
     """Add a new URL or list of URLs to your archive"""
     from core.models import Snapshot, Tag
 
+    from queues.supervisor_util import start_cli_workers, tail_worker_logs
+    from queues.tasks import bg_archive_link
+
     assert depth in (0, 1), 'Depth must be 0 or 1 (depth >1 is not supported yet)'
 
 
@@ -609,6 +612,8 @@ def add(urls: Union[str, List[str]],
     # Load list of links from the existing index
     check_data_folder(CONFIG)
     check_dependencies(CONFIG)
+    worker = start_cli_workers()
+
     new_links: List[Link] = []
     all_links = load_main_index(out_dir=out_dir)
 
@@ -686,6 +691,8 @@ def add(urls: Union[str, List[str]],
         stderr(f'[*] [{ts}] Archiving {len(new_links)}/{len(all_links)} URLs from added set...', color='green')
         archive_links(new_links, overwrite=False, **archive_kwargs)
 
+    tail_worker_logs(worker['stdout_logfile'])
+
     if CAN_UPGRADE:
         hint(f"There's a new version of ArchiveBox available! Your current version is {VERSION}. You can upgrade to {VERSIONS_AVAILABLE['recommended_version']['tag_name']} ({VERSIONS_AVAILABLE['recommended_version']['html_url']}). For more on how to upgrade: https://github.com/ArchiveBox/ArchiveBox/wiki/Upgrading-or-Merging-Archives\n")
 
@@ -789,9 +796,12 @@ def update(resume: Optional[float]=None,
     from core.models import ArchiveResult
     from .search import index_links
 
+    from .queues.supervisor_util import start_cli_workers
+
     check_data_folder(CONFIG)
     check_dependencies(CONFIG)
+    start_cli_workers()
 
     new_links: List[Link] = [] # TODO: Remove input argument: only_new
 
     extractors = extractors.split(",") if extractors else []
diff --git a/archivebox/queues/supervisor_util.py b/archivebox/queues/supervisor_util.py
index f34910c5..d35d90fb 100644
--- a/archivebox/queues/supervisor_util.py
+++ b/archivebox/queues/supervisor_util.py
@@ -14,6 +14,22 @@ from xmlrpc.client import ServerProxy
 
 from .settings import CONFIG_FILE, PID_FILE, SOCK_FILE, LOG_FILE, WORKER_DIR, TMP_DIR, LOGS_DIR
 
+from typing import Iterator
+
+def follow(file, sleep_sec=0.1) -> Iterator[str]:
+    """ Yield each line from a file as it is written.
+    `sleep_sec` is the time to sleep after empty reads.
""" + line = '' + while True: + tmp = file.readline() + if tmp is not None and tmp != "": + line += tmp + if line.endswith("\n"): + yield line + line = '' + elif sleep_sec: + time.sleep(sleep_sec) + def create_supervisord_config(): config_content = f""" @@ -56,8 +72,7 @@ def create_worker_config(daemon): config_content += f"{key}={value}\n" config_content += "\n" - with open(configfile, "w") as f: - f.write(config_content) + configfile.write_text(config_content) def get_existing_supervisord_process(): @@ -197,6 +212,27 @@ def watch_worker(supervisor, daemon_name, interval=5): time.sleep(interval) continue +def tail_worker_logs(log_path: str): + get_or_create_supervisord_process(daemonize=True) + + from rich.live import Live + from rich.table import Table + + table = Table() + table.add_column("TS") + table.add_column("URL") + + try: + with Live(table, refresh_per_second=1) as live: # update 4 times a second to feel fluid + with open(log_path, 'r') as f: + for line in follow(f): + if '://' in line: + live.console.print(f"Working on: {line.strip()}") + table.add_row("123124234", line.strip()) + except KeyboardInterrupt: + print("\n[🛑] Got Ctrl+C, stopping gracefully...") + except SystemExit: + pass def get_worker(supervisor, daemon_name): try: @@ -228,6 +264,83 @@ def stop_worker(supervisor, daemon_name): raise Exception(f"Failed to stop worker {daemon_name}!") + + + +def start_server_workers(host='0.0.0.0', port='8000'): + supervisor = get_or_create_supervisord_process(daemonize=False) + + bg_workers = [ + { + "name": "worker_system_tasks", + "command": "archivebox manage djangohuey --queue system_tasks", + "autostart": "true", + "autorestart": "true", + "stdout_logfile": "logs/worker_system_tasks.log", + "redirect_stderr": "true", + }, + ] + fg_worker = { + "name": "worker_daphne", + "command": f"daphne --bind={host} --port={port} --application-close-timeout=600 archivebox.core.asgi:application", + "autostart": "false", + "autorestart": "true", + "stdout_logfile": "logs/worker_daphne.log", + "redirect_stderr": "true", + } + + print() + for worker in bg_workers: + start_worker(supervisor, worker) + + print() + start_worker(supervisor, fg_worker) + print() + + try: + watch_worker(supervisor, "worker_daphne") + except KeyboardInterrupt: + print("\n[🛑] Got Ctrl+C, stopping gracefully...") + except SystemExit: + pass + except BaseException as e: + print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...") + raise + finally: + stop_worker(supervisor, "worker_daphne") + time.sleep(0.5) + + +def start_cli_workers(watch=False): + supervisor = get_or_create_supervisord_process(daemonize=False) + + fg_worker = { + "name": "worker_system_tasks", + "command": "archivebox manage djangohuey --queue system_tasks", + "autostart": "true", + "autorestart": "true", + "stdout_logfile": "logs/worker_system_tasks.log", + "redirect_stderr": "true", + } + + start_worker(supervisor, fg_worker) + + if watch: + try: + watch_worker(supervisor, "worker_system_tasks") + except KeyboardInterrupt: + print("\n[🛑] Got Ctrl+C, stopping gracefully...") + except SystemExit: + pass + except BaseException as e: + print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...") + raise + finally: + stop_worker(supervisor, "worker_system_tasks") + time.sleep(0.5) + return fg_worker + + def main(daemons): supervisor = get_or_create_supervisord_process(daemonize=True) diff --git a/archivebox/queues/tasks.py b/archivebox/queues/tasks.py index 5bc09bc9..8d776478 100644 --- 
+++ b/archivebox/queues/tasks.py
@@ -5,8 +5,13 @@ from django_huey import db_task, task
 from huey_monitor.models import TaskModel
 from huey_monitor.tqdm import ProcessInfo
 
+from .supervisor_util import get_or_create_supervisord_process
+
+
 @db_task(queue="system_tasks", context=True)
 def bg_add(add_kwargs, task=None, parent_task_id=None):
+    get_or_create_supervisord_process(daemonize=True)
+
     from ..main import add
 
     if task and parent_task_id:
@@ -24,6 +29,8 @@ def bg_add(add_kwargs, task=None, parent_task_id=None):
 
 @task(queue="system_tasks", context=True)
 def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
+    get_or_create_supervisord_process(daemonize=True)
+
     from ..extractors import archive_links
 
     if task and parent_task_id:
@@ -39,3 +46,43 @@ def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
     result = archive_links(*args, **kwargs)
     process_info.update(n=rough_count)
     return result
+
+
+@task(queue="system_tasks", context=True)
+def bg_archive_link(args, kwargs=None, task=None, parent_task_id=None):
+    get_or_create_supervisord_process(daemonize=True)
+
+    from ..extractors import archive_link
+
+    if task and parent_task_id:
+        TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
+
+    assert args and args[0]
+    kwargs = kwargs or {}
+
+    rough_count = len(args[0])
+
+    process_info = ProcessInfo(task, desc="archive_link", parent_task_id=parent_task_id, total=rough_count)
+
+    result = archive_link(*args, **kwargs)
+    process_info.update(n=rough_count)
+    return result
+
+
+@task(queue="system_tasks", context=True)
+def bg_archive_snapshot(snapshot, overwrite=False, methods=None, task=None, parent_task_id=None):
+    # get_or_create_supervisord_process(daemonize=True)
+
+    from ..extractors import archive_link
+
+    if task and parent_task_id:
+        TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
+
+    process_info = ProcessInfo(task, desc="archive_link", parent_task_id=parent_task_id, total=1)
+
+    link = snapshot.as_link_with_details()
+
+    result = archive_link(link, overwrite=overwrite, methods=methods)
+    process_info.update(n=1)
+    return result
+
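
Reviewer note: below is a minimal, self-contained sketch of how the new follow() generator added in supervisor_util.py is meant to be consumed by tail_worker_logs(). A writer thread stands in for a huey worker appending URLs to its stdout_logfile while the main thread tails the file. The /tmp path, the writer() helper, and the example.com URLs are hypothetical illustration only; nothing here imports from archivebox.

import threading
import time
from typing import Iterator

def follow(file, sleep_sec=0.1) -> Iterator[str]:
    # Same logic as the follow() added in supervisor_util.py:
    # buffer partial reads until a full "\n"-terminated line arrives.
    line = ''
    while True:
        tmp = file.readline()
        if tmp:
            line += tmp
            if line.endswith("\n"):
                yield line
                line = ''
        elif sleep_sec:
            time.sleep(sleep_sec)

def writer(path):
    # Hypothetical stand-in for a worker process writing to its stdout_logfile.
    with open(path, 'a') as f:
        for i in range(3):
            f.write(f"https://example.com/{i}\n")
            f.flush()
            time.sleep(0.2)

if __name__ == '__main__':
    path = '/tmp/worker_demo.log'    # hypothetical log path
    open(path, 'w').close()          # start with an empty file
    threading.Thread(target=writer, args=(path,), daemon=True).start()
    with open(path) as f:
        for n, line in enumerate(follow(f), start=1):
            print('Working on:', line.strip())   # mirrors tail_worker_logs()
            if n == 3:
                break                            # stop once the writer is done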
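
Similarly, a hedged sketch of how the new bg_archive_snapshot task could be enqueued from Django-side code, assuming django_huey's standard semantics where calling a @task-decorated function enqueues it on its queue and returns a huey Result handle. The specific Snapshot lookup and URL are hypothetical:

from archivebox.queues.tasks import bg_archive_snapshot
from core.models import Snapshot

snapshot = Snapshot.objects.get(url='https://example.com')  # hypothetical lookup
result = bg_archive_snapshot(snapshot, overwrite=True)      # enqueues on the system_tasks queue
link = result(blocking=True)                                # huey Result: block until a worker finishes it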