From e1a04729b35683dfe531495a42dfe0e61d38afac Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Sun, 12 May 2024 01:42:20 -0700 Subject: [PATCH] wait for bg threads to finish before exiting --- archivebox/cli/__init__.py | 42 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/archivebox/cli/__init__.py b/archivebox/cli/__init__.py index 9622c98f..169e8bdd 100644 --- a/archivebox/cli/__init__.py +++ b/archivebox/cli/__init__.py @@ -4,14 +4,18 @@ __command__ = 'archivebox' import os import sys import argparse +import threading +from time import sleep -from typing import Optional, Dict, List, IO, Union +from typing import Optional, Dict, List, IO, Union, Iterable from pathlib import Path -from ..config import OUTPUT_DIR, check_data_folder, check_migrations +from ..config import OUTPUT_DIR, check_data_folder, check_migrations, stderr from importlib import import_module +BUILTIN_LIST = list + CLI_DIR = Path(__file__).resolve().parent # these common commands will appear sorted before any others for ease-of-use @@ -33,6 +37,37 @@ is_valid_cli_module = lambda module, subcommand: ( ) +def wait_for_bg_threads_to_exit(thread_names: Iterable[str]=(), ignore_names: Iterable[str]=('MainThread', 'ThreadPoolExecutor'), timeout: int=60) -> int: + """ + Block until the specified threads exit. e.g. pass thread_names=('default_hook_handler',) to wait for webhooks. + Useful for waiting for signal handlers, webhooks, etc. to finish running after a mgmt command completes. + """ + + wait_for_all: bool = thread_names == () + + thread_matches = lambda thread, ptns: any(ptn in repr(thread) for ptn in ptns) + + should_wait = lambda thread: ( + not thread_matches(thread, ignore_names) + and (wait_for_all or thread_matches(thread, thread_names))) + + for tries in range(timeout): + all_threads = [*threading.enumerate()] + blocking_threads = [*filter(should_wait, all_threads)] + threads_summary = ', '.join(repr(t) for t in blocking_threads) + if blocking_threads: + sleep(1) + if tries == 5: # only show stderr message if we need to wait more than 5s + stderr( + f'[…] Waiting up to {timeout}s for background jobs (e.g. webhooks) to finish...', + threads_summary, + ) + else: + return tries + + raise Exception('Background threads failed to exit after {tries}s: {threads_summary}') + + def list_subcommands() -> Dict[str, str]: """find and import all valid archivebox_.py files in CLI_DIR""" @@ -79,6 +114,9 @@ def run_subcommand(subcommand: str, module = import_module('.archivebox_{}'.format(subcommand), __package__) module.main(args=subcommand_args, stdin=stdin, pwd=pwd) # type: ignore + # wait for webhooks, signals, and other background jobs to finish before exit + wait_for_bg_threads_to_exit(timeout=60) + SUBCOMMANDS = list_subcommands()