begin migrating search backends to new plugin system

Nick Sweeting 2024-09-24 02:13:01 -07:00
parent 2d19317e3f
commit c9c163efed
11 changed files with 83 additions and 21 deletions

@@ -0,0 +1,62 @@
__package__ = 'archivebox.plugins_search.ripgrep'

from typing import List, Dict, ClassVar
# from typing_extensions import Self

from django.conf import settings

# Depends on other PyPI/vendor packages:
from pydantic import InstanceOf, Field
from pydantic_pkgr import BinProvider, BinProviderName, ProviderLookupDict, BinName

# Depends on other Django apps:
from plugantic.base_plugin import BasePlugin
from plugantic.base_configset import BaseConfigSet, ConfigSectionName
from plugantic.base_binary import BaseBinary, env, apt, brew
from plugantic.base_hook import BaseHook
# from plugantic.base_search import BaseSearchBackend

# Depends on Other Plugins:
# from plugins_sys.config.apps import SEARCH_BACKEND_CONFIG


###################### Config ##########################

class RipgrepConfig(BaseConfigSet):
    section: ClassVar[ConfigSectionName] = 'DEPENDENCY_CONFIG'

    RIPGREP_BINARY: str = Field(default='rg')

RIPGREP_CONFIG = RipgrepConfig()


class RipgrepBinary(BaseBinary):
    name: BinName = RIPGREP_CONFIG.RIPGREP_BINARY
    binproviders_supported: List[InstanceOf[BinProvider]] = [apt, brew, env]

    provider_overrides: Dict[BinProviderName, ProviderLookupDict] = {
        apt.name: {'packages': lambda: ['ripgrep']},
        brew.name: {'packages': lambda: ['ripgrep']},
    }

RIPGREP_BINARY = RipgrepBinary()


# TODO:
# class RipgrepSearchBackend(BaseSearchBackend):
#     name: str = 'ripgrep'
# RIPGREP_SEARCH_BACKEND = RipgrepSearchBackend()


class RipgrepSearchPlugin(BasePlugin):
    app_label: str = 'ripgrep'
    verbose_name: str = 'Ripgrep'

    hooks: List[InstanceOf[BaseHook]] = [
        RIPGREP_CONFIG,
        RIPGREP_BINARY,
    ]

PLUGIN = RipgrepSearchPlugin()
PLUGIN.register(settings)
DJANGO_APP = PLUGIN.AppConfig
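
Editor's note: the file above is purely declarative — a config set, a binary, and a plugin object bundling them as hooks. A minimal sketch of that pattern using plain pydantic (ConfigSet and Plugin here are illustrative stand-ins, not plugantic's real base classes, which add registration and Django integration on top):

from typing import ClassVar, List
from pydantic import BaseModel, Field

class ConfigSet(BaseModel):      # stand-in for plugantic's BaseConfigSet
    section: ClassVar[str] = 'DEPENDENCY_CONFIG'
    RIPGREP_BINARY: str = Field(default='rg')

class Plugin(BaseModel):         # stand-in for plugantic's BasePlugin
    app_label: str
    verbose_name: str
    hooks: List[BaseModel] = []

config = ConfigSet()
plugin = Plugin(app_label='ripgrep', verbose_name='Ripgrep', hooks=[config])
print(plugin.app_label, config.RIPGREP_BINARY)   # -> ripgrep rg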


@@ -0,0 +1,45 @@
import re
from subprocess import run, PIPE
from typing import List, Generator

from archivebox.config import ARCHIVE_DIR, RIPGREP_VERSION, SEARCH_BACKEND_TIMEOUT
from archivebox.util import enforce_types

RG_IGNORE_EXTENSIONS = ('css','js','orig','svg')

RG_ADD_TYPE = '--type-add'
RG_IGNORE_ARGUMENTS = f"ignore:*.{{{','.join(RG_IGNORE_EXTENSIONS)}}}"
RG_DEFAULT_ARGUMENTS = "-ilTignore"  # -i case-insensitive, -l list matching file paths, -Tignore exclude the custom 'ignore' file type defined above
RG_REGEX_ARGUMENT = '-e'

TIMESTAMP_REGEX = r'\/([\d]+\.[\d]+)\/'
ts_regex = re.compile(TIMESTAMP_REGEX)

@enforce_types
def index(snapshot_id: str, texts: List[str]):
    return

@enforce_types
def flush(snapshot_ids: Generator[str, None, None]):
    return

@enforce_types
def search(text: str) -> List[str]:
    if not RIPGREP_VERSION:
        raise Exception("ripgrep binary not found, install ripgrep to use this search backend")

    from core.models import Snapshot

    rg_cmd = ['rg', RG_ADD_TYPE, RG_IGNORE_ARGUMENTS, RG_DEFAULT_ARGUMENTS, RG_REGEX_ARGUMENT, text, str(ARCHIVE_DIR)]
    rg = run(rg_cmd, stdout=PIPE, stderr=PIPE, timeout=SEARCH_BACKEND_TIMEOUT)
    file_paths = [p.decode() for p in rg.stdout.splitlines()]

    timestamps = set()
    for path in file_paths:
        ts = ts_regex.findall(path)
        if ts:
            timestamps.add(ts[0])

    snap_ids = [str(id) for id in Snapshot.objects.filter(timestamp__in=timestamps).values_list('pk', flat=True)]
    return snap_ids
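
Editor's note: search() above shells out to rg and recovers snapshot timestamps from the matching file paths. A quick sketch of both halves, with a hypothetical archive path (the /data/archive prefix and the result path below are made up for illustration):

import re

ts_regex = re.compile(r'\/([\d]+\.[\d]+)\/')

# the command search('wikipedia') would compose, given ARCHIVE_DIR='/data/archive':
rg_cmd = ['rg', '--type-add', 'ignore:*.{css,js,orig,svg}', '-ilTignore', '-e', 'wikipedia', '/data/archive']

# each matching path embeds the snapshot timestamp, which keys the Snapshot lookup:
path = '/data/archive/1718135400.123456/readability/content.txt'
print(ts_regex.findall(path))   # -> ['1718135400.123456']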


@@ -0,0 +1,44 @@
from typing import List, Generator

from sonic import IngestClient, SearchClient

from archivebox.util import enforce_types
from archivebox.config import SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD, SONIC_BUCKET, SONIC_COLLECTION

MAX_SONIC_TEXT_TOTAL_LENGTH = 100000000  # don't index more than 100 million characters per text
MAX_SONIC_TEXT_CHUNK_LENGTH = 2000       # don't index more than 2000 characters per chunk
MAX_SONIC_ERRORS_BEFORE_ABORT = 5

@enforce_types
def index(snapshot_id: str, texts: List[str]):
    error_count = 0
    with IngestClient(SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD) as ingestcl:
        for text in texts:
            chunks = (
                text[i:i+MAX_SONIC_TEXT_CHUNK_LENGTH]
                for i in range(
                    0,
                    min(len(text), MAX_SONIC_TEXT_TOTAL_LENGTH),
                    MAX_SONIC_TEXT_CHUNK_LENGTH,
                )
            )
            try:
                for chunk in chunks:
                    ingestcl.push(SONIC_COLLECTION, SONIC_BUCKET, snapshot_id, str(chunk))
            except Exception as err:
                print(f'[!] Sonic search backend threw an error while indexing: {err.__class__.__name__} {err}')
                error_count += 1
                if error_count > MAX_SONIC_ERRORS_BEFORE_ABORT:
                    raise

@enforce_types
def search(text: str) -> List[str]:
    with SearchClient(SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD) as querycl:
        snap_ids = querycl.query(SONIC_COLLECTION, SONIC_BUCKET, text)
    return snap_ids

@enforce_types
def flush(snapshot_ids: Generator[str, None, None]):
    with IngestClient(SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD) as ingestcl:
        for id in snapshot_ids:
            ingestcl.flush_object(SONIC_COLLECTION, SONIC_BUCKET, str(id))
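
Editor's note: Sonic limits the size of a single push, hence the slicing in index() above — each text is cut into 2000-character chunks and capped at 100 million characters total. The generator logic in isolation:

MAX_TOTAL = 100_000_000
MAX_CHUNK = 2_000

def chunks(text):
    # fixed-size slices, stopping at the total-length cap
    return (
        text[i:i + MAX_CHUNK]
        for i in range(0, min(len(text), MAX_TOTAL), MAX_CHUNK)
    )

print([len(c) for c in chunks('x' * 5000)])   # -> [2000, 2000, 1000]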


@@ -0,0 +1,195 @@
import codecs
from typing import List, Generator
import sqlite3

from archivebox.util import enforce_types
from archivebox.config import (
    FTS_SEPARATE_DATABASE,
    FTS_TOKENIZERS,
    FTS_SQLITE_MAX_LENGTH,
)

FTS_TABLE = "snapshot_fts"
FTS_ID_TABLE = "snapshot_id_fts"
FTS_COLUMN = "texts"

if FTS_SEPARATE_DATABASE:
    database = sqlite3.connect("search.sqlite3")
    # Make get_connection callable, because `django.db.connection.cursor()`
    # has to be called to get a context manager, but sqlite3.Connection
    # is a context manager without being called.
    def get_connection():
        return database
    SQLITE_BIND = "?"
else:
    from django.db import connection as database  # type: ignore[no-redef, assignment]
    get_connection = database.cursor
    SQLITE_BIND = "%s"

# Only Python >= 3.11 supports sqlite3.Connection.getlimit(),
# so fall back to the default if the API to get the real value isn't present
try:
    limit_id = sqlite3.SQLITE_LIMIT_LENGTH
    try:
        with database.temporary_connection() as cursor:  # type: ignore[attr-defined]
            SQLITE_LIMIT_LENGTH = cursor.connection.getlimit(limit_id)
    except AttributeError:
        SQLITE_LIMIT_LENGTH = database.getlimit(limit_id)
except AttributeError:
    SQLITE_LIMIT_LENGTH = FTS_SQLITE_MAX_LENGTH


def _escape_sqlite3(value: str, *, quote: str, errors='strict') -> str:
    assert isinstance(quote, str), "quote is not a str"
    assert len(quote) == 1, "quote must be a single character"

    encodable = value.encode('utf-8', errors).decode('utf-8')

    nul_index = encodable.find("\x00")
    if nul_index >= 0:
        error = UnicodeEncodeError("NUL-terminated utf-8", encodable,
                                   nul_index, nul_index + 1, "NUL not allowed")
        error_handler = codecs.lookup_error(errors)
        replacement, _ = error_handler(error)
        assert isinstance(replacement, str), "handling a UnicodeEncodeError should return a str replacement"
        encodable = encodable.replace("\x00", replacement)

    return quote + encodable.replace(quote, quote * 2) + quote

def _escape_sqlite3_value(value: str, errors='strict') -> str:
    return _escape_sqlite3(value, quote="'", errors=errors)

def _escape_sqlite3_identifier(value: str) -> str:
    return _escape_sqlite3(value, quote='"', errors='strict')
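
# Editor's note — example of the escaping behavior above (illustrative):
# embedded quotes are doubled and the value is wrapped in the quote
# character, SQL-style:
#   _escape_sqlite3_value("it's")       -> "'it''s'"
#   _escape_sqlite3_identifier('a"b')   -> '"a""b"'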
@enforce_types
def _create_tables():
    table = _escape_sqlite3_identifier(FTS_TABLE)
    # Escape as value, because fts5() expects
    # string literal column names
    column = _escape_sqlite3_value(FTS_COLUMN)
    id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)
    tokenizers = _escape_sqlite3_value(FTS_TOKENIZERS)
    trigger_name = _escape_sqlite3_identifier(f"{FTS_ID_TABLE}_ad")

    with get_connection() as cursor:
        # Create a contentless-delete FTS5 table that indexes
        # but does not store the texts of snapshots
        try:
            cursor.execute(
                f"CREATE VIRTUAL TABLE {table}"
                f" USING fts5({column},"
                f" tokenize={tokenizers},"
                " content='', contentless_delete=1);"
            )
        except Exception as e:
            msg = str(e)
            if 'unrecognized option: "contentless_delete"' in msg:
                sqlite_version = getattr(sqlite3, "sqlite_version", "Unknown")
                raise RuntimeError(
                    "SQLite full-text search requires SQLite >= 3.43.0;"
                    f" the running version is {sqlite_version}"
                ) from e
            else:
                raise

        # Create a one-to-one mapping between ArchiveBox snapshot_id
        # and FTS5 rowid, because the column type of rowid can't be
        # customized.
        cursor.execute(
            f"CREATE TABLE {id_table}("
            " rowid INTEGER PRIMARY KEY AUTOINCREMENT,"
            " snapshot_id char(32) NOT NULL UNIQUE"
            ");"
        )
        # Create a trigger to delete items from the FTS5 index when
        # the snapshot_id is deleted from the mapping, to maintain
        # consistency and make the `flush()` query simpler.
        cursor.execute(
            f"CREATE TRIGGER {trigger_name}"
            f" AFTER DELETE ON {id_table} BEGIN"
            f" DELETE FROM {table} WHERE rowid=old.rowid;"
            " END;"
        )
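
# Editor's note — net result of _create_tables(), summarized:
#   snapshot_fts(texts)                   -- contentless FTS5 index over snapshot texts
#   snapshot_id_fts(rowid, snapshot_id)   -- maps each snapshot_id to its FTS5 rowid
#   snapshot_id_fts_ad trigger            -- deleting a mapping row also deletes its FTS5 entry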
def _handle_query_exception(exc: Exception):
    message = str(exc)
    if message.startswith("no such table:"):
        raise RuntimeError(
            "SQLite full-text search index has not yet"
            " been created; run `archivebox update --index-only`."
        )
    else:
        raise exc
@enforce_types
def index(snapshot_id: str, texts: List[str]):
    text = ' '.join(texts)[:SQLITE_LIMIT_LENGTH]

    table = _escape_sqlite3_identifier(FTS_TABLE)
    column = _escape_sqlite3_identifier(FTS_COLUMN)
    id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)

    with get_connection() as cursor:
        retries = 2
        while retries > 0:
            retries -= 1
            try:
                # If there is already an FTS index rowid to snapshot_id mapping,
                # then don't insert a new one, silently ignoring the operation.
                # {id_table}.rowid is AUTOINCREMENT, so will generate an unused
                # rowid for the index if it is an unindexed snapshot_id.
                cursor.execute(
                    f"INSERT OR IGNORE INTO {id_table}(snapshot_id) VALUES({SQLITE_BIND})",
                    [snapshot_id])
                # Fetch the FTS index rowid for the given snapshot_id
                id_res = cursor.execute(
                    f"SELECT rowid FROM {id_table} WHERE snapshot_id = {SQLITE_BIND}",
                    [snapshot_id])
                rowid = id_res.fetchone()[0]
                # (Re-)index the content
                cursor.execute(
                    "INSERT OR REPLACE INTO"
                    f" {table}(rowid, {column}) VALUES ({SQLITE_BIND}, {SQLITE_BIND})",
                    [rowid, text])
                # All statements succeeded; return
                return
            except Exception as e:
                if str(e).startswith("no such table:") and retries > 0:
                    _create_tables()
                else:
                    raise

    raise RuntimeError("Failed to create tables for SQLite FTS5 search")
@enforce_types
def search(text: str) -> List[str]:
    table = _escape_sqlite3_identifier(FTS_TABLE)
    id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)

    with get_connection() as cursor:
        try:
            res = cursor.execute(
                f"SELECT snapshot_id FROM {table}"
                f" INNER JOIN {id_table}"
                f" ON {id_table}.rowid = {table}.rowid"
                f" WHERE {table} MATCH {SQLITE_BIND}",
                [text])
        except Exception as e:
            _handle_query_exception(e)

        snap_ids = [row[0] for row in res.fetchall()]

    return snap_ids
@enforce_types
def flush(snapshot_ids: Generator[str, None, None]):
    snapshot_ids = list(snapshot_ids)  # type: ignore[assignment]

    id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)

    with get_connection() as cursor:
        try:
            # executemany() expects one parameter sequence per statement
            # execution, so wrap each snapshot_id in its own one-element list
            # rather than passing the whole id list as a single parameter row
            cursor.executemany(
                f"DELETE FROM {id_table} WHERE snapshot_id={SQLITE_BIND}",
                [[snapshot_id] for snapshot_id in snapshot_ids])
        except Exception as e:
            _handle_query_exception(e)
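
Editor's note: a hypothetical round-trip through this backend, assuming a configured database and a 32-character snapshot_id (the id below is made up for illustration):

snapshot_id = 'd6d14f5c32714fb48a54cb2d0a0b4a54'
index(snapshot_id, ['example page text about ripgrep and sonic'])
print(search('ripgrep'))     # -> ['d6d14f5c32714fb48a54cb2d0a0b4a54']
flush(iter([snapshot_id]))   # removes the mapping; the trigger clears the FTS5 entry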