WIP add new effects, reads, writes to abx interface

Nick Sweeting 2024-10-15 22:33:22 -07:00
parent 0abbc11a6b
commit 35dd5911d8
6 changed files with 249 additions and 177 deletions


@@ -0,0 +1,20 @@
"""
Hookspec for side-effects that ArchiveBox plugins can trigger.
(e.g. network requests, binary execution, remote API calls, external library calls, etc.)
"""
__package__ = 'abx.archivebox'
import abx
@abx.hookspec
def check_remote_seed_connection(urls, extractor, credentials, created_by):
pass
@abx.hookspec
def exec_extractor(url, extractor, credentials, config):
pass
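
For reference, a downstream plugin could satisfy one of these effect hookspecs roughly as follows (a minimal sketch only: the plugin module, the wget invocation, and the return shape are illustrative assumptions, not part of this commit):

# hypothetical plugin module, e.g. abx/plugins/wget_extractor.py
import subprocess

import abx


@abx.hookimpl
def exec_extractor(url, extractor, credentials, config):
    # only handle the extractor this plugin knows about; returning None lets another plugin answer
    if extractor != 'wget':
        return None
    timeout = (config or {}).get('TIMEOUT', 60)
    proc = subprocess.run(['wget', '--page-requisites', url], capture_output=True, timeout=timeout)
    return {'returncode': proc.returncode, 'stderr': proc.stderr.decode()}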


@@ -0,0 +1,45 @@
"""
Hookspec for ArchiveBox system events that plugins can hook into.
Loosely modeled after Django's signals architecture.
https://docs.djangoproject.com/en/5.1/ref/signals/
"""
__package__ = 'abx.archivebox'
import abx
@abx.hookspec
def on_crawl_schedule_tick(crawl_schedule):
pass
@abx.hookspec
def on_seed_post_save(seed, created=False):
...
@abx.hookspec
def on_crawl_post_save(crawl, created=False):
...
@abx.hookspec
def on_snapshot_post_save(snapshot, created=False):
...
# @abx.hookspec
# def on_snapshot_post_delete(snapshot):
# ...
@abx.hookspec
def on_archiveresult_post_save(archiveresult, created=False):
...
# @abx.hookspec
# def on_archiveresult_post_delete(archiveresult):
# ...
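
Analogous to connecting a Django signal receiver, a plugin subscribes to one of these events by providing a hookimpl with a matching name and signature (a sketch; the logging is purely illustrative):

# hypothetical plugin module reacting to snapshot saves
import logging

import abx

log = logging.getLogger(__name__)


@abx.hookimpl
def on_snapshot_post_save(snapshot, created=False):
    # runs whenever a Snapshot is saved, like a Django post_save receiver
    if created:
        log.info('New snapshot queued for archiving: %s', snapshot.url)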


@@ -5,6 +5,129 @@ from typing import Dict, Set, Any, TYPE_CHECKING
from benedict import benedict
from django.conf import settings
import abx
from .. import pm
@abx.hookimpl
def get_or_create_snapshot(crawl, url, config):
pass
@abx.hookimpl
def update_crawl_schedule_next_run_at(crawl_schedule, next_run_at):
pass
@abx.hookimpl
def create_crawl_copy(crawl_to_copy, schedule):
pass
@abx.hookimpl
def create_crawl(seed, depth, tags_str, persona, created_by, config, schedule):
pass
def create_crawl_from_ui_action(urls, extractor, credentials, depth, tags_str, persona, created_by, crawl_config):
if seed_is_remote(urls, extractor, credentials):
# user's seed is a remote source that will provide the urls (e.g. RSS feed URL, Pocket API, etc.)
uri, extractor, credentials = abx.archivebox.effects.check_remote_seed_connection(urls, extractor, credentials, created_by)
else:
# user's seed is some raw text they provided to parse for urls, save it to a file then load the file as a Seed
uri = abx.archivebox.writes.write_raw_urls_to_local_file(urls, extractor, tags_str, created_by) # file:///data/sources/some_import.txt
seed = abx.archivebox.writes.get_or_create_seed(uri=uri, extractor=extractor, credentials=credentials, created_by=created_by)
# abx.archivebox.events.on_seed_created(seed)
crawl = abx.archivebox.writes.create_crawl(seed=seed, depth=depth, tags_str=tags_str, persona=persona, created_by=created_by, config=crawl_config, schedule=None)
abx.archivebox.events.on_crawl_created(crawl)
@abx.hookimpl(specname='on_crawl_schedule_tick')
def create_crawl_from_crawlschedule_if_due(crawl_schedule):
# make sure it's not too early to run this scheduled import (makes this function idempotent / safe to call multiple times / every second)
if timezone.now() < crawl_schedule.next_run_at:
# it's not time to run it yet, wait for the next tick
return
else:
# we're going to run it now, bump the next run time so that no one else runs it at the same time as us
abx.archivebox.writes.update_crawl_schedule_next_run_at(crawl_schedule, next_run_at=crawl_schedule.next_run_at + crawl_schedule.interval)
crawl_to_copy = None
try:
crawl_to_copy = crawl_schedule.crawl_set.first() # alternatively use .last() to copy most recent crawl instead of very first crawl
except Crawl.DoesNotExist:
# there is no template crawl to base the next one off of
# user must add at least one crawl to a schedule that serves as the template for all future repeated crawls
return
new_crawl = abx.archivebox.writes.create_crawl_copy(crawl_to_copy=crawl_to_copy, schedule=crawl_schedule)
abx.archivebox.events.on_crawl_created(new_crawl)
@abx.hookimpl(specname='on_crawl_post_save')
def create_root_snapshot_from_seed(crawl):
# create a snapshot for the seed URI which kicks off the crawl
# only a single extractor will run on it, which will produce outlinks which get added back to the crawl
root_snapshot, created = abx.archivebox.writes.get_or_create_snapshot(crawl=crawl, url=crawl.seed.uri, config={
'extractors': (
abx.archivebox.reads.get_extractors_that_produce_outlinks()
if crawl.seed.extractor == 'auto' else
[crawl.seed.extractor]
),
**crawl.seed.config,
})
if created:
abx.archivebox.events.on_snapshot_created(root_snapshot)
abx.archivebox.writes.update_crawl_stats(started_at=timezone.now())
@abx.hookimpl(specname='on_snapshot_created')
def create_archiveresults_pending_from_snapshot(snapshot, config):
config = get_scope_config(
# defaults=settings.CONFIG_FROM_DEFAULTS,
# configfile=settings.CONFIG_FROM_FILE,
# environment=settings.CONFIG_FROM_ENVIRONMENT,
persona=snapshot.crawl.persona,
seed=snapshot.crawl.seed,
crawl=snapshot.crawl,
snapshot=snapshot,
# extra_config=extra_config,
)
extractors = abx.archivebox.reads.get_extractors_for_snapshot(snapshot, config)
for extractor in extractors:
archiveresult, created = abx.archivebox.writes.get_or_create_archiveresult_pending(
snapshot=snapshot,
extractor=extractor,
status='pending'
)
if created:
abx.archivebox.events.on_archiveresult_created(archiveresult)
@abx.hookimpl(specname='on_archiveresult_updated')
def create_snapshots_pending_from_archiveresult_outlinks(archiveresult):
config = get_scope_config(...)
# check if the extractor finished successfully; if not, don't bother checking for outlinks
if archiveresult.status != 'succeeded':
return
# check if we have already reached the maximum recursion depth
hops_to_here = abx.archivebox.reads.get_outlink_parents(crawl_pk=archiveresult.snapshot.crawl_id, url=archiveresult.snapshot.url, config=config)
if len(hops_to_here) >= archiveresult.snapshot.crawl.max_depth + 1:
return
# parse the output to get outlink url_entries
discovered_urls = abx.archivebox.reads.get_archiveresult_discovered_url_entries(archiveresult, config=config)
for url_entry in discovered_urls:
abx.archivebox.writes.create_outlink_record(src=archiveresult.snapshot.url, dst=url_entry.url, via=archiveresult)
abx.archivebox.writes.create_snapshot(crawl=archiveresult.snapshot.crawl, url_entry=url_entry)
# abx.archivebox.events.on_crawl_updated(archiveresult.snapshot.crawl)
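
The abx.archivebox.events.on_*_created() calls above presumably resolve to plugin-manager dispatches of the hookspecs declared in events.py. Assuming pm (imported above) is a pluggy-style PluginManager, the glue would look roughly like this sketch; the mapping of on_*_created helpers onto the on_*_post_save hookspecs is an assumption, not something this commit defines:

# sketch: event helpers fanning out to every registered hookimpl
def on_crawl_created(crawl):
    pm.hook.on_crawl_post_save(crawl=crawl, created=True)

def on_snapshot_created(snapshot):
    pm.hook.on_snapshot_post_save(snapshot=snapshot, created=True)

def on_archiveresult_created(archiveresult):
    pm.hook.on_archiveresult_post_save(archiveresult=archiveresult, created=True)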


@@ -2,12 +2,6 @@ __package__ = 'archivebox.crawls'
import time
import abx
import abx.archivebox.events
import abx.hookimpl
from datetime import datetime
from django_stubs_ext.db.models import TypedModelMeta
from django.db import models
@@ -20,59 +14,10 @@ from django.urls import reverse_lazy
from pathlib import Path
from seeds.models import Seed
from abid_utils.models import ABIDModel, ABIDField, AutoDateTimeField, ModelWithHealthStats
from ..extractors import EXTRACTOR_CHOICES
class Seed(ABIDModel, ModelWithHealthStats):
"""
A fountain that produces URLs (+metadata) each time it's queried e.g.
- file:///data/sources/2024-01-02_11-57-51__cli_add.txt
- file:///data/sources/2024-01-02_11-57-51__web_ui_add.txt
- file:///Users/squash/Library/Application Support/Google/Chrome/Default/Bookmarks
- https://getpocket.com/user/nikisweeting/feed
- https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml
- ...
Each query of a Seed can produce the same list of URLs, or a different list each time.
The list of URLs it returns is used to create a new Crawl and seed it with new pending Snapshots.
When a crawl is created, a root_snapshot is initially created with a URI set to the Seed URI.
The seed's preferred extractor is executed on that URI, which produces an ArchiveResult containing outlinks.
The outlinks then get turned into new pending Snapshots under the same crawl,
and the cycle repeats until Crawl.max_depth.
Each consumption of a Seed by an Extractor can produce new urls, as Seeds can point to
stateful remote services, files with contents that change, directories that have new files within, etc.
"""
abid_prefix = 'src_'
abid_ts_src = 'self.created_at'
abid_uri_src = 'self.uri'
abid_subtype_src = 'self.extractor'
abid_rand_src = 'self.id'
abid_drift_allowed = True
uri = models.URLField(max_length=255, blank=False, null=False, unique=True) # unique source location where URLs will be loaded from
extractor = models.CharField(choices=EXTRACTOR_CHOICES, default='auto', max_length=32) # suggested extractor to use to load this URL source
tags_str = models.CharField(max_length=255, null=False, blank=True, default='') # tags to attach to any URLs that come from this source
config = models.JSONField(default=dict) # extra config to put in scope when loading URLs from this source
created_at = AutoDateTimeField(default=None, null=False, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=None, null=False)
@property
def source_type(self):
# e.g. http/https://
# file://
# pocketapi://
# s3://
# etc..
return self.uri.split('://')[0].lower()
class CrawlSchedule(ABIDModel, ModelWithHealthStats):
"""
@@ -107,8 +52,10 @@ class Crawl(ABIDModel, ModelWithHealthStats):
A single session of URLs to archive starting from a given Seed and expanding outwards. An "archiving session" so to speak.
A new Crawl should be created for each loading from a Seed (because it can produce a different set of URLs every time it's loaded).
E.g. every scheduled import from an RSS feed should create a new Crawl.
E.g. every scheduled import from an RSS feed should create a new Crawl, and further loads of the same Seed each create a new Crawl.
Every "Add" task triggered from the Web UI or CLI should create a new Crawl.
Every "Add" task triggered from the Web UI, CLI, or Scheduled Crawl should create a new Crawl with the seed set to a
file URI e.g. file:///sources/<date>_{ui,cli}_add.txt containing the user's input.
""" """
abid_prefix = 'crl_' abid_prefix = 'crl_'
abid_ts_src = 'self.created_at' abid_ts_src = 'self.created_at'
@@ -124,13 +71,13 @@ class Crawl(ABIDModel, ModelWithHealthStats):
created_at = AutoDateTimeField(default=None, null=False, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
seed = models.ForeignKey(Seed, on_delete=models.CASCADE, related_name='crawl_set', null=False, blank=False)
seed = models.ForeignKey(Seed, on_delete=models.PROTECT, related_name='crawl_set', null=False, blank=False)
max_depth = models.PositiveSmallIntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(4)])
tags_str = models.CharField(max_length=1024, blank=True, null=False, default='')
persona = models.CharField(max_length=32, blank=True, null=False, default='auto')
config = models.JSONField(default=dict)
schedule = models.ForeignKey(CrawlSchedule, null=True, blank=True, editable=True)
schedule = models.ForeignKey(CrawlSchedule, on_delete=models.SET_NULL, null=True, blank=True, editable=True)
# crawler = models.CharField(choices=CRAWLER_CHOICES, default='breadth_first', max_length=32)
# tags = models.ManyToManyField(Tag, blank=True, related_name='crawl_set', through='CrawlTag')
@@ -175,102 +122,7 @@ class Outlink(models.Model):
unique_together = (('src', 'dst', 'via'),)
def scheduler_runloop():
# abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine())
while True:
# abx.archivebox.events.on_scheduler_tick_start(timezone.now(), machine=Machine.objects.get_current_machine())
scheduled_crawls = CrawlSchedule.objects.filter(is_enabled=True)
scheduled_crawls_due = scheduled_crawls.filter(next_run_at__lte=timezone.now())
for scheduled_crawl in scheduled_crawls_due:
try:
abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl)
except Exception as e:
abx.archivebox.events.on_crawl_schedule_failure(timezone.now(), machine=Machine.objects.get_current_machine(), error=e, schedule=scheduled_crawl)
# abx.archivebox.events.on_scheduler_tick_end(timezone.now(), machine=Machine.objects.get_current_machine(), tasks=scheduled_tasks_due)
time.sleep(1)
def create_crawl_from_ui_action(urls, extractor, credentials, depth, tags_str, persona, created_by, crawl_config):
if seed_is_remote(urls, extractor, credentials):
# user's seed is a remote source that will provide the urls (e.g. RSS feed URL, Pocket API, etc.)
uri, extractor, credentials = abx.archivebox.effects.check_remote_seed_connection(urls, extractor, credentials, created_by)
else:
# user's seed is some raw text they provided to parse for urls, save it to a file then load the file as a Seed
uri = abx.archivebox.writes.write_raw_urls_to_local_file(urls, extractor, tags_str, created_by) # file:///data/sources/some_import.txt
seed = abx.archivebox.writes.get_or_create_seed(uri=uri, extractor=extractor, credentials=credentials, created_by=created_by)
# abx.archivebox.events.on_seed_created(seed)
crawl = abx.archivebox.writes.create_crawl(seed=seed, depth=depth, tags_str=tags_str, persona=persona, created_by=created_by, config=crawl_config, schedule=None)
abx.archivebox.events.on_crawl_created(crawl)
@abx.hookimpl.on_crawl_schedule_tick
def create_crawl_from_crawlschedule_if_due(crawl_schedule):
# make sure it's not too early to run this scheduled import (makes this function idempotent / safe to call multiple times / every second)
if timezone.now() < crawl_schedule.next_run_at:
# it's not time to run it yet, wait for the next tick
return
else:
# we're going to run it now, bump the next run time so that no one else runs it at the same time as us
abx.archivebox.writes.update_crawl_schedule_next_run_at(crawl_schedule, next_run_at=crawl_schedule.next_run_at + crawl_schedule.interval)
crawl_to_copy = None
try:
crawl_to_copy = crawl_schedule.crawl_set.first() # alternatively use .last() to copy most recent crawl instead of very first crawl
except Crawl.DoesNotExist:
# there is no template crawl to base the next one off of
# user must add at least one crawl to a schedule that serves as the template for all future repeated crawls
return
new_crawl = abx.archivebox.writes.create_crawl_copy(crawl_to_copy=crawl_to_copy, schedule=crawl_schedule)
abx.archivebox.events.on_crawl_created(new_crawl)
@abx.hookimpl.on_crawl_created
def create_root_snapshot(crawl):
# create a snapshot for the seed URI which kicks off the crawl
# only a single extractor will run on it, which will produce outlinks which get added back to the crawl
root_snapshot, created = abx.archivebox.writes.get_or_create_snapshot(crawl=crawl, url=crawl.seed.uri, config={
'extractors': (
abx.archivebox.reads.get_extractors_that_produce_outlinks()
if crawl.seed.extractor == 'auto' else
[crawl.seed.extractor]
),
**crawl.seed.config,
})
if created:
abx.archivebox.events.on_snapshot_created(root_snapshot)
abx.archivebox.writes.update_crawl_stats(started_at=timezone.now())
@abx.hookimpl.on_snapshot_created
def create_archiveresults_pending_from_snapshot(snapshot, config):
config = get_scope_config(
# defaults=settings.CONFIG_FROM_DEFAULTS,
# configfile=settings.CONFIG_FROM_FILE,
# environment=settings.CONFIG_FROM_ENVIRONMENT,
persona=snapshot.crawl.persona,
seed=snapshot.crawl.seed,
crawl=snapshot.crawl,
snapshot=snapshot,
# extra_config=extra_config,
)
extractors = abx.archivebox.reads.get_extractors_for_snapshot(snapshot, config)
for extractor in extractors:
archiveresult, created = abx.archivebox.writes.get_or_create_archiveresult_pending(
snapshot=snapshot,
extractor=extractor,
status='pending'
)
if created:
abx.archivebox.events.on_archiveresult_created(archiveresult)
@abx.hookimpl.on_archiveresult_created
@@ -298,28 +150,6 @@ def exec_archiveresult_extractor_effects(archiveresult):
abx.archivebox.events.on_snapshot_updated(archiveresult.snapshot)
@abx.hookimpl.on_archiveresult_updated
def create_snapshots_pending_from_archiveresult_outlinks(archiveresult):
config = get_scope_config(...)
# check if the extractor finished successfully; if not, don't bother checking for outlinks
if archiveresult.status != 'succeeded':
return
# check if we have already reached the maximum recursion depth
hops_to_here = abx.archivebox.reads.get_outlink_parents(crawl_pk=archiveresult.snapshot.crawl_id, url=archiveresult.snapshot.url, config=config)
if len(hops_to_here) >= archiveresult.snapshot.crawl.max_depth + 1:
return
# parse the output to get outlink url_entries
discovered_urls = abx.archivebox.reads.get_archiveresult_discovered_url_entries(archiveresult, config=config)
for url_entry in discovered_urls:
abx.archivebox.writes.create_outlink_record(src=archiveresult.snapshot.url, dst=url_entry.url, via=archiveresult)
abx.archivebox.writes.create_snapshot(crawl=archiveresult.snapshot.crawl, url_entry=url_entry)
# abx.archivebox.events.on_crawl_updated(archiveresult.snapshot.crawl)
@abx.hookimpl.reads.get_outlink_parents
def get_outlink_parents(url, crawl_pk=None, config=None):
scope = Q(dst=url)


@@ -44,3 +44,22 @@ def get_BINARIES():
# 'screenshot': SCREENSHOT_EXTRACTOR,
# 'dom': DOM_EXTRACTOR,
# }
# Hooks Available:
#
# Events:
#     on_crawl_schedule_tick
#     on_seed_post_save
#     on_crawl_post_save
#     on_snapshot_post_save
#     on_archiveresult_post_save
#
# Event handlers:
#     create_root_snapshot_from_seed
#     create_archiveresults_pending_from_snapshot
#     create_crawl_from_crawlschedule_if_due
#     create_crawl_copy_from_template


@@ -1,5 +1,8 @@
__package__ = 'archivebox.queues'
from functools import wraps
from django.utils import timezone
from django_huey import db_task, task
from huey_monitor.models import TaskModel
@@ -7,6 +10,38 @@ from huey_monitor.tqdm import ProcessInfo
from .supervisor_util import get_or_create_supervisord_process
# @db_task(queue="system_tasks", context=True, schedule=1)
# def scheduler_tick():
# print('SCHEDULER TICK', timezone.now().isoformat())
# # abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine())
# # abx.archivebox.events.on_scheduler_tick_start(timezone.now(), machine=Machine.objects.get_current_machine())
# scheduled_crawls = CrawlSchedule.objects.filter(is_enabled=True)
# scheduled_crawls_due = scheduled_crawls.filter(next_run_at__lte=timezone.now())
# for scheduled_crawl in scheduled_crawls_due:
# try:
# abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl)
# except Exception as e:
# abx.archivebox.events.on_crawl_schedule_failure(timezone.now(), machine=Machine.objects.get_current_machine(), error=e, schedule=scheduled_crawl)
# # abx.archivebox.events.on_scheduler_tick_end(timezone.now(), machine=Machine.objects.get_current_machine(), tasks=scheduled_tasks_due)
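
If the commented-out tick above gets enabled, one option is to register it as a periodic task rather than a db_task with a schedule argument. A sketch under the assumptions that django_huey exposes db_periodic_task (mirroring huey's periodic_task), that CrawlSchedule is importable from crawls.models, and that huey's one-minute crontab granularity is acceptable:

# sketch only, not part of this commit
from django.utils import timezone
from django_huey import db_periodic_task   # assumed to exist alongside db_task
from huey import crontab

import abx.archivebox.events
from crawls.models import CrawlSchedule    # assumed import path


@db_periodic_task(crontab(minute='*'), queue="system_tasks")
def scheduler_tick():
    # fire on_crawl_schedule_tick for every enabled schedule that is due
    for scheduled_crawl in CrawlSchedule.objects.filter(is_enabled=True, next_run_at__lte=timezone.now()):
        abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl)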
def db_task_with_parent(func):
"""Decorator for db_task that sets the parent task for the db_task"""
@wraps(func)
def wrapper(*args, **kwargs):
task = kwargs.get('task')
parent_task_id = kwargs.get('parent_task_id')
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
return func(*args, **kwargs)
return wrapper
@db_task(queue="system_tasks", context=True)
def bg_add(add_kwargs, task=None, parent_task_id=None):
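
A hypothetical sub-task using db_task_with_parent would stack it under @db_task(context=True) so huey supplies the task kwarg and the caller supplies parent_task_id (a sketch; bg_extract and its arguments are placeholders, not part of this commit):

@db_task(queue="system_tasks", context=True)
@db_task_with_parent
def bg_extract(extract_kwargs, task=None, parent_task_id=None):
    # the wrapper records the parent/child link in huey_monitor before the real work starts
    ...

# called from inside another task, passing that task's id as the parent:
# bg_extract(extract_kwargs, parent_task_id=str(task.id))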