Update models.py

This commit is contained in:
Nick Sweeting 2025-01-02 23:58:45 -08:00 committed by GitHub
parent 96c5d2f7de
commit a851ad4c87
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,110 +1,110 @@
import mimetypes # import mimetypes
import uuid # import uuid
from datetime import timedelta # from datetime import timedelta
from pathlib import Path # from pathlib import Path
from django.db import models # from django.db import models
from django.conf import settings # from django.conf import settings
from django.utils import timezone # from django.utils import timezone
from archivebox import DATA_DIR # from archivebox import DATA_DIR
from archivebox.misc.hashing import get_dir_info, hash_file # from archivebox.misc.hashing import get_dir_info, hash_file
from base_models.abid import DEFAULT_ABID_URI_SALT # from base_models.abid import DEFAULT_ABID_URI_SALT
from base_models.models import ABIDModel, ABIDField, get_or_create_system_user_pk # from base_models.models import ABIDModel, ABIDField, get_or_create_system_user_pk
class File(ABIDModel):
    """Database record describing one file or directory stored under DATA_DIR.

    Every ``save()`` re-derives the descriptive columns (``basename``,
    ``extension``, ``mime_type``, ``num_subpaths``, ``num_bytes``, ``sha256``)
    from whatever currently exists on disk at ``abspath``.

    ``status`` and ``retry_at`` together act as a lock flag with an expiry
    timestamp; see ``acquire_lock()``, ``release_lock()`` and
    ``release_expired_locks()``.
    """

    # ABID configuration read by ABIDModel (defined in base_models, not shown here).
    abid_prefix = 'fil_'
    abid_ts_src = 'self.created_at'
    # NOTE(review): this class defines no `path` attribute (the location lives
    # in `file` / `relpath`) -- confirm ABIDModel can resolve 'self.path'.
    abid_uri_src = 'self.path'
    abid_subtype_src = 'self.mime_type'
    abid_rand_src = 'self.id'
    # Combined with the URI to anonymize hashes on a per-install basis. The
    # default salt is shared globally by all users, so everyone hashes the same
    # URI to the same value, which makes it easy to compare ABIDs across
    # installs. Change it if you don't want your hashes to be guessable / in the
    # same hash space as all other users.
    abid_salt: str = DEFAULT_ABID_URI_SALT
    abid_drift_allowed: bool = False

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, null=False)
    abid = ABIDField(prefix=abid_prefix)

    created_at = models.DateTimeField(default=timezone.now, null=False)
    modified_at = models.DateTimeField(default=timezone.now, null=False)
    # The (swappable) user model is referenced through Django's AUTH_USER_MODEL setting.
    created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk)

    class StatusChoices(models.TextChoices):
        """Lock states a File record can be in."""
        UNLOCKED = 'unlocked'
        LOCKED = 'locked'

    status = models.CharField(max_length=16, choices=StatusChoices.choices, default=StatusChoices.UNLOCKED, null=False)
    retry_at = models.DateTimeField(default=None, null=True)    # when a held lock expires
    version = models.CharField(max_length=16, default='unknown', null=False)

    file = models.FileField(null=False)

    # The columns below are all recomputed from disk inside save().
    basename = models.CharField(max_length=255, default=None, null=False)               # e.g. 'index.html'
    extension = models.CharField(max_length=63, default='', null=False)                 # e.g. '.html' (Path.suffix keeps the dot), '' for directories
    mime_type = models.CharField(max_length=63, default=None, null=False, db_index=True)  # e.g. 'inode/directory' or 'text/html'
    num_subpaths = models.IntegerField(default=None, null=False)                        # e.g. 3 (0 for regular files)
    num_bytes = models.IntegerField(default=None, null=False)                           # e.g. 123456

    sha256 = models.CharField(max_length=64, default=None, null=False, db_index=True)   # e.g. '5994471abb01112afcc1815994471abb01112afcc1815994471abb01112afcc181'
    # blake3 = models.CharField(max_length=64, default=None, null=False, db_index=True) # e.g. '5994471abb01112afcc1815994471abb01112afcc1815994471abb01112afcc181'

    DIR = 'inode/directory'                         # mime_type recorded for directories
    UNKNOWN_MIME_TYPE = 'application/octet-stream'  # mime_type recorded when the type cannot be guessed

    @classmethod
    def release_expired_locks(cls):
        """Unlock every record whose lock expiry (``retry_at``) is already in the past."""
        cls.objects.filter(
            status=cls.StatusChoices.LOCKED,
            retry_at__lt=timezone.now(),
        ).update(status=cls.StatusChoices.UNLOCKED, retry_at=None)

    @property
    def parent(self) -> 'File':
        """Return the File record for the directory containing this path.

        When no record exists for the parent directory yet, a new *unsaved*
        File pointing at it is returned instead.
        """
        parent_name = str(self.relpath.parent)
        # filter().first() yields None when nothing matches (get() would raise
        # DoesNotExist), which is what lets the `or` fallback take effect.
        return File.objects.filter(file=parent_name).first() or File(file=parent_name)

    @property
    def relpath(self) -> Path:
        """Path of the file as stored in the ``file`` field (joined onto DATA_DIR by ``abspath``)."""
        return Path(self.file.name)

    @property
    def abspath(self) -> Path:
        """Location of the file on disk: DATA_DIR joined with the stored name."""
        return DATA_DIR / self.file.name

    def save(self, *args, **kwargs):
        """Refresh the metadata columns from disk, then persist the record.

        Directories are summarized via ``get_dir_info()``; regular files are
        stat'ed and hashed via ``hash_file()``.

        Raises:
            FileNotFoundError: if nothing exists at ``abspath``.
        """
        abspath = self.abspath
        if not abspath.exists():
            # An explicit raise (rather than `assert`) still runs under `python -O`.
            raise FileNotFoundError(f'Cannot save File record, path does not exist: {abspath}')

        self.basename = self.relpath.name

        if abspath.is_dir():
            self.extension = ''
            self.mime_type = self.DIR

            dir_summary = get_dir_info(abspath)['.']    # the '.' entry describes the directory itself
            self.num_subpaths = dir_summary['num_subpaths']
            self.num_bytes = dir_summary['num_bytes']
            self.sha256 = dir_summary['hash_sha256']
            # TODO: store dir_summary['hash_blake3'] once the blake3 column is enabled
        else:
            self.extension = self.relpath.suffix
            # guess_type() returns None for unknown types, but mime_type is NOT NULL.
            self.mime_type = mimetypes.guess_type(abspath)[0] or self.UNKNOWN_MIME_TYPE
            self.num_subpaths = 0                       # a regular file contains no subpaths
            self.num_bytes = abspath.stat().st_size
            # hash_file() returns (sha256, blake3); blake3 has no column to live in yet.
            self.sha256, _blake3 = hash_file(abspath)

        super().save(*args, **kwargs)

    def acquire_lock(self, timeout_seconds: int = 60):
        """Mark the record locked, expiring ``timeout_seconds`` from now, and save it.

        Note that ``save()`` also re-reads and re-hashes the path on disk.
        """
        self.status = self.StatusChoices.LOCKED
        self.retry_at = timezone.now() + timedelta(seconds=timeout_seconds)
        self.save()

    def release_lock(self):
        """Mark the record unlocked, clear its expiry, and save it."""
        self.status = self.StatusChoices.UNLOCKED
        self.retry_at = None
        self.save()

    def move_to(self, new_path: Path):
        """Move the file/directory on disk to ``new_path`` and save the new location.

        Args:
            new_path: destination path; either located inside DATA_DIR or
                given relative to DATA_DIR. Missing parent directories are
                created.
        """
        new_path = Path(new_path)
        try:
            # Compares whole path components, so a sibling such as
            # '<DATA_DIR>-backup/x' is not mistaken for a path inside DATA_DIR
            # the way a plain string-prefix check would.
            new_relpath = new_path.relative_to(DATA_DIR)
        except ValueError:
            # Not inside DATA_DIR: use the path as given, joined onto DATA_DIR.
            new_relpath = new_path
        new_abspath = DATA_DIR / new_relpath

        new_abspath.parent.mkdir(parents=True, exist_ok=True)
        self.abspath.rename(new_abspath)
        self.file.name = str(new_relpath)   # FieldFile names are plain strings, not Path objects
        self.save()