diff --git a/archivebox/abx/archivebox/effects.py b/archivebox/abx/archivebox/effects.py
new file mode 100644
index 00000000..8f0e54f3
--- /dev/null
+++ b/archivebox/abx/archivebox/effects.py
@@ -0,0 +1,20 @@
+"""
+Hookspec for side-effects that ArchiveBox plugins can trigger.
+
+(e.g. network requests, binary execution, remote API calls, external library calls, etc.)
+"""
+
+__package__ = 'abx.archivebox'
+
+import abx
+
+
+@abx.hookspec
+def check_remote_seed_connection(urls, extractor, credentials, created_by):
+    pass
+
+
+@abx.hookspec
+def exec_extractor(url, extractor, credentials, config):
+    pass
+
diff --git a/archivebox/abx/archivebox/events.py b/archivebox/abx/archivebox/events.py
new file mode 100644
index 00000000..d3384318
--- /dev/null
+++ b/archivebox/abx/archivebox/events.py
@@ -0,0 +1,45 @@
+"""
+Hookspec for ArchiveBox system events that plugins can hook into.
+
+Loosely modeled after Django's signals architecture.
+https://docs.djangoproject.com/en/5.1/ref/signals/
+"""
+
+__package__ = 'abx.archivebox'
+
+import abx
+
+
+@abx.hookspec
+def on_crawl_schedule_tick(crawl_schedule):
+    pass
+
+
+@abx.hookspec
+def on_seed_post_save(seed, created=False):
+    ...
+
+
+@abx.hookspec
+def on_crawl_post_save(crawl, created=False):
+    ...
+
+
+@abx.hookspec
+def on_snapshot_post_save(snapshot, created=False):
+    ...
+
+# @abx.hookspec
+# def on_snapshot_post_delete(snapshot):
+#     ...
+
+
+@abx.hookspec
+def on_archiveresult_post_save(archiveresult, created=False):
+    ...
+
+# @abx.hookspec
+# def on_archiveresult_post_delete(archiveresult):
+#     ...
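
Note: the hookspecs above only declare call signatures; a plugin provides behavior by registering a same-named function with @abx.hookimpl (pluggy-style). A minimal sketch of what a third-party plugin might register, the plugin module name and the print side-effects are illustrative assumptions, not part of this diff:

    # example_plugin/hooks.py  (hypothetical plugin module, sketch only)
    import abx

    @abx.hookimpl
    def on_crawl_post_save(crawl, created=False):
        # called every time a Crawl is saved; only react to the initial creation
        if created:
            print(f'[example_plugin] new crawl {crawl.id} started from seed {crawl.seed_id}')

    @abx.hookimpl
    def on_archiveresult_post_save(archiveresult, created=False):
        # e.g. emit a notification when an extractor finishes
        if not created and archiveresult.status == 'succeeded':
            print(f'[example_plugin] {archiveresult.extractor} finished for snapshot {archiveresult.snapshot_id}')
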
diff --git a/archivebox/abx/archivebox/writes.py b/archivebox/abx/archivebox/writes.py
index 78c1e098..0c4566b4 100644
--- a/archivebox/abx/archivebox/writes.py
+++ b/archivebox/abx/archivebox/writes.py
@@ -5,6 +5,129 @@ from typing import Dict, Set, Any, TYPE_CHECKING
 
 from benedict import benedict
 
+from django.conf import settings
+from django.utils import timezone
+
 import abx
 from .. import pm
 
+
+@abx.hookimpl
+def get_or_create_snapshot(crawl, url, config):
+    pass
+
+
+@abx.hookimpl
+def update_crawl_schedule_next_run_at(crawl_schedule, next_run_at):
+    pass
+
+
+@abx.hookimpl
+def create_crawl_copy(crawl_to_copy, schedule):
+    pass
+
+
+@abx.hookimpl
+def create_crawl(seed, depth, tags_str, persona, created_by, config, schedule):
+    pass
+
+
+def create_crawl_from_ui_action(urls, extractor, credentials, depth, tags_str, persona, created_by, crawl_config):
+    if seed_is_remote(urls, extractor, credentials):
+        # user's seed is a remote source that will provide the urls (e.g. RSS feed URL, Pocket API, etc.)
+        uri, extractor, credentials = abx.archivebox.effects.check_remote_seed_connection(urls, extractor, credentials, created_by)
+    else:
+        # user's seed is some raw text they provided to parse for urls, save it to a file then load the file as a Seed
+        uri = abx.archivebox.writes.write_raw_urls_to_local_file(urls, extractor, tags_str, created_by)   # file:///data/sources/some_import.txt
+
+    seed = abx.archivebox.writes.get_or_create_seed(uri=uri, extractor=extractor, credentials=credentials, created_by=created_by)
+    # abx.archivebox.events.on_seed_created(seed)
+
+    crawl = abx.archivebox.writes.create_crawl(seed=seed, depth=depth, tags_str=tags_str, persona=persona, created_by=created_by, config=crawl_config, schedule=None)
+    abx.archivebox.events.on_crawl_created(crawl)
+
+
+@abx.hookimpl(specname='on_crawl_schedule_tick')
+def create_crawl_from_crawlschedule_if_due(crawl_schedule):
+    # make sure it's not too early to run this scheduled import (makes this function idempotent / safe to call multiple times / every second)
+    if timezone.now() < crawl_schedule.next_run_at:
+        # it's not time to run it yet, wait for the next tick
+        return
+    else:
+        # we're going to run it now, bump the next run time so that no one else runs it at the same time as us
+        abx.archivebox.writes.update_crawl_schedule_next_run_at(crawl_schedule, next_run_at=crawl_schedule.next_run_at + crawl_schedule.interval)
+
+    crawl_to_copy = None
+    try:
+        crawl_to_copy = crawl_schedule.crawl_set.first()   # alternatively use .last() to copy the most recent crawl instead of the very first crawl
+    except Crawl.DoesNotExist:
+        # there is no template crawl to base the next one off of
+        # user must add at least one crawl to a schedule that serves as the template for all future repeated crawls
+        return
+
+    new_crawl = abx.archivebox.writes.create_crawl_copy(crawl_to_copy=crawl_to_copy, schedule=crawl_schedule)
+    abx.archivebox.events.on_crawl_created(new_crawl)
+
+
+@abx.hookimpl(specname='on_crawl_post_save')
+def create_root_snapshot_from_seed(crawl):
+    # create a snapshot for the seed URI which kicks off the crawl
+    # only a single extractor will run on it, which will produce outlinks that get added back to the crawl
+    root_snapshot, created = abx.archivebox.writes.get_or_create_snapshot(crawl=crawl, url=crawl.seed.uri, config={
+        'extractors': (
+            abx.archivebox.reads.get_extractors_that_produce_outlinks()
+            if crawl.seed.extractor == 'auto' else
+            [crawl.seed.extractor]
+        ),
+        **crawl.seed.config,
+    })
+    if created:
+        abx.archivebox.events.on_snapshot_created(root_snapshot)
+        abx.archivebox.writes.update_crawl_stats(started_at=timezone.now())
+
+
+@abx.hookimpl(specname='on_snapshot_created')
+def create_archiveresults_pending_from_snapshot(snapshot, config):
+    config = get_scope_config(
+        # defaults=settings.CONFIG_FROM_DEFAULTS,
+        # configfile=settings.CONFIG_FROM_FILE,
+        # environment=settings.CONFIG_FROM_ENVIRONMENT,
+        persona=snapshot.crawl.persona,
+        seed=snapshot.crawl.seed,
+        crawl=snapshot.crawl,
+        snapshot=snapshot,
+        # extra_config=extra_config,
+    )
+
+    extractors = abx.archivebox.reads.get_extractors_for_snapshot(snapshot, config)
+    for extractor in extractors:
+        archiveresult, created = abx.archivebox.writes.get_or_create_archiveresult_pending(
+            snapshot=snapshot,
+            extractor=extractor,
+            status='pending',
+        )
+        if created:
+            abx.archivebox.events.on_archiveresult_created(archiveresult)
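
Note: get_scope_config() is referenced above but not defined in this diff. A rough sketch of the layered merge it seems to imply, where each narrower scope's config overrides the broader one; the parameter list and precedence order here are assumptions, not the real implementation:

    # sketch only, assuming each scope object carries a dict-like .config attribute
    from benedict import benedict

    def get_scope_config(defaults=None, persona=None, seed=None, crawl=None,
                         snapshot=None, archiveresult=None, extra_config=None):
        config = benedict(defaults or {})
        # apply each scope's config on top of the previous one, broadest to narrowest
        for scope in (persona, seed, crawl, snapshot, archiveresult):
            config.update(getattr(scope, 'config', None) or {})
        config.update(extra_config or {})
        return config
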
+
+
+@abx.hookimpl(specname='on_archiveresult_updated')
+def create_snapshots_pending_from_archiveresult_outlinks(archiveresult):
+    config = get_scope_config(...)
+
+    # check if the extractor finished successfully; if not, don't bother checking for outlinks
+    if archiveresult.status != 'succeeded':
+        return
+
+    # check if we have already reached the maximum recursion depth
+    hops_to_here = abx.archivebox.reads.get_outlink_parents(crawl_pk=archiveresult.snapshot.crawl_id, url=archiveresult.url, config=config)
+    if len(hops_to_here) >= archiveresult.snapshot.crawl.max_depth + 1:
+        return
+
+    # parse the output to get outlink url_entries
+    discovered_urls = abx.archivebox.reads.get_archiveresult_discovered_url_entries(archiveresult, config=config)
+
+    for url_entry in discovered_urls:
+        abx.archivebox.writes.create_outlink_record(src=archiveresult.snapshot.url, dst=url_entry.url, via=archiveresult)
+        abx.archivebox.writes.create_snapshot(crawl=archiveresult.snapshot.crawl, url_entry=url_entry)
+
+    # abx.archivebox.events.on_crawl_updated(archiveresult.snapshot.crawl)
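
Note: the max_depth guard above stops recursion once the chain of Outlink hops back to the seed gets too long. A toy illustration of the cutoff arithmetic, assuming get_outlink_parents returns one entry per hop between the seed's root snapshot and the current URL (that assumption is not verified by this diff):

    # illustration only: which hop counts still get their outlinks followed
    max_depth = 2
    for num_hops in range(5):
        follows_outlinks = num_hops < max_depth + 1
        print(f'{num_hops} hops from seed -> outlinks followed: {follows_outlinks}')
    # 0, 1, and 2 hops are followed; at 3+ hops the hookimpl returns early and no new Snapshots are created
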
- """ - - abid_prefix = 'src_' - abid_ts_src = 'self.created_at' - abid_uri_src = 'self.uri' - abid_subtype_src = 'self.extractor' - abid_rand_src = 'self.id' - abid_drift_allowed = True - - uri = models.URLField(max_length=255, blank=False, null=False, unique=True) # unique source location where URLs will be loaded from - - extractor = models.CharField(choices=EXTRACTOR_CHOICES, default='auto', max_length=32) # suggested extractor to use to load this URL source - tags_str = models.CharField(max_length=255, null=False, blank=True, default='') # tags to attach to any URLs that come from this source - config = models.JSONField(default=dict) # extra config to put in scope when loading URLs from this source - - created_at = AutoDateTimeField(default=None, null=False, db_index=True) - modified_at = models.DateTimeField(auto_now=True) - created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=None, null=False) - - @property - def source_type(self): - # e.g. http/https:// - # file:// - # pocketapi:// - # s3:// - # etc.. - return self.uri.split('://')[0].lower() - class CrawlSchedule(ABIDModel, ModelWithHealthStats): """ @@ -107,8 +52,10 @@ class Crawl(ABIDModel, ModelWithHealthStats): A single session of URLs to archive starting from a given Seed and expanding outwards. An "archiving session" so to speak. A new Crawl should be created for each loading from a Seed (because it can produce a different set of URLs every time its loaded). - E.g. every scheduled import from an RSS feed should create a new Crawl. - Every "Add" task triggered from the Web UI or CLI should create a new Crawl. + E.g. every scheduled import from an RSS feed should create a new Crawl, and more loadings from the same seed each create a new Crawl + + Every "Add" task triggered from the Web UI, CLI, or Scheduled Crawl should create a new Crawl with the seed set to a + file URI e.g. file:///sources/_{ui,cli}_add.txt containing the user's input. 
""" abid_prefix = 'crl_' abid_ts_src = 'self.created_at' @@ -124,13 +71,13 @@ class Crawl(ABIDModel, ModelWithHealthStats): created_at = AutoDateTimeField(default=None, null=False, db_index=True) modified_at = models.DateTimeField(auto_now=True) - seed = models.ForeignKey(Seed, on_delete=models.CASCADE, related_name='crawl_set', null=False, blank=False) + seed = models.ForeignKey(Seed, on_delete=models.PROTECT, related_name='crawl_set', null=False, blank=False) max_depth = models.PositiveSmallIntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(4)]) tags_str = models.CharField(max_length=1024, blank=True, null=False, default='') persona = models.CharField(max_length=32, blank=True, null=False, default='auto') config = models.JSONField(default=dict) - schedule = models.ForeignKey(CrawlSchedule, null=True, blank=True, editable=True) + schedule = models.ForeignKey(CrawlSchedule, on_delete=models.SET_NULL, null=True, blank=True, editable=True) # crawler = models.CharField(choices=CRAWLER_CHOICES, default='breadth_first', max_length=32) # tags = models.ManyToManyField(Tag, blank=True, related_name='crawl_set', through='CrawlTag') @@ -175,102 +122,7 @@ class Outlink(models.Model): unique_together = (('src', 'dst', 'via'),) -def scheduler_runloop(): - # abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine()) - while True: - # abx.archivebox.events.on_scheduler_tick_start(timezone.now(), machine=Machine.objects.get_current_machine()) - - scheduled_crawls = CrawlSchedule.objects.filter(is_enabled=True) - scheduled_crawls_due = scheduled_crawls.filter(next_run_at__lte=timezone.now()) - - for scheduled_crawl in scheduled_crawls_due: - try: - abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl) - except Exception as e: - abx.archivebox.events.on_crawl_schedule_failure(timezone.now(), machine=Machine.objects.get_current_machine(), error=e, schedule=scheduled_crawl) - - # abx.archivebox.events.on_scheduler_tick_end(timezone.now(), machine=Machine.objects.get_current_machine(), tasks=scheduled_tasks_due) - time.sleep(1) - - -def create_crawl_from_ui_action(urls, extractor, credentials, depth, tags_str, persona, created_by, crawl_config): - if seed_is_remote(urls, extractor, credentials): - # user's seed is a remote source that will provide the urls (e.g. RSS feed URL, Pocket API, etc.) 
@@ -175,102 +122,7 @@ class Outlink(models.Model):
         unique_together = (('src', 'dst', 'via'),)
 
 
-def scheduler_runloop():
-    # abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine())
-    while True:
-        # abx.archivebox.events.on_scheduler_tick_start(timezone.now(), machine=Machine.objects.get_current_machine())
-
-        scheduled_crawls = CrawlSchedule.objects.filter(is_enabled=True)
-        scheduled_crawls_due = scheduled_crawls.filter(next_run_at__lte=timezone.now())
-
-        for scheduled_crawl in scheduled_crawls_due:
-            try:
-                abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl)
-            except Exception as e:
-                abx.archivebox.events.on_crawl_schedule_failure(timezone.now(), machine=Machine.objects.get_current_machine(), error=e, schedule=scheduled_crawl)
-
-        # abx.archivebox.events.on_scheduler_tick_end(timezone.now(), machine=Machine.objects.get_current_machine(), tasks=scheduled_tasks_due)
-        time.sleep(1)
-
-
-def create_crawl_from_ui_action(urls, extractor, credentials, depth, tags_str, persona, created_by, crawl_config):
-    if seed_is_remote(urls, extractor, credentials):
-        # user's seed is a remote source that will provide the urls (e.g. RSS feed URL, Pocket API, etc.)
-        uri, extractor, credentials = abx.archivebox.effects.check_remote_seed_connection(urls, extractor, credentials, created_by)
-    else:
-        # user's seed is some raw text they provided to parse for urls, save it to a file then load the file as a Seed
-        uri = abx.archivebox.writes.write_raw_urls_to_local_file(urls, extractor, tags_str, created_by)   # file:///data/sources/some_import.txt
-
-    seed = abx.archivebox.writes.get_or_create_seed(uri=remote_uri, extractor, credentials, created_by)
-    # abx.archivebox.events.on_seed_created(seed)
-
-    crawl = abx.archivebox.writes.create_crawl(seed=seed, depth=depth, tags_str=tags_str, persona=persona, created_by=created_by, config=crawl_config, schedule=None)
-    abx.archivebox.events.on_crawl_created(crawl)
-
-
-@abx.hookimpl.on_crawl_schedule_tick
-def create_crawl_from_crawlschedule_if_due(crawl_schedule):
-    # make sure it's not too early to run this scheduled import (makes this function indepmpotent / safe to call multiple times / every second)
-    if timezone.now() < crawl_schedule.next_run_at:
-        # it's not time to run it yet, wait for the next tick
-        return
-    else:
-        # we're going to run it now, bump the next run time so that no one else runs it at the same time as us
-        abx.archivebox.writes.update_crawl_schedule_next_run_at(crawl_schedule, next_run_at=crawl_schedule.next_run_at + crawl_schedule.interval)
-
-    crawl_to_copy = None
-    try:
-        crawl_to_copy = crawl_schedule.crawl_set.first()   # alternatively use .last() to copy most recent crawl instead of very first crawl
-    except Crawl.DoesNotExist:
-        # there is no template crawl to base the next one off of
-        # user must add at least one crawl to a schedule that serves as the template for all future repeated crawls
-        return
-
-    new_crawl = abx.archivebox.writes.create_crawl_copy(crawl_to_copy=crawl_to_copy, schedule=crawl_schedule)
-    abx.archivebox.events.on_crawl_created(new_crawl)
-
-
-@abx.hookimpl.on_crawl_created
-def create_root_snapshot(crawl):
-    # create a snapshot for the seed URI which kicks off the crawl
-    # only a single extractor will run on it, which will produce outlinks which get added back to the crawl
-    root_snapshot, created = abx.archivebox.writes.get_or_create_snapshot(crawl=crawl, url=crawl.seed.uri, config={
-        'extractors': (
-            abx.archivebox.reads.get_extractors_that_produce_outlinks()
-            if crawl.seed.extractor == 'auto' else
-            [crawl.seed.extractor]
-        ),
-        **crawl.seed.config,
-    })
-    if created:
-        abx.archivebox.events.on_snapshot_created(root_snapshot)
-        abx.archivebox.writes.update_crawl_stats(started_at=timezone.now())
-
-
-@abx.hookimpl.on_snapshot_created
-def create_archiveresults_pending_from_snapshot(snapshot, config):
-    config = get_scope_config(
-        # defaults=settings.CONFIG_FROM_DEFAULTS,
-        # configfile=settings.CONFIG_FROM_FILE,
-        # environment=settings.CONFIG_FROM_ENVIRONMENT,
-        persona=archiveresult.snapshot.crawl.persona,
-        seed=archiveresult.snapshot.crawl.seed,
-        crawl=archiveresult.snapshot.crawl,
-        snapshot=archiveresult.snapshot,
-        archiveresult=archiveresult,
-        # extra_config=extra_config,
-    )
-
-    extractors = abx.archivebox.reads.get_extractors_for_snapshot(snapshot, config)
-    for extractor in extractors:
-        archiveresult, created = abx.archivebox.writes.get_or_create_archiveresult_pending(
-            snapshot=snapshot,
-            extractor=extractor,
-            status='pending'
-        )
-        if created:
-            abx.archivebox.events.on_archiveresult_created(archiveresult)
 
 
 @abx.hookimpl.on_archiveresult_created
@@ -298,28 +150,6 @@ def exec_archiveresult_extractor_effects(archiveresult):
     abx.archivebox.events.on_snapshot_updated(archiveresult.snapshot)
 
 
-@abx.hookimpl.on_archiveresult_updated
-def create_snapshots_pending_from_archiveresult_outlinks(archiveresult):
-    config = get_scope_config(...)
-
-    # check if extractor has finished succesfully, if not, dont bother checking for outlinks
-    if not archiveresult.status == 'succeeded':
-        return
-
-    # check if we have already reached the maximum recursion depth
-    hops_to_here = abx.archivebox.reads.get_outlink_parents(crawl_pk=archiveresult.snapshot.crawl_id, url=archiveresult.url, config=config)
-    if len(hops_to_here) >= archiveresult.crawl.max_depth +1:
-        return
-
-    # parse the output to get outlink url_entries
-    discovered_urls = abx.archivebox.reads.get_archiveresult_discovered_url_entries(archiveresult, config=config)
-
-    for url_entry in discovered_urls:
-        abx.archivebox.writes.create_outlink_record(src=archiveresult.snapshot.url, dst=url_entry.url, via=archiveresult)
-        abx.archivebox.writes.create_snapshot(crawl=archiveresult.snapshot.crawl, url_entry=url_entry)
-
-    # abx.archivebox.events.on_crawl_updated(archiveresult.snapshot.crawl)
-
 
 @abx.hookimpl.reads.get_outlink_parents
 def get_outlink_parents(url, crawl_pk=None, config=None):
     scope = Q(dst=url)
diff --git a/archivebox/plugins_extractor/chrome/__init__.py b/archivebox/plugins_extractor/chrome/__init__.py
index f24d1380..9b254655 100644
--- a/archivebox/plugins_extractor/chrome/__init__.py
+++ b/archivebox/plugins_extractor/chrome/__init__.py
@@ -44,3 +44,22 @@ def get_BINARIES():
 #     'screenshot': SCREENSHOT_EXTRACTOR,
 #     'dom': DOM_EXTRACTOR,
 # }
+
+# Hooks Available:
+
+# Events:
+#     on_crawl_schedule_tick
+#     on_seed_post_save
+#     on_crawl_post_save
+#     on_snapshot_post_save
+#     on_archiveresult_post_save
+
+# Handlers:
+#     create_root_snapshot_from_seed
+#     create_archiveresults_pending_from_snapshot
+#     create_crawl_from_crawlschedule_if_due
+#     create_crawl_copy_from_template
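
Note: the comment block above only lists the events this plugin could subscribe to; nothing is wired up yet in this diff. A hedged sketch of what a future hookimpl in this plugin might look like (the queueing helper is hypothetical and does not exist yet):

    # sketch only: one possible future hookimpl for the chrome plugin
    import abx

    @abx.hookimpl
    def on_snapshot_post_save(snapshot, created=False):
        # queue the chrome-based extractors (screenshot, dom, pdf) for every new snapshot
        if created:
            queue_chrome_extractors(snapshot)   # hypothetical helper, not defined in this diff
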
diff --git a/archivebox/queues/tasks.py b/archivebox/queues/tasks.py
index dd22bbd6..acfeab0b 100644
--- a/archivebox/queues/tasks.py
+++ b/archivebox/queues/tasks.py
@@ -1,5 +1,8 @@
 __package__ = 'archivebox.queues'
 
+from functools import wraps
+from django.utils import timezone
+
 from django_huey import db_task, task
 
 from huey_monitor.models import TaskModel
@@ -7,6 +10,38 @@ from huey_monitor.tqdm import ProcessInfo
 
 from .supervisor_util import get_or_create_supervisord_process
 
+
+# @db_task(queue="system_tasks", context=True, schedule=1)
+# def scheduler_tick():
+#     print('SCHEDULER TICK', timezone.now().isoformat())
+#     # abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine())
+#     # abx.archivebox.events.on_scheduler_tick_start(timezone.now(), machine=Machine.objects.get_current_machine())
+
+#     scheduled_crawls = CrawlSchedule.objects.filter(is_enabled=True)
+#     scheduled_crawls_due = scheduled_crawls.filter(next_run_at__lte=timezone.now())
+
+#     for scheduled_crawl in scheduled_crawls_due:
+#         try:
+#             abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl)
+#         except Exception as e:
+#             abx.archivebox.events.on_crawl_schedule_failure(timezone.now(), machine=Machine.objects.get_current_machine(), error=e, schedule=scheduled_crawl)
+
+#     # abx.archivebox.events.on_scheduler_tick_end(timezone.now(), machine=Machine.objects.get_current_machine(), tasks=scheduled_tasks_due)
+
+
+def db_task_with_parent(func):
+    """Decorator for db_tasks that records the running task as a subtask of its parent TaskModel (for huey_monitor)."""
+
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        task = kwargs.get('task')
+        parent_task_id = kwargs.get('parent_task_id')
+
+        if task and parent_task_id:
+            TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
+
+        return func(*args, **kwargs)
+
+    return wrapper
+
 
 @db_task(queue="system_tasks", context=True)
 def bg_add(add_kwargs, task=None, parent_task_id=None):
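
Note: one possible way to apply the decorator above is to stack it under @db_task, mirroring the bg_add signature, so the spawned task shows up nested under its parent in huey_monitor. The task name and argument below are illustrative assumptions, not part of this diff:

    # usage sketch (hypothetical task)
    @db_task(queue="system_tasks", context=True)
    @db_task_with_parent
    def bg_example_subtask(snapshot_id, task=None, parent_task_id=None):
        # 'task' is injected by context=True and 'parent_task_id' is passed by the caller;
        # the wrapper records the parent/child link in TaskModel before the body runs
        ...
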