add new Process model to Machine models

This commit is contained in:
Nick Sweeting 2024-12-12 21:39:14 -08:00
parent 2a1afcf6c2
commit 651ba0b11c
No known key found for this signature in database

View file

@ -1,6 +1,12 @@
__package__ = 'archivebox.machine'
import sys
import os
import signal
import socket
import subprocess
import multiprocessing
from datetime import timedelta
from pathlib import Path
@ -29,43 +35,20 @@ INSTALLED_BINARY_RECHECK_INTERVAL = 1 * 30 * 60 # 30min (how often should w
class MachineManager(models.Manager):
def current(self) -> 'Machine':
"""Get the current machine that ArchiveBox is running on."""
global _CURRENT_MACHINE
if _CURRENT_MACHINE:
expires_at = _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL)
if timezone.now() < expires_at:
# assume current machine cant change *while archivebox is actively running on it*
# it's not strictly impossible to swap hardware while code is running,
# but its rare and unusual so we check only once per week
# (e.g. VMWare can live-migrate a VM to a new host while it's running)
return _CURRENT_MACHINE
else:
_CURRENT_MACHINE = None
_CURRENT_MACHINE, _created = self.update_or_create(
guid=get_host_guid(),
defaults={
'hostname': socket.gethostname(),
**get_os_info(),
**get_vm_info(),
'stats': get_host_stats(),
},
)
_CURRENT_MACHINE.save() # populate ABID
return _CURRENT_MACHINE
return Machine.current()
class Machine(ABIDModel, ModelWithHealthStats):
"""Audit log entry for a physical machine that was used to do archiving."""
abid_prefix = 'mxn_'
abid_prefix = 'mcn_'
abid_ts_src = 'self.created_at'
abid_uri_src = 'self.guid'
abid_subtype_src = '"01"'
abid_rand_src = 'self.id'
abid_drift_allowed = False
read_only_fields = ('id', 'abid', 'created_at', 'guid', 'hw_in_docker', 'hw_in_vm', 'hw_manufacturer', 'hw_product', 'hw_uuid', 'os_arch', 'os_family')
id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
abid = ABIDField(prefix=abid_prefix)
@ -100,49 +83,54 @@ class Machine(ABIDModel, ModelWithHealthStats):
networkinterface_set: models.Manager['NetworkInterface']
@classmethod
def current(cls) -> 'Machine':
"""Get the current machine that ArchiveBox is running on."""
global _CURRENT_MACHINE
if _CURRENT_MACHINE:
expires_at = _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL)
if timezone.now() < expires_at:
# assume current machine cant change *while archivebox is actively running on it*
# it's not strictly impossible to swap hardware while code is running,
# but its rare and unusual so we check only once per week
# (e.g. VMWare can live-migrate a VM to a new host while it's running)
return _CURRENT_MACHINE
else:
_CURRENT_MACHINE = None
_CURRENT_MACHINE, _created = cls.objects.update_or_create(
guid=get_host_guid(),
defaults={
'hostname': socket.gethostname(),
**get_os_info(),
**get_vm_info(),
'stats': get_host_stats(),
},
)
_CURRENT_MACHINE.save() # populate ABID
return _CURRENT_MACHINE
class NetworkInterfaceManager(models.Manager):
def current(self) -> 'NetworkInterface':
"""Get the current network interface for the current machine."""
global _CURRENT_INTERFACE
if _CURRENT_INTERFACE:
# assume the current network interface (public IP, DNS servers, etc.) wont change more than once per hour
expires_at = _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL)
if timezone.now() < expires_at:
return _CURRENT_INTERFACE
else:
_CURRENT_INTERFACE = None
machine = Machine.objects.current()
net_info = get_host_network()
_CURRENT_INTERFACE, _created = self.update_or_create(
machine=machine,
ip_public=net_info.pop('ip_public'),
ip_local=net_info.pop('ip_local'),
mac_address=net_info.pop('mac_address'),
dns_server=net_info.pop('dns_server'),
defaults=net_info,
)
_CURRENT_INTERFACE.save() # populate ABID
return _CURRENT_INTERFACE
return NetworkInterface.current()
class NetworkInterface(ABIDModel, ModelWithHealthStats):
"""Audit log entry for a physical network interface / internet connection that was used to do archiving."""
abid_prefix = 'ixf_'
abid_prefix = 'net_'
abid_ts_src = 'self.machine.created_at'
abid_uri_src = 'self.machine.guid'
abid_subtype_src = 'self.iface'
abid_rand_src = 'self.id'
abid_drift_allowed = False
read_only_fields = ('id', 'abid', 'created_at', 'machine', 'mac_address', 'ip_public', 'ip_local', 'dns_server')
id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
abid = ABIDField(prefix=abid_prefix)
@ -178,6 +166,33 @@ class NetworkInterface(ABIDModel, ModelWithHealthStats):
# this forces us to store an audit trail whenever these things change
('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),
)
@classmethod
def current(cls) -> 'NetworkInterface':
"""Get the current network interface for the current machine."""
global _CURRENT_INTERFACE
if _CURRENT_INTERFACE:
# assume the current network interface (public IP, DNS servers, etc.) wont change more than once per hour
expires_at = _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL)
if timezone.now() < expires_at:
return _CURRENT_INTERFACE
else:
_CURRENT_INTERFACE = None
machine = Machine.objects.current()
net_info = get_host_network()
_CURRENT_INTERFACE, _created = cls.objects.update_or_create(
machine=machine,
ip_public=net_info.pop('ip_public'),
ip_local=net_info.pop('ip_local'),
mac_address=net_info.pop('mac_address'),
dns_server=net_info.pop('dns_server'),
defaults=net_info,
)
_CURRENT_INTERFACE.save() # populate ABID
return _CURRENT_INTERFACE
class InstalledBinaryManager(models.Manager):
@ -250,6 +265,8 @@ class InstalledBinary(ABIDModel, ModelWithHealthStats):
abid_rand_src = 'self.id'
abid_drift_allowed = False
read_only_fields = ('id', 'abid', 'created_at', 'machine', 'name', 'binprovider', 'abspath', 'version', 'sha256')
id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
abid = ABIDField(prefix=abid_prefix)
@ -278,7 +295,7 @@ class InstalledBinary(ABIDModel, ModelWithHealthStats):
verbose_name = 'Installed Binary'
verbose_name_plural = 'Installed Binaries'
unique_together = (
('machine', 'name', 'binprovider', 'abspath', 'version', 'sha256'),
('machine', 'name', 'abspath', 'version', 'sha256'),
)
def __str__(self) -> str:
@ -343,3 +360,185 @@ class InstalledBinary(ABIDModel, ModelWithHealthStats):
def load_fresh(self) -> Binary:
return archivebox.pm.hook.binary_load(binary=self.BINARY, fresh=True)
def spawn_process(proc_id: str):
proc = Process.objects.get(id=proc_id)
proc.spawn()
class ProcessManager(models.Manager):
pass
class ProcessQuerySet(models.QuerySet):
"""
Enhanced QuerySet for Process model, usage:
Process.objects.queued() -> QuerySet[Process] [Process(pid=None, returncode=None), Process(pid=None, returncode=None)]
Process.objects.running() -> QuerySet[Process] [Process(pid=123, returncode=None), Process(pid=456, returncode=None)]
Process.objects.exited() -> QuerySet[Process] [Process(pid=789, returncode=0), Process(pid=101, returncode=1)]
Process.objects.running().pids() -> [456]
Process.objects.kill() -> 1
"""
def queued(self):
return self.filter(pid__isnull=True, returncode__isnull=True)
def running(self):
return self.filter(pid__isnull=False, returncode__isnull=True)
def exited(self):
return self.filter(returncode__isnull=False)
def kill(self):
total_killed = 0
for proc in self.running():
proc.kill()
total_killed += 1
return total_killed
def pids(self):
return self.values_list('pid', flat=True)
class Process(ABIDModel):
abid_prefix = 'pid_'
abid_ts_src = 'self.created_at'
abid_uri_src = 'self.cmd'
abid_subtype_src = 'self.actor_type or "00"'
abid_rand_src = 'self.id'
abid_drift_allowed = False
read_only_fields = ('id', 'abid', 'created_at', 'cmd', 'cwd', 'actor_type', 'timeout')
id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
abid = ABIDField(prefix=abid_prefix)
# immutable state
cmd = models.JSONField(default=list) # shell argv
cwd = models.CharField(max_length=255) # working directory
actor_type = models.CharField(max_length=255, null=True) # python ActorType that this process is running
timeout = models.PositiveIntegerField(null=True, default=None) # seconds to wait before killing the process if it's still running
created_at = models.DateTimeField(null=False, default=timezone.now, editable=False)
modified_at = models.DateTimeField(null=False, default=timezone.now, editable=False)
# mutable fields
machine = models.ForeignKey(Machine, on_delete=models.CASCADE)
pid = models.IntegerField(null=True)
launched_at = models.DateTimeField(null=True)
finished_at = models.DateTimeField(null=True)
returncode = models.IntegerField(null=True)
stdout = models.TextField(default='', null=False)
stderr = models.TextField(default='', null=False)
machine_id: str
# optional mutable state that can be used to trace what the process is doing
# active_event = models.ForeignKey('Event', null=True, on_delete=models.SET_NULL)
emitted_events: models.RelatedManager['Event']
claimed_events: models.RelatedManager['Event']
objects: ProcessManager = ProcessManager.from_queryset(ProcessQuerySet)()
@classmethod
def current(cls) -> 'Process':
proc_id = os.environ.get('PROCESS_ID', '').strip()
if not proc_id:
proc = cls.objects.create(
cmd=sys.argv,
cwd=os.getcwd(),
actor_type=None,
timeout=None,
machine=Machine.objects.current(),
pid=os.getpid(),
launched_at=timezone.now(),
finished_at=None,
returncode=None,
stdout='',
stderr='',
)
os.environ['PROCESS_ID'] = str(proc.id)
return proc
proc = cls.objects.get(id=proc_id)
if proc.pid:
assert os.getpid() == proc.pid, f'Process ID mismatch: {proc.pid} != {os.getpid()}'
else:
proc.pid = os.getpid()
proc.machine = Machine.current()
proc.cwd = os.getcwd()
proc.cmd = sys.argv
proc.launched_at = proc.launched_at or timezone.now()
proc.save()
return proc
@classmethod
def create_and_fork(cls, **kwargs):
proc = cls.objects.create(**kwargs)
proc.fork()
return proc
def fork(self):
if self.pid:
raise Exception(f'Process is already running, cannot fork again: {self}')
# fork the process in the background
multiprocessing.Process(target=spawn_process, args=(self.id,)).start()
def spawn(self):
if self.pid:
raise Exception(f'Process already running, cannot spawn again: {self}')
# spawn the process in the foreground and block until it exits
proc = subprocess.Popen(self.cmd, cwd=self.cwd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
self.pid = proc.pid
self.launched_at = timezone.now()
self.save()
# Event.dispatch('PROC_UPDATED', {'process_id': self.id})
# block until the process exits
proc.wait()
self.finished_at = timezone.now()
self.returncode = proc.returncode
self.stdout = proc.stdout.read()
self.stderr = proc.stderr.read()
self.pid = None
self.save()
# Event.dispatch('PROC_UPDATED', {'process_id': self.id})
def kill(self):
if not self.is_running: return
assert self.machine == Machine.current(), f'Cannot kill actor on another machine: {self.machine_id} != {Machine.current().id}'
os.kill(self.pid, signal.SIGKILL)
self.pid = None
self.save()
# Event.dispatch('PROC_UPDATED', {'process_id': self.id})
@property
def is_pending(self):
return (self.pid is None) and (self.returncode is None)
@property
def is_running(self):
return (self.pid is not None) and (self.returncode is None)
@property
def is_failed(self):
return self.returncode not in (None, 0)
@property
def is_succeeded(self):
return self.returncode == 0
# @property
# def is_idle(self):
# if not self.actor_type:
# raise Exception(f'Process {self.id} has no actor_type set, can only introspect active events if Process.actor_type is set to the Actor its running')
# return self.active_event is None