fix LIB_DIR and TMP_DIR loading when primary option isnt available

This commit is contained in:
Nick Sweeting 2024-10-21 00:35:25 -07:00
parent deb116eed4
commit a211461ffc
No known key found for this signature in database
21 changed files with 712 additions and 303 deletions

View file

@ -1,23 +1,39 @@
__package__ = 'archivebox.queues'
import sys
import time
import signal
import psutil
import shutil
import subprocess
from typing import Dict, cast, Iterator
from pathlib import Path
from functools import cache
from rich import print
from typing import Dict, cast
from supervisor.xmlrpc import SupervisorTransport
from xmlrpc.client import ServerProxy
from archivebox.config import CONSTANTS
from archivebox.config.paths import get_or_create_working_tmp_dir
from archivebox.config.permissions import ARCHIVEBOX_USER
from archivebox.misc.logging import STDERR
from archivebox.logging_util import pretty_path
from .settings import SUPERVISORD_CONFIG_FILE, DATA_DIR, PID_FILE, get_sock_file, LOG_FILE, WORKERS_DIR, TMP_DIR, LOGS_DIR
LOG_FILE_NAME = "supervisord.log"
CONFIG_FILE_NAME = "supervisord.conf"
PID_FILE_NAME = "supervisord.pid"
WORKERS_DIR_NAME = "workers"
from typing import Iterator
@cache
def get_sock_file():
"""Get the path to the supervisord socket file, symlinking to a shorter path if needed due to unix path length limits"""
TMP_DIR = get_or_create_working_tmp_dir(autofix=True, quiet=False)
assert TMP_DIR, "Failed to find or create a writable TMP_DIR!"
socket_file = TMP_DIR / "supervisord.sock"
return socket_file
def follow(file, sleep_sec=0.1) -> Iterator[str]:
""" Yield each line from a file as they are written.
@ -35,24 +51,30 @@ def follow(file, sleep_sec=0.1) -> Iterator[str]:
def create_supervisord_config():
SOCK_FILE = get_sock_file()
WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
CONFIG_FILE = SOCK_FILE.parent / CONFIG_FILE_NAME
PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
LOG_FILE = CONSTANTS.LOGS_DIR / LOG_FILE_NAME
config_content = f"""
[supervisord]
nodaemon = true
environment = IS_SUPERVISORD_PARENT="true"
pidfile = {TMP_DIR}/{PID_FILE.name}
logfile = {LOGS_DIR}/{LOG_FILE.name}
childlogdir = {LOGS_DIR}
directory = {DATA_DIR}
pidfile = {PID_FILE}
logfile = {LOG_FILE}
childlogdir = {CONSTANTS.LOGS_DIR}
directory = {CONSTANTS.DATA_DIR}
strip_ansi = true
nocleanup = true
user = {ARCHIVEBOX_USER}
[unix_http_server]
file = {get_sock_file()}
file = {SOCK_FILE}
chmod = 0700
[supervisorctl]
serverurl = unix://{get_sock_file()}
serverurl = unix://{SOCK_FILE}
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
@ -61,9 +83,14 @@ supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
files = {WORKERS_DIR}/*.conf
"""
SUPERVISORD_CONFIG_FILE.write_text(config_content)
CONFIG_FILE.write_text(config_content)
Path.mkdir(WORKERS_DIR, exist_ok=True)
(WORKERS_DIR / 'initial_startup.conf').write_text('') # hides error about "no files found to include" when supervisord starts
def create_worker_config(daemon):
SOCK_FILE = get_sock_file()
WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
Path.mkdir(WORKERS_DIR, exist_ok=True)
name = daemon['name']
@ -80,13 +107,14 @@ def create_worker_config(daemon):
def get_existing_supervisord_process():
SOCK_FILE = get_sock_file()
try:
transport = SupervisorTransport(None, None, f"unix://{get_sock_file()}")
transport = SupervisorTransport(None, None, f"unix://{SOCK_FILE}")
server = ServerProxy("http://localhost", transport=transport)
current_state = cast(Dict[str, int | str], server.supervisor.getState())
if current_state["statename"] == "RUNNING":
pid = server.supervisor.getPID()
print(f"[🦸‍♂️] Supervisord connected (pid={pid}) via unix://{str(get_sock_file()).replace(str(DATA_DIR), '.')}.")
print(f"[🦸‍♂️] Supervisord connected (pid={pid}) via unix://{pretty_path(SOCK_FILE)}.")
return server.supervisor
except FileNotFoundError:
return None
@ -95,58 +123,83 @@ def get_existing_supervisord_process():
return None
def stop_existing_supervisord_process():
SOCK_FILE = get_sock_file()
PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
try:
pid = int(PID_FILE.read_text())
except FileNotFoundError:
return
except ValueError:
PID_FILE.unlink()
return
try:
pid = int(PID_FILE.read_text())
except (FileNotFoundError, ValueError):
return
try:
print(f"[🦸‍♂️] Stopping supervisord process (pid={pid})...")
proc = psutil.Process(pid)
proc.terminate()
proc.wait()
except Exception:
pass
try:
PID_FILE.unlink()
except FileNotFoundError:
pass
try:
print(f"[🦸‍♂️] Stopping supervisord process (pid={pid})...")
proc = psutil.Process(pid)
proc.terminate()
proc.wait()
except (Exception, BrokenPipeError, IOError):
pass
finally:
try:
# clear PID file and socket file
PID_FILE.unlink(missing_ok=True)
get_sock_file().unlink(missing_ok=True)
except Exception:
pass
def start_new_supervisord_process(daemonize=False):
SOCK_FILE = get_sock_file()
WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
LOG_FILE = CONSTANTS.LOGS_DIR / LOG_FILE_NAME
CONFIG_FILE = SOCK_FILE.parent / CONFIG_FILE_NAME
PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
print(f"[🦸‍♂️] Supervisord starting{' in background' if daemonize else ''}...")
# Create a config file in the current working directory
pretty_log_path = pretty_path(LOG_FILE)
print(f" > Writing supervisord logs to: {pretty_log_path}")
print(f" > Writing task worker logs to: {pretty_log_path.replace('supervisord.log', 'worker_*.log')}")
print(f' > Using supervisord config file: {pretty_path(CONFIG_FILE)}')
print(f" > Using supervisord UNIX socket: {pretty_path(SOCK_FILE)}")
print()
# clear out existing stale state files
shutil.rmtree(WORKERS_DIR, ignore_errors=True)
PID_FILE.unlink(missing_ok=True)
get_sock_file().unlink(missing_ok=True)
SUPERVISORD_CONFIG_FILE.unlink(missing_ok=True)
CONFIG_FILE.unlink(missing_ok=True)
# create the supervisord config file
create_supervisord_config()
# Start supervisord
# panel = Panel(f"Starting supervisord with config: {SUPERVISORD_CONFIG_FILE}")
# with Live(panel, refresh_per_second=1) as live:
subprocess.Popen(
f"supervisord --configuration={SUPERVISORD_CONFIG_FILE}",
f"supervisord --configuration={CONFIG_FILE}",
stdin=None,
shell=True,
start_new_session=daemonize,
)
def exit_signal_handler(signum, frame):
if signum != 13:
print(f"\n[🦸‍♂️] Supervisord got stop signal ({signal.strsignal(signum)}). Terminating child processes...")
if signum == 2:
STDERR.print("\n[🛑] Got Ctrl+C. Terminating child processes...")
elif signum != 13:
STDERR.print(f"\n[🦸‍♂️] Supervisord got stop signal ({signal.strsignal(signum)}). Terminating child processes...")
stop_existing_supervisord_process()
raise SystemExit(0)
# Monitor for termination signals and cleanup child processes
if not daemonize:
signal.signal(signal.SIGINT, exit_signal_handler)
signal.signal(signal.SIGHUP, exit_signal_handler)
signal.signal(signal.SIGPIPE, exit_signal_handler)
signal.signal(signal.SIGTERM, exit_signal_handler)
try:
signal.signal(signal.SIGINT, exit_signal_handler)
signal.signal(signal.SIGHUP, exit_signal_handler)
signal.signal(signal.SIGPIPE, exit_signal_handler)
signal.signal(signal.SIGTERM, exit_signal_handler)
except Exception:
# signal handlers only work in main thread
pass
# otherwise supervisord will containue in background even if parent proc is ends (aka daemon mode)
time.sleep(2)
@ -154,14 +207,32 @@ def start_new_supervisord_process(daemonize=False):
return get_existing_supervisord_process()
def get_or_create_supervisord_process(daemonize=False):
SOCK_FILE = get_sock_file()
WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
supervisor = get_existing_supervisord_process()
if supervisor is None:
stop_existing_supervisord_process()
supervisor = start_new_supervisord_process(daemonize=daemonize)
time.sleep(0.5)
# wait up to 5s in case supervisord is slow to start
if not supervisor:
for _ in range(10):
if supervisor is not None:
print()
break
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(0.5)
supervisor = get_existing_supervisord_process()
else:
print()
assert supervisor, "Failed to start supervisord or connect to it!"
supervisor.getPID() # make sure it doesn't throw an exception
(WORKERS_DIR / 'initial_startup.conf').unlink(missing_ok=True)
return supervisor
@ -242,9 +313,9 @@ def tail_worker_logs(log_path: str):
for line in follow(f):
if '://' in line:
live.console.print(f"Working on: {line.strip()}")
table.add_row("123124234", line.strip())
except KeyboardInterrupt:
print("\n[🛑] Got Ctrl+C, stopping gracefully...")
# table.add_row("123124234", line.strip())
except (KeyboardInterrupt, BrokenPipeError, IOError):
STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
@ -321,12 +392,12 @@ def start_server_workers(host='0.0.0.0', port='8000', daemonize=False):
if not daemonize:
try:
watch_worker(supervisor, "worker_daphne")
except KeyboardInterrupt:
print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except (KeyboardInterrupt, BrokenPipeError, IOError):
STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
except BaseException as e:
print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
raise
finally:
stop_worker(supervisor, "worker_daphne")
@ -350,12 +421,12 @@ def start_cli_workers(watch=False):
if watch:
try:
watch_worker(supervisor, "worker_system_tasks")
except KeyboardInterrupt:
print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except (KeyboardInterrupt, BrokenPipeError, IOError):
STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
except BaseException as e:
print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
raise
finally:
stop_worker(supervisor, "worker_system_tasks")