From 651ba0b11cf32154b505be69cc70d31a5043572d Mon Sep 17 00:00:00 2001
From: Nick Sweeting <git@sweeting.me>
Date: Thu, 12 Dec 2024 21:39:14 -0800
Subject: [PATCH] add new Process model to Machine models

---
 archivebox/machine/models.py | 311 ++++++++++++++++++++++++++++-------
 1 file changed, 255 insertions(+), 56 deletions(-)

diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py
index 09d6e840..78c96b64 100644
--- a/archivebox/machine/models.py
+++ b/archivebox/machine/models.py
@@ -1,6 +1,12 @@
 __package__ = 'archivebox.machine'
 
+import sys
+import os
+import signal
 import socket
+import subprocess
+import multiprocessing
+
 from datetime import timedelta
 from pathlib import Path
 
@@ -29,43 +35,20 @@ INSTALLED_BINARY_RECHECK_INTERVAL = 1 * 30 * 60     # 30min  (how often should w
 
 class MachineManager(models.Manager):
     def current(self) -> 'Machine':
-        """Get the current machine that ArchiveBox is running on."""
-        
-        global _CURRENT_MACHINE
-        if _CURRENT_MACHINE:
-            expires_at = _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL)
-            if timezone.now() < expires_at:
-                # assume current machine cant change *while archivebox is actively running on it*
-                # it's not strictly impossible to swap hardware while code is running,
-                # but its rare and unusual so we check only once per week
-                # (e.g. VMWare can live-migrate a VM to a new host while it's running)
-                return _CURRENT_MACHINE
-            else:
-                _CURRENT_MACHINE = None
-        
-        _CURRENT_MACHINE, _created = self.update_or_create(
-            guid=get_host_guid(),
-            defaults={
-                'hostname': socket.gethostname(),
-                **get_os_info(),
-                **get_vm_info(),
-                'stats': get_host_stats(),
-            },
-        )        
-        _CURRENT_MACHINE.save()  # populate ABID
-        
-        return _CURRENT_MACHINE
+        return Machine.current()
 
 
 class Machine(ABIDModel, ModelWithHealthStats):
     """Audit log entry for a physical machine that was used to do archiving."""
     
-    abid_prefix = 'mxn_'
+    abid_prefix = 'mcn_'
     abid_ts_src = 'self.created_at'
     abid_uri_src = 'self.guid'
     abid_subtype_src = '"01"'
     abid_rand_src = 'self.id'
     abid_drift_allowed = False
+    
+    read_only_fields = ('id', 'abid', 'created_at', 'guid', 'hw_in_docker', 'hw_in_vm', 'hw_manufacturer', 'hw_product', 'hw_uuid', 'os_arch', 'os_family')
 
     id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
     abid = ABIDField(prefix=abid_prefix)
@@ -100,49 +83,54 @@ class Machine(ABIDModel, ModelWithHealthStats):
     
     networkinterface_set: models.Manager['NetworkInterface']
 
-    
+    @classmethod
+    def current(cls) -> 'Machine':
+        """Get the current machine that ArchiveBox is running on."""
+        
+        global _CURRENT_MACHINE
+        if _CURRENT_MACHINE:
+            expires_at = _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL)
+            if timezone.now() < expires_at:
+                # assume current machine cant change *while archivebox is actively running on it*
+                # it's not strictly impossible to swap hardware while code is running,
+                # but its rare and unusual so we check only once per week
+                # (e.g. VMWare can live-migrate a VM to a new host while it's running)
+                return _CURRENT_MACHINE
+            else:
+                _CURRENT_MACHINE = None
+        
+        _CURRENT_MACHINE, _created = cls.objects.update_or_create(
+            guid=get_host_guid(),
+            defaults={
+                'hostname': socket.gethostname(),
+                **get_os_info(),
+                **get_vm_info(),
+                'stats': get_host_stats(),
+            },
+        )        
+        _CURRENT_MACHINE.save()  # populate ABID
+        
+        return _CURRENT_MACHINE
+
 
 
 class NetworkInterfaceManager(models.Manager):
     def current(self) -> 'NetworkInterface':
-        """Get the current network interface for the current machine."""
-        
-        global _CURRENT_INTERFACE
-        if _CURRENT_INTERFACE:
-            # assume the current network interface (public IP, DNS servers, etc.) wont change more than once per hour
-            expires_at = _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL)
-            if timezone.now() < expires_at:
-                return _CURRENT_INTERFACE
-            else:
-                _CURRENT_INTERFACE = None
-        
-        machine = Machine.objects.current()
-        net_info = get_host_network()
-        _CURRENT_INTERFACE, _created = self.update_or_create(
-            machine=machine,
-            ip_public=net_info.pop('ip_public'),
-            ip_local=net_info.pop('ip_local'),
-            mac_address=net_info.pop('mac_address'),
-            dns_server=net_info.pop('dns_server'),
-            defaults=net_info,
-        )
-        _CURRENT_INTERFACE.save()  # populate ABID
-
-        return _CURRENT_INTERFACE
-    
-
+        return NetworkInterface.current()
 
 
 class NetworkInterface(ABIDModel, ModelWithHealthStats):
     """Audit log entry for a physical network interface / internet connection that was used to do archiving."""
     
-    abid_prefix = 'ixf_'
+    abid_prefix = 'net_'
     abid_ts_src = 'self.machine.created_at'
     abid_uri_src = 'self.machine.guid'
     abid_subtype_src = 'self.iface'
     abid_rand_src = 'self.id'
     abid_drift_allowed = False
     
+    read_only_fields = ('id', 'abid', 'created_at', 'machine', 'mac_address', 'ip_public', 'ip_local', 'dns_server')
+    
     id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
     abid = ABIDField(prefix=abid_prefix)
 
@@ -178,6 +166,33 @@ class NetworkInterface(ABIDModel, ModelWithHealthStats):
             # this forces us to store an audit trail whenever these things change
             ('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),
         )
+        
+    @classmethod
+    def current(cls) -> 'NetworkInterface':
+        """Get the current network interface for the current machine."""
+        
+        global _CURRENT_INTERFACE
+        if _CURRENT_INTERFACE:
+            # assume the current network interface (public IP, DNS servers, etc.) wont change more than once per hour
+            expires_at = _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL)
+            if timezone.now() < expires_at:
+                return _CURRENT_INTERFACE
+            else:
+                _CURRENT_INTERFACE = None
+        
+        machine = Machine.objects.current()
+        net_info = get_host_network()
+        _CURRENT_INTERFACE, _created = cls.objects.update_or_create(
+            machine=machine,
+            ip_public=net_info.pop('ip_public'),
+            ip_local=net_info.pop('ip_local'),
+            mac_address=net_info.pop('mac_address'),
+            dns_server=net_info.pop('dns_server'),
+            defaults=net_info,
+        )
+        _CURRENT_INTERFACE.save()  # populate ABID
+
+        return _CURRENT_INTERFACE
 
 
 class InstalledBinaryManager(models.Manager):
@@ -250,6 +265,8 @@ class InstalledBinary(ABIDModel, ModelWithHealthStats):
     abid_rand_src = 'self.id'
     abid_drift_allowed = False
     
+    read_only_fields = ('id', 'abid', 'created_at', 'machine', 'name', 'binprovider', 'abspath', 'version', 'sha256')
+    
     id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
     abid = ABIDField(prefix=abid_prefix)
 
@@ -278,7 +295,7 @@ class InstalledBinary(ABIDModel, ModelWithHealthStats):
         verbose_name = 'Installed Binary'
         verbose_name_plural = 'Installed Binaries'
         unique_together = (
-            ('machine', 'name', 'binprovider', 'abspath', 'version', 'sha256'),
+            ('machine', 'name', 'abspath', 'version', 'sha256'),
         )
 
     def __str__(self) -> str:
@@ -343,3 +360,185 @@ class InstalledBinary(ABIDModel, ModelWithHealthStats):
 
     def load_fresh(self) -> Binary:
         return archivebox.pm.hook.binary_load(binary=self.BINARY, fresh=True)
+
+
+
+
+def spawn_process(proc_id: str):
+    proc = Process.objects.get(id=proc_id)
+    proc.spawn()
+    
+
+class ProcessManager(models.Manager):
+    pass
+
+class ProcessQuerySet(models.QuerySet):
+    """
+    Enhanced QuerySet for Process model, usage:
+        Process.objects.queued() -> QuerySet[Process] [Process(pid=None, returncode=None), Process(pid=None, returncode=None)]
+        Process.objects.running() -> QuerySet[Process] [Process(pid=123, returncode=None), Process(pid=456, returncode=None)]
+        Process.objects.exited() -> QuerySet[Process] [Process(pid=789, returncode=0), Process(pid=101, returncode=1)]
+        Process.objects.running().pids() -> [456]
+        Process.objects.kill() -> 1
+    """
+    
+    def queued(self):
+        return self.filter(pid__isnull=True, returncode__isnull=True)
+    
+    def running(self):
+        return self.filter(pid__isnull=False, returncode__isnull=True)
+            
+    def exited(self):
+        return self.filter(returncode__isnull=False)
+    
+    def kill(self):
+        total_killed = 0
+        for proc in self.running():
+            proc.kill()
+            total_killed += 1
+        return total_killed
+    
+    def pids(self):
+        return self.values_list('pid', flat=True)
+
+
+class Process(ABIDModel):
+    abid_prefix = 'pid_'
+    abid_ts_src = 'self.created_at'
+    abid_uri_src = 'self.cmd'
+    abid_subtype_src = 'self.actor_type or "00"'
+    abid_rand_src = 'self.id'
+    abid_drift_allowed = False
+    
+    read_only_fields = ('id', 'abid', 'created_at', 'cmd', 'cwd', 'actor_type', 'timeout')
+    
+    id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
+    abid = ABIDField(prefix=abid_prefix)
+    
+    # immutable state
+    cmd = models.JSONField(default=list)                             # shell argv
+    cwd = models.CharField(max_length=255)                           # working directory
+    actor_type = models.CharField(max_length=255, null=True)         # python ActorType that this process is running
+    timeout = models.PositiveIntegerField(null=True, default=None)   # seconds to wait before killing the process if it's still running
+    
+    created_at = models.DateTimeField(null=False, default=timezone.now, editable=False)
+    modified_at = models.DateTimeField(null=False, default=timezone.now, editable=False)
+
+    # mutable fields
+    machine = models.ForeignKey(Machine, on_delete=models.CASCADE)
+    pid = models.IntegerField(null=True)
+    launched_at = models.DateTimeField(null=True)
+    finished_at = models.DateTimeField(null=True)
+    returncode = models.IntegerField(null=True)
+    stdout = models.TextField(default='', null=False)
+    stderr = models.TextField(default='', null=False)
+
+    machine_id: str
+
+    # optional mutable state that can be used to trace what the process is doing
+    # active_event = models.ForeignKey('Event', null=True, on_delete=models.SET_NULL)
+    
+    emitted_events: models.RelatedManager['Event']
+    claimed_events: models.RelatedManager['Event']
+    
+    objects: ProcessManager = ProcessManager.from_queryset(ProcessQuerySet)()
+
+    @classmethod
+    def current(cls) -> 'Process':
+        proc_id = os.environ.get('PROCESS_ID', '').strip()
+        if not proc_id:
+            proc = cls.objects.create(
+                cmd=sys.argv,
+                cwd=os.getcwd(),
+                actor_type=None,
+                timeout=None,
+                machine=Machine.objects.current(),
+                pid=os.getpid(),
+                launched_at=timezone.now(),
+                finished_at=None,
+                returncode=None,
+                stdout='',
+                stderr='',
+            )
+            os.environ['PROCESS_ID'] = str(proc.id)
+            return proc
+        
+        proc = cls.objects.get(id=proc_id)
+        if proc.pid:
+            assert os.getpid() == proc.pid, f'Process ID mismatch: {proc.pid} != {os.getpid()}'
+        else:
+            proc.pid = os.getpid()
+
+        proc.machine = Machine.current()
+        proc.cwd = os.getcwd()    
+        proc.cmd = sys.argv
+        proc.launched_at = proc.launched_at or timezone.now()
+        proc.save()
+        
+        return proc
+
+    @classmethod
+    def create_and_fork(cls, **kwargs):
+        proc = cls.objects.create(**kwargs)
+        proc.fork()
+        return proc
+
+    def fork(self):
+        if self.pid:
+            raise Exception(f'Process is already running, cannot fork again: {self}')
+        
+        # fork the process in the background
+        multiprocessing.Process(target=spawn_process, args=(self.id,)).start()
+
+    def spawn(self):
+        if self.pid:
+            raise Exception(f'Process already running, cannot spawn again: {self}')
+        
+        # spawn the process in the foreground and block until it exits
+        proc = subprocess.Popen(self.cmd, cwd=self.cwd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+        self.pid = proc.pid
+        self.launched_at = timezone.now()
+        self.save()
+        # Event.dispatch('PROC_UPDATED', {'process_id': self.id})
+        
+        # block until the process exits
+        proc.wait()
+        self.finished_at = timezone.now()
+        self.returncode = proc.returncode
+        self.stdout = proc.stdout.read()
+        self.stderr = proc.stderr.read()
+        self.pid = None
+        self.save()
+        # Event.dispatch('PROC_UPDATED', {'process_id': self.id})
+        
+    def kill(self):
+        if not self.is_running: return
+        assert self.machine == Machine.current(), f'Cannot kill actor on another machine: {self.machine_id} != {Machine.current().id}'
+        
+        os.kill(self.pid, signal.SIGKILL)
+        self.pid = None
+        self.save()
+        # Event.dispatch('PROC_UPDATED', {'process_id': self.id})
+
+    @property
+    def is_pending(self):
+        return (self.pid is None) and (self.returncode is None)
+
+    @property
+    def is_running(self):
+        return (self.pid is not None) and (self.returncode is None)
+    
+    @property
+    def is_failed(self):
+        return self.returncode not in (None, 0)
+    
+    @property
+    def is_succeeded(self):
+        return self.returncode == 0
+    
+    # @property
+    # def is_idle(self):
+    #     if not self.actor_type:
+    #         raise Exception(f'Process {self.id} has no actor_type set, can only introspect active events if Process.actor_type is set to the Actor its running')
+    #     return self.active_event is None
+