make chrome binary and configs directly runnable and make extractor use external bin

Nick Sweeting 2024-12-06 02:04:05 -08:00
parent a572db307b
commit ac53fdf677
7 changed files with 316 additions and 83 deletions


@@ -16,7 +16,7 @@ from django.utils.text import slugify
from django.utils import timezone
from django.core.cache import cache
from django.urls import reverse, reverse_lazy
from django.db.models import Case, When, Value, IntegerField
from django.db.models import Case, When, IntegerField
from django.contrib import admin
from django.conf import settings
@@ -25,7 +25,8 @@ import abx
from archivebox.config import CONSTANTS
from archivebox.misc.system import get_dir_size
from archivebox.misc.util import parse_date, base_url
from archivebox.misc.util import parse_date, base_url, domain as url_domain
from archivebox.misc.hashing import get_dir_info
from archivebox.index.schema import Link
from archivebox.index.html import snapshot_icons
from archivebox.extractors import ARCHIVE_METHODS_INDEXING_PRECEDENCE
@@ -142,8 +143,20 @@ def validate_timestamp(value):
assert value.replace('.', '').isdigit(), f'timestamp must be a float str, got: "{value}"'
class SnapshotManager(models.Manager):
def filter(self, *args, **kwargs):
"""add support for .filter(domain='example.com') to Snapshot queryset"""
domain = kwargs.pop('domain', None)
qs = super().filter(*args, **kwargs)
if domain:
qs = qs.filter(url__icontains=f'://{domain}')
return qs
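A minimal usage sketch of the new kwarg (assuming this manager is installed as Snapshot.objects, as in the rest of core/models.py; the URLs and variable names are hypothetical):
from core.models import Snapshot
# matches URLs containing '://example.com', e.g. https://example.com/page
# (but not https://www.example.com/..., since the lookup is a plain icontains)
snaps = Snapshot.objects.filter(domain='example.com')
# chains with normal ORM lookups on fields that exist on Snapshot, e.g. url
https_only = snaps.filter(url__startswith='https://')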
def get_queryset(self):
return super().get_queryset().prefetch_related('tags', 'archiveresult_set') # .annotate(archiveresult_count=models.Count('archiveresult')).distinct()
return (
super().get_queryset()
.prefetch_related('tags', 'archiveresult_set')
# .annotate(archiveresult_count=models.Count('archiveresult')).distinct()
)
class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithStateMachine, ABIDModel):
abid_prefix = 'snp_'
@@ -256,6 +269,13 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithStateMachine, ABIDM
if self.crawl and self.url not in self.crawl.urls:
self.crawl.urls += f'\n{self.url}'
self.crawl.save()
def output_dir_parent(self) -> str:
return 'archive'
def output_dir_name(self) -> str:
return str(self.timestamp)
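A sketch of the directory these two hooks imply (timestamp value hypothetical; assumes ModelWithOutputDir joins the parent and name under the data dir):
#   <DATA_DIR>/archive/1733479445.0/    <- output_dir_parent() / output_dir_name()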
def archive(self, overwrite=False, methods=None):
result = bg_archive_snapshot(self, overwrite=overwrite, methods=methods)
@@ -338,6 +358,10 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithStateMachine, ABIDM
def bookmarked_date(self):
# TODO: remove this
return self.bookmarked
@cached_property
def domain(self) -> str:
return url_domain(self.url)
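A small sketch of the new property (values hypothetical, assuming url_domain returns the hostname without scheme or path):
snapshot.url      # 'https://example.com/blog/post'
snapshot.domain   # 'example.com'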
@cached_property
def is_archived(self):
@@ -659,7 +683,8 @@ class ArchiveResult(ModelWithConfig, ModelWithOutputDir, ModelWithStateMachine,
notes = models.TextField(blank=True, null=False, default='', help_text='Any extra notes this ArchiveResult should have')
# the network interface that was used to download this result
# uplink = models.ForeignKey(NetworkInterface, on_delete=models.SET_NULL, null=True, blank=True, verbose_name='Network Interface Used')
# machine = models.ForeignKey(Machine, on_delete=models.SET_NULL, null=True, blank=True, verbose_name='Machine Used')
# network = models.ForeignKey(NetworkInterface, on_delete=models.SET_NULL, null=True, blank=True, verbose_name='Network Interface Used')
objects = ArchiveResultManager()
@@ -742,8 +767,7 @@ class ArchiveResult(ModelWithConfig, ModelWithOutputDir, ModelWithStateMachine,
return None
def legacy_output_path(self):
link = self.snapshot.as_link()
return link.canonical_outputs().get(f'{self.extractor}_path')
return self.canonical_outputs().get(f'{self.extractor}_path')
def output_exists(self) -> bool:
output_path = Path(self.snapshot_dir) / self.extractor
@@ -761,6 +785,89 @@ class ArchiveResult(ModelWithConfig, ModelWithOutputDir, ModelWithStateMachine,
for key in args
}
def canonical_outputs(self) -> Dict[str, Optional[str]]:
"""Predict the expected output paths that should be present after archiving"""
# TODO: banish this awful duplication from the codebase and import these
# from their respective extractor files
from abx_plugin_favicon.config import FAVICON_CONFIG
canonical = {
'index_path': 'index.html',
'favicon_path': 'favicon.ico',
'google_favicon_path': FAVICON_CONFIG.FAVICON_PROVIDER.format(self.domain),
'wget_path': f'warc/{self.timestamp}',
'warc_path': 'warc/',
'singlefile_path': 'singlefile.html',
'readability_path': 'readability/content.html',
'mercury_path': 'mercury/content.html',
'htmltotext_path': 'htmltotext.txt',
'pdf_path': 'output.pdf',
'screenshot_path': 'screenshot.png',
'dom_path': 'output.html',
'archive_org_path': f'https://web.archive.org/web/{self.base_url}',
'git_path': 'git/',
'media_path': 'media/',
'headers_path': 'headers.json',
}
if self.is_static:
static_path = f'warc/{self.timestamp}'
canonical.update({
'title': self.basename,
'wget_path': static_path,
'pdf_path': static_path,
'screenshot_path': static_path,
'dom_path': static_path,
'singlefile_path': static_path,
'readability_path': static_path,
'mercury_path': static_path,
'htmltotext_path': static_path,
})
return canonical
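A usage sketch (queryset and values hypothetical) showing how the reworked legacy_output_path() above consumes this mapping now that it lives on ArchiveResult:
result = ArchiveResult.objects.filter(extractor='screenshot').first()
result.canonical_outputs().get('screenshot_path')   # 'screenshot.png'
result.legacy_output_path()                          # same value, looked up via f'{self.extractor}_path'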
@property
def output_dir_name(self) -> str:
return self.extractor
@property
def output_dir_parent(self) -> str:
return str(self.snapshot.OUTPUT_DIR.relative_to(CONSTANTS.DATA_DIR))
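The ArchiveResult counterpart nests one level below the snapshot dir, per extractor (sketch, same assumptions as the Snapshot layout above):
#   <DATA_DIR>/archive/1733479445.0/screenshot/   <- snapshot OUTPUT_DIR / self.extractor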
@cached_property
def output_files(self) -> dict[str, dict]:
dir_info = get_dir_info(self.OUTPUT_DIR, max_depth=6)
with open(self.OUTPUT_DIR / '.hashes.json', 'w') as f:
json.dump(dir_info, f)
return dir_info
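A sketch of reading the cached hashes back later (the shape of the dict is whatever get_dir_info emits, which is assumed here rather than documented in this diff):
import json
from pathlib import Path
dir_info = json.loads((Path(result.OUTPUT_DIR) / '.hashes.json').read_text())   # same data output_files persisted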
def announce_event(self, output_type: str, event: dict):
event = {
**event,
'type': output_type,
}
# if event references a file, make sure it exists on disk
if 'path' in event:
file_path = Path(self.OUTPUT_DIR) / event['path']
assert file_path.exists(), f'ArchiveResult[{self.ABID}].announce_event(): File does not exist: {file_path} ({event})'
with open(self.OUTPUT_DIR / '.events.jsonl', 'a') as f:
f.write(json.dumps(event, sort_keys=True, default=str) + '\n')
def events(self, filter_type: str | None=None) -> list[dict]:
events = []
try:
with open(self.OUTPUT_DIR / '.events.jsonl', 'r') as f:
for line in f:
event = json.loads(line)
if filter_type is None or event['type'] == filter_type:
events.append(event)
except FileNotFoundError:
pass
return events
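A round-trip sketch of the new event log (payload hypothetical; note that announce_event asserts any 'path' in the event already exists under OUTPUT_DIR):
result.announce_event('screenshot', {'path': 'screenshot.png', 'status': 'succeeded'})
result.events(filter_type='screenshot')
# -> [{'path': 'screenshot.png', 'status': 'succeeded', 'type': 'screenshot'}]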
def write_indexes(self):
"""Write the ArchiveResult json, html, and merkle indexes to output dir, and pass searchable text to the search backend"""
super().write_indexes()