add base extractor class

Nick Sweeting 2024-12-03 02:14:42 -08:00
parent 1ceaa1ac7a
commit 337acdac9c

@@ -0,0 +1,220 @@
import hashlib
import mimetypes
import os
import subprocess
from datetime import timedelta
from pathlib import Path
from typing import ClassVar

from django.utils import timezone

import abx
import archivebox
from core.models import ArchiveResult
from machine.models import Machine, NetworkInterface

try:
    # hashlib has no blake3; the third-party `blake3` package provides it
    from blake3 import blake3
except ImportError:
    blake3 = None

class Extractor:
    # static class variables (example values shown; subclasses override these)
    name: ClassVar[str] = 'ytdlp'
    verbose_name: ClassVar[str] = 'YT-DLP'
    binaries: ClassVar[tuple[str, ...]] = ()
    daemons: ClassVar[tuple[str, ...]] = ()
    timeout: ClassVar[int] = 60

    # instance variables
    ARCHIVERESULT: ArchiveResult
    CONFIG: dict[str, object]
    BINARIES: dict[str, object]
    DAEMONS: dict[str, object]
    def __init__(self, archiveresult: ArchiveResult, extra_config: dict | None=None):
        assert archiveresult.pk, 'ArchiveResult must be saved to DB before it can be extracted'
        self.archiveresult = self.ARCHIVERESULT = archiveresult
        self.CONFIG = archivebox.pm.hook.get_SCOPE_CONFIG(archiveresult=self.archiveresult, extra=extra_config)
        all_binaries = abx.as_dict(archivebox.pm.hook.get_BINARIES())
        all_daemons = abx.as_dict(archivebox.pm.hook.get_DAEMONS())
        self.BINARIES = {
            binary_name: all_binaries[binary_name]
            for binary_name in self.binaries
        }
        self.DAEMONS = {
            daemon_name: all_daemons[daemon_name]
            for daemon_name in self.daemons
        }
    def extract(self, config: dict | None=None) -> 'ArchiveResult':
        """
        Run this extractor against self.ARCHIVERESULT. This is responsible for:
        - making sure any binaries the extractor depends on are installed and loaded
        - creating a new temporary working directory under the snapshot dir to hold extractor output
        - setting up a timer signal to kill the extractor if it runs too long
        - passing the extractor the URL, temporary working directory, and config dict of options
        - running the extractor in a shell subprocess and collecting stdout/stderr
        - capturing the extractor's exit code
        - if the extractor exits with 29 (RetryError), setting the status to 'BACKOFF' and retry_at to a datetime in the future
        - if the extractor exits with 50 (NotApplicable), setting the status to 'SKIPPED' and retry_at to None
        - setting the correct permissions and ownership on all the output files
        - generating the merkle tree of all the output files and their hashes
        - generating a thumbnail of the main output (or collecting one provided by the extractor)
        - detecting any special output files that need to be parsed by other parts of the system (content-types?):
            - metadata.json     -> ArchiveResult.output_json
            - outlinks.jsonl    -> ArchiveResult.output_links
            - search_texts.txt  -> ArchiveResult.index_texts
            - .merkle.json      -> ArchiveResult.output_files
            - videos.jsonl      -> ArchiveResult.output_videos
            - audios.jsonl      -> ArchiveResult.output_audios
            - images.jsonl      -> ArchiveResult.output_images
            - htmls.jsonl       -> ArchiveResult.output_htmls
        - saving all the result metadata to the ArchiveResult in the database
        """
        archiveresult = self.ARCHIVERESULT
        self.before_extract()

        error: Exception | None = Exception('Failed to start extractor')
        try:
            self.run_extractor(
                url=archiveresult.snapshot.url,
                binaries=self.load_binaries(),
                daemons=self.load_daemons(),
                cwd=Path(self.OUTPUT_DIR),
                config={**self.CONFIG, **(config or {})},
            )
            error = None
        except Exception as err:
            error = err
        finally:
            self.after_extract(error=error)

        return archiveresult
    def should_extract(self) -> bool:
        # placeholder rule matching the example ytdlp extractor above; subclasses override this
        return self.archiveresult.snapshot.url.startswith('https://youtube.com/')
    def load_binaries(self):
        return {
            bin_name: binary.load()
            for bin_name, binary in self.BINARIES.items()
        }

    def load_daemons(self):
        return {
            daemon_name: daemon.load()
            for daemon_name, daemon in self.DAEMONS.items()
        }
    def output_dir_name(self):
        # e.g. 'ytdlp'
        return self.name

    @property
    def OUTPUT_DIR(self):
        # e.g. <snapshot_dir>/ytdlp/
        return self.archiveresult.snapshot_dir / self.output_dir_name()
    def before_extract(self):
        # create the self.archiveresult.snapshot_dir / self.archiveresult.extractor / output dir
        # chown, chmod, etc.
        binaries = self.load_binaries()
        daemons = self.load_daemons()
        cmd = self.archiveresult.EXTRACTOR.get_cmd(binaries=binaries, daemons=daemons)
        cmd_version = self.archiveresult.EXTRACTOR.get_cmd_version(binaries=binaries, daemons=daemons)

        self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        os.chmod(self.OUTPUT_DIR, 0o755)

        self.archiveresult.status = self.archiveresult.StatusChoices.STARTED
        self.archiveresult.retry_at = timezone.now() + timedelta(seconds=self.timeout)
        self.archiveresult.start_ts = timezone.now()
        self.archiveresult.end_ts = None
        self.archiveresult.output = None
        self.archiveresult.outputs = []    # reset any outputs left over from a previous attempt
        self.archiveresult.output_path = str(self.OUTPUT_DIR.relative_to(self.archiveresult.snapshot_dir))
        self.archiveresult.cmd = cmd
        self.archiveresult.cmd_version = cmd_version
        self.archiveresult.machine = Machine.objects.get_current()
        self.archiveresult.iface = NetworkInterface.objects.get_current()
        self.archiveresult.save()
        self.archiveresult.write_indexes()
    def run_extractor(self, url: str, binaries: dict, daemons: dict, cwd: Path, config: dict):
        # runs the extractor command in a shell subprocess; extract() above
        # orchestrates the before/after steps around this
        proc = subprocess.run(
            self.archiveresult.cmd,
            cwd=cwd,
            # os.environ.update() returns None; merge into a copy instead
            # (binary values coerced to str here; exact env var format is TBD)
            env={**os.environ, **{name: str(binary) for name, binary in binaries.items()}},
            timeout=self.timeout,
            shell=True,
            capture_output=True,
            text=True,
        )
        self.archiveresult.stdout = proc.stdout
        self.archiveresult.stderr = proc.stderr
        self.archiveresult.returncode = proc.returncode
        self.archiveresult.save()
        self.archiveresult.write_indexes()
    def determine_status(self):
        if self.archiveresult.returncode == 29:
            # 29 = RetryError: try again later with backoff
            return self.archiveresult.StatusChoices.BACKOFF, timezone.now() + timedelta(seconds=self.timeout)
        elif self.archiveresult.returncode == 50:
            # 50 = NotApplicable: this extractor doesn't apply to the given URL
            return self.archiveresult.StatusChoices.SKIPPED, None
        else:
            return self.archiveresult.StatusChoices.FAILED, None
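    # Hedged sketch (not part of this commit): an extractor script can signal
    # these statuses purely via its exit code. A hypothetical ytdlp wrapper,
    # where `rate_limited` and `is_video_url` are made-up conditions:
    #
    #   import sys
    #   if rate_limited:        # hypothetical condition
    #       sys.exit(29)        # RetryError    -> BACKOFF, retried after retry_at
    #   if not is_video_url:    # hypothetical condition
    #       sys.exit(50)        # NotApplicable -> SKIPPED, never retried
    #   sys.exit(0)             # success       -> SUCCEEDED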
    def collect_outputs(self, cwd: Path):
        for file in cwd.rglob('*'):
            if not file.is_file():
                continue
            path = file.relative_to(cwd)
            os.chmod(file, 0o644)
            # os.chown(file, ARCHIVEBOX_UID, ARCHIVEBOX_GID)

            data = file.read_bytes()
            stat = file.stat()
            output_file = {
                'type': 'FILE',
                'path': path,
                'size': stat.st_size,
                'ext': file.suffix,
                'mimetype': mimetypes.guess_type(file)[0],
                'sha256': hashlib.sha256(data).hexdigest(),
                # blake3 comes from the third-party package imported above, if available
                'blake3': blake3(data).hexdigest() if blake3 else None,
                'created_at': stat.st_ctime,
                'modified_at': stat.st_mtime,
                'symlinks': [
                    # placeholder example values
                    'screenshot.png',
                    'example.com',
                ],
            }
            self.archiveresult.outputs.append(output_file)

            # parse_outlinks() is not defined in this file yet; expected to be provided elsewhere
            outlinks = parse_outlinks(file)
            if outlinks:
                for outlink in outlinks:
                    self.archiveresult.outputs.append({
                        'type': 'OUTLINK',
                        'url': outlink.target,
                        'selector': outlink.selector,
                        'text': outlink.text,
                    })

            if path.name == 'favicon.ico':
                self.archiveresult.outputs.append({
                    'type': 'FAVICON',
                    'symlinks': {
                        'favicon': output_file['path'],
                        'favicon.ico': output_file['path'],
                        'favicon.png': output_file['path'].with_suffix('.png'),
                    },
                    'path': output_file['path'],
                })

            if path.suffix == '.pdf':
                self.archiveresult.outputs.append({
                    'type': 'PDF',
                    'path': path,
                })

            if mimetypes.guess_type(file)[0] == 'text/plain':
                self.archiveresult.outputs.append({
                    'type': 'SEARCHTEXT',
                    'path': path,
                    'archiveresult_id': self.archiveresult.id,
                })
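    # Hedged sketch of the "merkle tree of all the output files" step promised in
    # the extract() docstring above; it is not implemented in this commit. Assuming
    # the sha256 hashes collected by collect_outputs(), a minimal root hash could be:
    #
    #   def output_files_merkle_root(self) -> str:
    #       hashes = sorted(o['sha256'] for o in self.archiveresult.outputs if o['type'] == 'FILE')
    #       while len(hashes) > 1:
    #           pairs = zip(hashes[::2], hashes[1::2] + hashes[-1:])  # duplicate last leaf if odd count
    #           hashes = [hashlib.sha256((a + b).encode()).hexdigest() for a, b in pairs]
    #       return hashes[0] if hashes else hashlib.sha256(b'').hexdigest()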
    def after_extract(self, error: Exception | None=None):
        # gather output files before reading outputs[0] below
        self.collect_outputs(Path(self.OUTPUT_DIR))
        self.archiveresult.error = f'{type(error).__name__}: {error}' if error else None
        if error or self.archiveresult.returncode != 0:
            # map exit codes 29/50/other to BACKOFF/SKIPPED/FAILED (see determine_status above)
            self.archiveresult.status, self.archiveresult.retry_at = self.determine_status()
        else:
            self.archiveresult.status = self.archiveresult.StatusChoices.SUCCEEDED
            self.archiveresult.retry_at = None
        self.archiveresult.end_ts = timezone.now()
        self.archiveresult.output = self.archiveresult.outputs[0]['path'] if self.archiveresult.outputs else None
        self.archiveresult.save()
        self.archiveresult.write_indexes()
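
# Hedged usage sketch (not part of this commit): how a concrete extractor
# subclass might override the class variables and hooks defined above.
# WgetExtractor, the 'wget' binary name, and the URL are hypothetical examples.
#
#   class WgetExtractor(Extractor):
#       name = 'wget'
#       verbose_name = 'WGET'
#       binaries = ('wget',)
#       timeout = 120
#
#       def should_extract(self):
#           return self.archiveresult.snapshot.url.startswith(('http://', 'https://'))
#
#   archiveresult = ArchiveResult.objects.get(extractor='wget', snapshot__url='https://example.com')
#   WgetExtractor(archiveresult).extract()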