fully migrate all search backends to new plugin system

Nick Sweeting 2024-09-24 03:05:43 -07:00
parent c9c163efed
commit fbfd16e195
13 changed files with 495 additions and 302 deletions


@@ -381,7 +381,7 @@ def search_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type:
     from plugins_sys.config.apps import SEARCH_BACKEND_CONFIG
     from ..search import query_search_index

-    if not SEARCH_BACKEND_CONFIG.SEARCH_BACKEND_ENABLED:
+    if not SEARCH_BACKEND_CONFIG.USE_SEARCHING_BACKEND:
         stderr()
         stderr(
             '[X] The search backend is not enabled, set config.USE_SEARCHING_BACKEND = True',


@@ -4,12 +4,12 @@ import inspect
 from huey.api import TaskWrapper
 from pathlib import Path
-from typing import List, Literal, ClassVar
+from typing import Tuple, Literal, ClassVar, get_args
 from pydantic import BaseModel, ConfigDict

 HookType = Literal['CONFIG', 'BINPROVIDER', 'BINARY', 'EXTRACTOR', 'REPLAYER', 'CHECK', 'ADMINDATAVIEW', 'QUEUE']
-HookType = Literal['CONFIG', 'BINPROVIDER', 'BINARY', 'EXTRACTOR', 'REPLAYER', 'CHECK', 'ADMINDATAVIEW', 'QUEUE']
+HookType = Literal['CONFIG', 'BINPROVIDER', 'BINARY', 'EXTRACTOR', 'REPLAYER', 'CHECK', 'ADMINDATAVIEW', 'QUEUE', 'SEARCHBACKEND']
-hook_type_names: List[HookType] = ['CONFIG', 'BINPROVIDER', 'BINARY', 'EXTRACTOR', 'REPLAYER', 'CHECK', 'ADMINDATAVIEW', 'QUEUE']
+hook_type_names: Tuple[HookType] = get_args(HookType)

 class BaseHook(BaseModel):
     """


@@ -0,0 +1,39 @@
__package__ = 'archivebox.plugantic'

from typing import Iterable, List
from benedict import benedict
from pydantic import Field

from .base_hook import BaseHook, HookType


class BaseSearchBackend(BaseHook):
    hook_type: HookType = 'SEARCHBACKEND'

    name: str = Field()   # e.g. 'singlefile'

    @staticmethod
    def index(snapshot_id: str, texts: List[str]):
        return

    @staticmethod
    def flush(snapshot_ids: Iterable[str]):
        return

    @staticmethod
    def search(text: str) -> List[str]:
        raise NotImplementedError("search method must be implemented by subclass")

    def register(self, settings, parent_plugin=None):
        # self._plugin = parent_plugin  # for debugging only, never rely on this!

        # Install the search backend into settings.SEARCH_BACKENDS
        settings.SEARCH_BACKENDS = getattr(settings, "SEARCH_BACKENDS", None) or benedict({})
        settings.SEARCH_BACKENDS[self.id] = self

        # Record installed hook into settings.HOOKS
        super().register(settings, parent_plugin=parent_plugin)
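A concrete backend only needs to subclass this hook, set a name, and implement the three static methods; listing the instance in a plugin's hooks installs it into settings.SEARCH_BACKENDS. A hypothetical minimal backend (the class and its behavior are illustrative, not part of this commit):

from typing import Iterable, List

from plugantic.base_searchbackend import BaseSearchBackend

class GrepSearchBackend(BaseSearchBackend):   # hypothetical example
    name: str = 'grep'

    @staticmethod
    def index(snapshot_id: str, texts: List[str]):
        return   # a filesystem-scanning backend needs no separate index step

    @staticmethod
    def flush(snapshot_ids: Iterable[str]):
        return

    @staticmethod
    def search(text: str) -> List[str]:
        return []   # should return matching Snapshot pks as strings

GREP_SEARCH_BACKEND = GrepSearchBackend()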


@@ -1,6 +1,8 @@
 __package__ = 'archivebox.plugins_search.ripgrep'

-from typing import List, Dict, ClassVar
+import re
+from subprocess import run
+from typing import List, Dict, ClassVar, Iterable
 # from typing_extensions import Self

 from django.conf import settings
@@ -14,10 +16,10 @@ from plugantic.base_plugin import BasePlugin
 from plugantic.base_configset import BaseConfigSet, ConfigSectionName
 from plugantic.base_binary import BaseBinary, env, apt, brew
 from plugantic.base_hook import BaseHook
-# from plugantic.base_search import BaseSearchBackend
+from plugantic.base_searchbackend import BaseSearchBackend

 # Depends on Other Plugins:
-# from plugins_sys.config.apps import SEARCH_BACKEND_CONFIG
+from plugins_sys.config.apps import SEARCH_BACKEND_CONFIG

 ###################### Config ##########################
@@ -39,11 +41,59 @@ class RipgrepBinary(BaseBinary):

 RIPGREP_BINARY = RipgrepBinary()

-# TODO:
-# class RipgrepSearchBackend(BaseSearchBackend):
-#     name: str = 'ripgrep'
-# RIPGREP_SEARCH_BACKEND = RipgrepSearchBackend()
+RG_IGNORE_EXTENSIONS = ('css','js','orig','svg')
+
+RG_ADD_TYPE = '--type-add'
+RG_IGNORE_ARGUMENTS = f"ignore:*.{{{','.join(RG_IGNORE_EXTENSIONS)}}}"
+RG_DEFAULT_ARGUMENTS = "-ilTignore"  # case-insensitive (-i), print matching file paths (-l), exclude ignored file types (-Tignore)
+RG_REGEX_ARGUMENT = '-e'
+
+TIMESTAMP_REGEX = r'\/([\d]+\.[\d]+)\/'
+ts_regex = re.compile(TIMESTAMP_REGEX)
+
+
+class RipgrepSearchBackend(BaseSearchBackend):
+    name: str = 'ripgrep'
+    docs_url: str = 'https://github.com/BurntSushi/ripgrep'
+
+    @staticmethod
+    def index(snapshot_id: str, texts: List[str]):
+        return
+
+    @staticmethod
+    def flush(snapshot_ids: Iterable[str]):
+        return
+
+    @staticmethod
+    def search(text: str) -> List[str]:
+        rg_bin = RIPGREP_BINARY.load()
+        if not rg_bin.version:
+            raise Exception("ripgrep binary not found, install ripgrep to use this search backend")
+
+        rg_cmd = [
+            rg_bin.abspath,
+            RG_ADD_TYPE,
+            RG_IGNORE_ARGUMENTS,
+            RG_DEFAULT_ARGUMENTS,
+            RG_REGEX_ARGUMENT,
+            text,
+            str(settings.ARCHIVE_DIR),
+        ]
+        rg = run(rg_cmd, timeout=SEARCH_BACKEND_CONFIG.SEARCH_BACKEND_TIMEOUT, capture_output=True, text=True)
+        timestamps = set()
+        for path in rg.stdout.splitlines():
+            ts = ts_regex.findall(path)
+            if ts:
+                timestamps.add(ts[0])
+
+        snap_ids = [str(id) for id in Snapshot.objects.filter(timestamp__in=timestamps).values_list('pk', flat=True)]
+        return snap_ids
+
+RIPGREP_SEARCH_BACKEND = RipgrepSearchBackend()
class RipgrepSearchPlugin(BasePlugin): class RipgrepSearchPlugin(BasePlugin):
@ -53,6 +103,7 @@ class RipgrepSearchPlugin(BasePlugin):
hooks: List[InstanceOf[BaseHook]] = [ hooks: List[InstanceOf[BaseHook]] = [
RIPGREP_CONFIG, RIPGREP_CONFIG,
RIPGREP_BINARY, RIPGREP_BINARY,
RIPGREP_SEARCH_BACKEND,
] ]
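Since ripgrep only returns matching file paths, the backend maps hits back to snapshots by pulling the timestamp directory component out of each path and resolving those timestamps to Snapshot pks. A quick illustration of the regex (the path below is a made-up example):

import re

ts_regex = re.compile(r'\/([\d]+\.[\d]+)\/')

# archive paths look like <ARCHIVE_DIR>/<timestamp>/<extractor output>,
# so the first float-like path component is the snapshot timestamp
path = '/data/archive/1726178743.0/singlefile.html'
assert ts_regex.findall(path) == ['1726178743.0']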


@@ -1,45 +0,0 @@
import re
from subprocess import run, PIPE
from typing import List, Generator

from archivebox.config import ARCHIVE_DIR, RIPGREP_VERSION, SEARCH_BACKEND_TIMEOUT
from archivebox.util import enforce_types

RG_IGNORE_EXTENSIONS = ('css','js','orig','svg')

RG_ADD_TYPE = '--type-add'
RG_IGNORE_ARGUMENTS = f"ignore:*.{{{','.join(RG_IGNORE_EXTENSIONS)}}}"
RG_DEFAULT_ARGUMENTS = "-ilTignore"  # Case insensitive(i), matching files results(l)
RG_REGEX_ARGUMENT = '-e'

TIMESTAMP_REGEX = r'\/([\d]+\.[\d]+)\/'
ts_regex = re.compile(TIMESTAMP_REGEX)

@enforce_types
def index(snapshot_id: str, texts: List[str]):
    return

@enforce_types
def flush(snapshot_ids: Generator[str, None, None]):
    return

@enforce_types
def search(text: str) -> List[str]:
    if not RIPGREP_VERSION:
        raise Exception("ripgrep binary not found, install ripgrep to use this search backend")

    from core.models import Snapshot

    rg_cmd = ['rg', RG_ADD_TYPE, RG_IGNORE_ARGUMENTS, RG_DEFAULT_ARGUMENTS, RG_REGEX_ARGUMENT, text, str(ARCHIVE_DIR)]
    rg = run(rg_cmd, stdout=PIPE, stderr=PIPE, timeout=SEARCH_BACKEND_TIMEOUT)
    file_paths = [p.decode() for p in rg.stdout.splitlines()]
    timestamps = set()
    for path in file_paths:
        ts = ts_regex.findall(path)
        if ts:
            timestamps.add(ts[0])

    snap_ids = [str(id) for id in Snapshot.objects.filter(timestamp__in=timestamps).values_list('pk', flat=True)]
    return snap_ids


@@ -0,0 +1,132 @@
__package__ = 'archivebox.plugins_search.sonic'

import sys
from typing import List, Dict, ClassVar, Generator, cast

from django.conf import settings

# Depends on other PyPI/vendor packages:
from pydantic import InstanceOf, Field, model_validator
from pydantic_pkgr import BinProvider, BinProviderName, ProviderLookupDict, BinName

# Depends on other Django apps:
from plugantic.base_plugin import BasePlugin
from plugantic.base_configset import BaseConfigSet, ConfigSectionName
from plugantic.base_binary import BaseBinary, env, brew
from plugantic.base_hook import BaseHook
from plugantic.base_searchbackend import BaseSearchBackend

# Depends on Other Plugins:
from plugins_sys.config.apps import SEARCH_BACKEND_CONFIG

SONIC_LIB = None
try:
    import sonic
    SONIC_LIB = sonic
except ImportError:
    SONIC_LIB = None

###################### Config ##########################

class SonicConfig(BaseConfigSet):
    section: ClassVar[ConfigSectionName] = 'DEPENDENCY_CONFIG'

    SONIC_BINARY: str = Field(default='sonic')
    SONIC_HOST: str = Field(default='localhost', alias='SEARCH_BACKEND_HOST_NAME')
    SONIC_PORT: int = Field(default=1491, alias='SEARCH_BACKEND_PORT')
    SONIC_PASSWORD: str = Field(default='SecretPassword', alias='SEARCH_BACKEND_PASSWORD')
    SONIC_COLLECTION: str = Field(default='archivebox')
    SONIC_BUCKET: str = Field(default='archivebox')

    @model_validator(mode='after')
    def validate_sonic_port(self):
        if SEARCH_BACKEND_CONFIG.SEARCH_BACKEND_ENGINE == 'sonic':
            if SONIC_LIB is None:
                sys.stderr.write('[!] Sonic search backend is enabled but not installed. Install Sonic to use the Sonic search backend.\n')
        return self

SONIC_CONFIG = SonicConfig()


class SonicBinary(BaseBinary):
    name: BinName = SONIC_CONFIG.SONIC_BINARY
    binproviders_supported: List[InstanceOf[BinProvider]] = [brew, env]  # TODO: add cargo

    provider_overrides: Dict[BinProviderName, ProviderLookupDict] = {
        brew.name: {'packages': lambda: ['sonic']},
        # cargo.name: {'packages': lambda: ['sonic-server']},  # TODO: add cargo
    }

    # def on_get_version(self):
    #     with sonic.IngestClient(SONIC_CONFIG.SONIC_HOST, str(SONIC_CONFIG.SONIC_PORT), SONIC_CONFIG.SONIC_PASSWORD) as ingestcl:
    #         return SemVer.parse(str(ingestcl.protocol))

SONIC_BINARY = SonicBinary()


MAX_SONIC_TEXT_TOTAL_LENGTH = 100000000  # dont index more than 100 million characters per text
MAX_SONIC_TEXT_CHUNK_LENGTH = 2000       # dont index more than 2000 characters per chunk
MAX_SONIC_ERRORS_BEFORE_ABORT = 5


class SonicSearchBackend(BaseSearchBackend):
    name: str = 'sonic'
    docs_url: str = 'https://github.com/valeriansaliou/sonic'

    @staticmethod
    def index(snapshot_id: str, texts: List[str]):
        error_count = 0
        with sonic.IngestClient(SONIC_CONFIG.SONIC_HOST, str(SONIC_CONFIG.SONIC_PORT), SONIC_CONFIG.SONIC_PASSWORD) as ingestcl:
            for text in texts:
                chunks = (
                    text[i:i+MAX_SONIC_TEXT_CHUNK_LENGTH]
                    for i in range(
                        0,
                        min(len(text), MAX_SONIC_TEXT_TOTAL_LENGTH),
                        MAX_SONIC_TEXT_CHUNK_LENGTH,
                    )
                )
                try:
                    for chunk in chunks:
                        ingestcl.push(SONIC_CONFIG.SONIC_COLLECTION, SONIC_CONFIG.SONIC_BUCKET, snapshot_id, str(chunk))
                except Exception as err:
                    print(f'[!] Sonic search backend threw an error while indexing: {err.__class__.__name__} {err}')
                    error_count += 1
                    if error_count > MAX_SONIC_ERRORS_BEFORE_ABORT:
                        raise

    @staticmethod
    def flush(snapshot_ids: Generator[str, None, None]):
        with sonic.IngestClient(SONIC_CONFIG.SONIC_HOST, str(SONIC_CONFIG.SONIC_PORT), SONIC_CONFIG.SONIC_PASSWORD) as ingestcl:
            for id in snapshot_ids:
                ingestcl.flush_object(SONIC_CONFIG.SONIC_COLLECTION, SONIC_CONFIG.SONIC_BUCKET, str(id))

    @staticmethod
    def search(text: str) -> List[str]:
        with sonic.SearchClient(SONIC_CONFIG.SONIC_HOST, SONIC_CONFIG.SONIC_PORT, SONIC_CONFIG.SONIC_PASSWORD) as querycl:
            snap_ids = cast(List[str], querycl.query(SONIC_CONFIG.SONIC_COLLECTION, SONIC_CONFIG.SONIC_BUCKET, text))
        return [str(id) for id in snap_ids]


SONIC_SEARCH_BACKEND = SonicSearchBackend()


class SonicSearchPlugin(BasePlugin):
    app_label: str = 'sonic'
    verbose_name: str = 'Sonic'

    hooks: List[InstanceOf[BaseHook]] = [
        SONIC_CONFIG,
        SONIC_BINARY,
        SONIC_SEARCH_BACKEND,
    ]


PLUGIN = SonicSearchPlugin()
PLUGIN.register(settings)
DJANGO_APP = PLUGIN.AppConfig
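Sonic's ingest protocol limits how much text can be pushed per command, so index() slices each text into fixed-size chunks and caps how much of the text gets indexed at all. A standalone sketch of that slicing logic with tiny stand-in limits:

MAX_TOTAL = 10   # stand-in for MAX_SONIC_TEXT_TOTAL_LENGTH
MAX_CHUNK = 4    # stand-in for MAX_SONIC_TEXT_CHUNK_LENGTH

text = 'abcdefghijklmnop'   # 16 chars; chunks may only *start* below the cap

chunks = [
    text[i:i + MAX_CHUNK]
    for i in range(0, min(len(text), MAX_TOTAL), MAX_CHUNK)
]

# note the final chunk can overrun the total cap by up to MAX_CHUNK - 1 chars
assert chunks == ['abcd', 'efgh', 'ijkl']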


@@ -1,44 +0,0 @@
from typing import List, Generator

from sonic import IngestClient, SearchClient

from archivebox.util import enforce_types
from archivebox.config import SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD, SONIC_BUCKET, SONIC_COLLECTION

MAX_SONIC_TEXT_TOTAL_LENGTH = 100000000  # dont index more than 100 million characters per text
MAX_SONIC_TEXT_CHUNK_LENGTH = 2000       # dont index more than 2000 characters per chunk
MAX_SONIC_ERRORS_BEFORE_ABORT = 5

@enforce_types
def index(snapshot_id: str, texts: List[str]):
    error_count = 0
    with IngestClient(SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD) as ingestcl:
        for text in texts:
            chunks = (
                text[i:i+MAX_SONIC_TEXT_CHUNK_LENGTH]
                for i in range(
                    0,
                    min(len(text), MAX_SONIC_TEXT_TOTAL_LENGTH),
                    MAX_SONIC_TEXT_CHUNK_LENGTH,
                )
            )
            try:
                for chunk in chunks:
                    ingestcl.push(SONIC_COLLECTION, SONIC_BUCKET, snapshot_id, str(chunk))
            except Exception as err:
                print(f'[!] Sonic search backend threw an error while indexing: {err.__class__.__name__} {err}')
                error_count += 1
                if error_count > MAX_SONIC_ERRORS_BEFORE_ABORT:
                    raise

@enforce_types
def search(text: str) -> List[str]:
    with SearchClient(SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD) as querycl:
        snap_ids = querycl.query(SONIC_COLLECTION, SONIC_BUCKET, text)
    return snap_ids

@enforce_types
def flush(snapshot_ids: Generator[str, None, None]):
    with IngestClient(SEARCH_BACKEND_HOST_NAME, SEARCH_BACKEND_PORT, SEARCH_BACKEND_PASSWORD) as ingestcl:
        for id in snapshot_ids:
            ingestcl.flush_object(SONIC_COLLECTION, SONIC_BUCKET, str(id))


@@ -0,0 +1,257 @@
__package__ = 'archivebox.plugins_search.sqlite'

import sqlite3
import codecs
from typing import List, ClassVar, Generator, Callable

from django.conf import settings
from django.db import connection as database

# Depends on other PyPI/vendor packages:
from pydantic import InstanceOf, Field, model_validator

# Depends on other Django apps:
from plugantic.base_plugin import BasePlugin
from plugantic.base_configset import BaseConfigSet, ConfigSectionName
from plugantic.base_hook import BaseHook
from plugantic.base_searchbackend import BaseSearchBackend

# Depends on Other Plugins:
# from plugins_sys.config.apps import SEARCH_BACKEND_CONFIG

###################### Config ##########################

class SqliteftsConfig(BaseConfigSet):
    section: ClassVar[ConfigSectionName] = 'DEPENDENCY_CONFIG'

    SQLITEFTS_SEPARATE_DATABASE: bool = Field(default=True, alias='FTS_SEPARATE_DATABASE')
    SQLITEFTS_TOKENIZERS: str = Field(default='porter unicode61 remove_diacritics 2', alias='FTS_TOKENIZERS')
    SQLITEFTS_MAX_LENGTH: int = Field(default=int(1e9), alias='FTS_SQLITE_MAX_LENGTH')

    SQLITEFTS_DB: str = Field(default='search.sqlite3')
    SQLITEFTS_TABLE: str = Field(default='snapshot_fts')
    SQLITEFTS_ID_TABLE: str = Field(default='snapshot_id_fts')
    SQLITEFTS_COLUMN: str = Field(default='texts')

    @model_validator(mode='after')
    def validate_fts_separate_database(self):
        if self.SQLITEFTS_SEPARATE_DATABASE:
            assert self.SQLITEFTS_DB, "SQLITEFTS_DB must be set if SQLITEFTS_SEPARATE_DATABASE is True"
        return self

    @property
    def get_connection(self) -> Callable[[], sqlite3.Connection]:
        # Make get_connection callable, because `django.db.connection.cursor()`
        # has to be called to get a context manager, but sqlite3.Connection
        # is a context manager without being called.
        if self.SQLITEFTS_SEPARATE_DATABASE:
            return lambda: sqlite3.connect(self.SQLITEFTS_DB)
        else:
            return database.cursor

    @property
    def SQLITE_BIND(self) -> str:
        if self.SQLITEFTS_SEPARATE_DATABASE:
            return "?"
        else:
            return "%s"

    @property
    def SQLITE_LIMIT_LENGTH(self) -> int:
        # Only Python >= 3.11 supports sqlite3.Connection.getlimit(),
        # so fall back to the default if the API to get the real value isn't present
        try:
            limit_id = sqlite3.SQLITE_LIMIT_LENGTH
            try:
                with database.temporary_connection() as cursor:  # type: ignore[attr-defined]
                    return cursor.connection.getlimit(limit_id)
            except AttributeError:
                return database.getlimit(limit_id)
        except AttributeError:
            return self.SQLITEFTS_MAX_LENGTH

SQLITEFTS_CONFIG = SqliteftsConfig()


def _escape_sqlite3(value: str, *, quote: str, errors='strict') -> str:
    assert isinstance(quote, str), "quote is not a str"
    assert len(quote) == 1, "quote must be a single character"

    encodable = value.encode('utf-8', errors).decode('utf-8')

    nul_index = encodable.find("\x00")
    if nul_index >= 0:
        error = UnicodeEncodeError("NUL-terminated utf-8", encodable,
                                   nul_index, nul_index + 1, "NUL not allowed")
        error_handler = codecs.lookup_error(errors)
        replacement, _ = error_handler(error)
        assert isinstance(replacement, str), "handling a UnicodeEncodeError should return a str replacement"
        encodable = encodable.replace("\x00", replacement)

    return quote + encodable.replace(quote, quote * 2) + quote

def _escape_sqlite3_value(value: str, errors='strict') -> str:
    return _escape_sqlite3(value, quote="'", errors=errors)

def _escape_sqlite3_identifier(value: str) -> str:
    return _escape_sqlite3(value, quote='"', errors='strict')


def _create_tables():
    table = _escape_sqlite3_identifier(SQLITEFTS_CONFIG.SQLITEFTS_TABLE)
    # Escape as value, because fts5() expects
    # string literal column names
    column = _escape_sqlite3_value(SQLITEFTS_CONFIG.SQLITEFTS_COLUMN)
    id_table = _escape_sqlite3_identifier(SQLITEFTS_CONFIG.SQLITEFTS_ID_TABLE)
    tokenizers = _escape_sqlite3_value(SQLITEFTS_CONFIG.SQLITEFTS_TOKENIZERS)
    trigger_name = _escape_sqlite3_identifier(f"{SQLITEFTS_CONFIG.SQLITEFTS_ID_TABLE}_ad")

    with SQLITEFTS_CONFIG.get_connection() as cursor:
        # Create a contentless-delete FTS5 table that indexes
        # but does not store the texts of snapshots
        try:
            cursor.execute(
                f"CREATE VIRTUAL TABLE {table}"
                f" USING fts5({column},"
                f" tokenize={tokenizers},"
                " content='', contentless_delete=1);"
            )
        except Exception as e:
            msg = str(e)
            if 'unrecognized option: "contentlessdelete"' in msg:
                sqlite_version = getattr(sqlite3, "sqlite_version", "Unknown")
                raise RuntimeError(
                    "SQLite full-text search requires SQLite >= 3.43.0;"
                    f" the running version is {sqlite_version}"
                ) from e
            else:
                raise

        # Create a one-to-one mapping between ArchiveBox snapshot_id
        # and FTS5 rowid, because the column type of rowid can't be
        # customized.
        cursor.execute(
            f"CREATE TABLE {id_table}("
            " rowid INTEGER PRIMARY KEY AUTOINCREMENT,"
            " snapshot_id char(32) NOT NULL UNIQUE"
            ");"
        )
        # Create a trigger to delete items from the FTS5 index when
        # the snapshot_id is deleted from the mapping, to maintain
        # consistency and make the `flush()` query simpler.
        cursor.execute(
            f"CREATE TRIGGER {trigger_name}"
            f" AFTER DELETE ON {id_table} BEGIN"
            f" DELETE FROM {table} WHERE rowid=old.rowid;"
            " END;"
        )

def _handle_query_exception(exc: Exception):
    message = str(exc)
    if message.startswith("no such table:"):
        raise RuntimeError(
            "SQLite full-text search index has not yet"
            " been created; run `archivebox update --index-only`."
        )
    else:
        raise exc


class SqliteftsSearchBackend(BaseSearchBackend):
    name: str = 'sqlite'
    docs_url: str = 'https://www.sqlite.org/fts5.html'

    @staticmethod
    def index(snapshot_id: str, texts: List[str]):
        text = ' '.join(texts)[:SQLITEFTS_CONFIG.SQLITE_LIMIT_LENGTH]

        table = _escape_sqlite3_identifier(SQLITEFTS_CONFIG.SQLITEFTS_TABLE)
        column = _escape_sqlite3_identifier(SQLITEFTS_CONFIG.SQLITEFTS_COLUMN)
        id_table = _escape_sqlite3_identifier(SQLITEFTS_CONFIG.SQLITEFTS_ID_TABLE)

        with SQLITEFTS_CONFIG.get_connection() as cursor:
            retries = 2
            while retries > 0:
                retries -= 1
                try:
                    # If there is already an FTS index rowid to snapshot_id mapping,
                    # then don't insert a new one, silently ignoring the operation.
                    # {id_table}.rowid is AUTOINCREMENT, so will generate an unused
                    # rowid for the index if it is an unindexed snapshot_id.
                    cursor.execute(
                        f"INSERT OR IGNORE INTO {id_table}(snapshot_id) VALUES({SQLITEFTS_CONFIG.SQLITE_BIND})",
                        [snapshot_id])
                    # Fetch the FTS index rowid for the given snapshot_id
                    id_res = cursor.execute(
                        f"SELECT rowid FROM {id_table} WHERE snapshot_id = {SQLITEFTS_CONFIG.SQLITE_BIND}",
                        [snapshot_id])
                    rowid = id_res.fetchone()[0]
                    # (Re-)index the content
                    cursor.execute(
                        "INSERT OR REPLACE INTO"
                        f" {table}(rowid, {column}) VALUES ({SQLITEFTS_CONFIG.SQLITE_BIND}, {SQLITEFTS_CONFIG.SQLITE_BIND})",
                        [rowid, text])
                    # All statements succeeded; return
                    return
                except Exception as e:
                    if str(e).startswith("no such table:") and retries > 0:
                        _create_tables()
                    else:
                        raise
        raise RuntimeError("Failed to create tables for SQLite FTS5 search")

    @staticmethod
    def search(text: str) -> List[str]:
        table = _escape_sqlite3_identifier(SQLITEFTS_CONFIG.SQLITEFTS_TABLE)
        id_table = _escape_sqlite3_identifier(SQLITEFTS_CONFIG.SQLITEFTS_ID_TABLE)

        with SQLITEFTS_CONFIG.get_connection() as cursor:
            try:
                res = cursor.execute(
                    f"SELECT snapshot_id FROM {table}"
                    f" INNER JOIN {id_table}"
                    f" ON {id_table}.rowid = {table}.rowid"
                    f" WHERE {table} MATCH {SQLITEFTS_CONFIG.SQLITE_BIND}",
                    [text])
            except Exception as e:
                _handle_query_exception(e)

            snap_ids = [row[0] for row in res.fetchall()]
        return snap_ids

    @staticmethod
    def flush(snapshot_ids: Generator[str, None, None]):
        snapshot_ids = list(snapshot_ids)  # type: ignore[assignment]

        id_table = _escape_sqlite3_identifier(SQLITEFTS_CONFIG.SQLITEFTS_ID_TABLE)

        with SQLITEFTS_CONFIG.get_connection() as cursor:
            try:
                # executemany() expects one parameter sequence per execution
                cursor.executemany(
                    f"DELETE FROM {id_table} WHERE snapshot_id={SQLITEFTS_CONFIG.SQLITE_BIND}",
                    [(snapshot_id,) for snapshot_id in snapshot_ids])
            except Exception as e:
                _handle_query_exception(e)

SQLITEFTS_SEARCH_BACKEND = SqliteftsSearchBackend()


class SqliteftsSearchPlugin(BasePlugin):
    app_label: str = 'sqlitefts'
    verbose_name: str = 'Sqlitefts'

    hooks: List[InstanceOf[BaseHook]] = [
        SQLITEFTS_CONFIG,
        SQLITEFTS_SEARCH_BACKEND,
    ]


PLUGIN = SqliteftsSearchPlugin()
PLUGIN.register(settings)
DJANGO_APP = PLUGIN.AppConfig
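SQLite offers parameter binding only for values, not for table or column names, so identifiers are escaped manually: the quote character is doubled inside the token and the whole token is wrapped in quotes (double quotes for identifiers, single quotes for string literals such as the fts5() column list). The helpers above behave like this:

# values: embedded ' is doubled, result wrapped in '...'
assert _escape_sqlite3_value("it's") == "'it''s'"

# identifiers: embedded " is doubled, result wrapped in "..."
assert _escape_sqlite3_identifier('my "table"') == '"my ""table"""'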


@@ -1,195 +0,0 @@
import codecs
from typing import List, Generator
import sqlite3

from archivebox.util import enforce_types
from archivebox.config import (
    FTS_SEPARATE_DATABASE,
    FTS_TOKENIZERS,
    FTS_SQLITE_MAX_LENGTH,
)

FTS_TABLE = "snapshot_fts"
FTS_ID_TABLE = "snapshot_id_fts"
FTS_COLUMN = "texts"

if FTS_SEPARATE_DATABASE:
    database = sqlite3.connect("search.sqlite3")

    # Make get_connection callable, because `django.db.connection.cursor()`
    # has to be called to get a context manager, but sqlite3.Connection
    # is a context manager without being called.
    def get_connection():
        return database

    SQLITE_BIND = "?"
else:
    from django.db import connection as database  # type: ignore[no-redef, assignment]
    get_connection = database.cursor
    SQLITE_BIND = "%s"

# Only Python >= 3.11 supports sqlite3.Connection.getlimit(),
# so fall back to the default if the API to get the real value isn't present
try:
    limit_id = sqlite3.SQLITE_LIMIT_LENGTH
    try:
        with database.temporary_connection() as cursor:  # type: ignore[attr-defined]
            SQLITE_LIMIT_LENGTH = cursor.connection.getlimit(limit_id)
    except AttributeError:
        SQLITE_LIMIT_LENGTH = database.getlimit(limit_id)
except AttributeError:
    SQLITE_LIMIT_LENGTH = FTS_SQLITE_MAX_LENGTH

def _escape_sqlite3(value: str, *, quote: str, errors='strict') -> str:
    assert isinstance(quote, str), "quote is not a str"
    assert len(quote) == 1, "quote must be a single character"

    encodable = value.encode('utf-8', errors).decode('utf-8')

    nul_index = encodable.find("\x00")
    if nul_index >= 0:
        error = UnicodeEncodeError("NUL-terminated utf-8", encodable,
                                   nul_index, nul_index + 1, "NUL not allowed")
        error_handler = codecs.lookup_error(errors)
        replacement, _ = error_handler(error)
        assert isinstance(replacement, str), "handling a UnicodeEncodeError should return a str replacement"
        encodable = encodable.replace("\x00", replacement)

    return quote + encodable.replace(quote, quote * 2) + quote

def _escape_sqlite3_value(value: str, errors='strict') -> str:
    return _escape_sqlite3(value, quote="'", errors=errors)

def _escape_sqlite3_identifier(value: str) -> str:
    return _escape_sqlite3(value, quote='"', errors='strict')

@enforce_types
def _create_tables():
    table = _escape_sqlite3_identifier(FTS_TABLE)
    # Escape as value, because fts5() expects
    # string literal column names
    column = _escape_sqlite3_value(FTS_COLUMN)
    id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)
    tokenizers = _escape_sqlite3_value(FTS_TOKENIZERS)
    trigger_name = _escape_sqlite3_identifier(f"{FTS_ID_TABLE}_ad")

    with get_connection() as cursor:
        # Create a contentless-delete FTS5 table that indexes
        # but does not store the texts of snapshots
        try:
            cursor.execute(
                f"CREATE VIRTUAL TABLE {table}"
                f" USING fts5({column},"
                f" tokenize={tokenizers},"
                " content='', contentless_delete=1);"
            )
        except Exception as e:
            msg = str(e)
            if 'unrecognized option: "contentlessdelete"' in msg:
                sqlite_version = getattr(sqlite3, "sqlite_version", "Unknown")
                raise RuntimeError(
                    "SQLite full-text search requires SQLite >= 3.43.0;"
                    f" the running version is {sqlite_version}"
                ) from e
            else:
                raise

        # Create a one-to-one mapping between ArchiveBox snapshot_id
        # and FTS5 rowid, because the column type of rowid can't be
        # customized.
        cursor.execute(
            f"CREATE TABLE {id_table}("
            " rowid INTEGER PRIMARY KEY AUTOINCREMENT,"
            " snapshot_id char(32) NOT NULL UNIQUE"
            ");"
        )
        # Create a trigger to delete items from the FTS5 index when
        # the snapshot_id is deleted from the mapping, to maintain
        # consistency and make the `flush()` query simpler.
        cursor.execute(
            f"CREATE TRIGGER {trigger_name}"
            f" AFTER DELETE ON {id_table} BEGIN"
            f" DELETE FROM {table} WHERE rowid=old.rowid;"
            " END;"
        )

def _handle_query_exception(exc: Exception):
    message = str(exc)
    if message.startswith("no such table:"):
        raise RuntimeError(
            "SQLite full-text search index has not yet"
            " been created; run `archivebox update --index-only`."
        )
    else:
        raise exc

@enforce_types
def index(snapshot_id: str, texts: List[str]):
    text = ' '.join(texts)[:SQLITE_LIMIT_LENGTH]

    table = _escape_sqlite3_identifier(FTS_TABLE)
    column = _escape_sqlite3_identifier(FTS_COLUMN)
    id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)

    with get_connection() as cursor:
        retries = 2
        while retries > 0:
            retries -= 1
            try:
                # If there is already an FTS index rowid to snapshot_id mapping,
                # then don't insert a new one, silently ignoring the operation.
                # {id_table}.rowid is AUTOINCREMENT, so will generate an unused
                # rowid for the index if it is an unindexed snapshot_id.
                cursor.execute(
                    f"INSERT OR IGNORE INTO {id_table}(snapshot_id) VALUES({SQLITE_BIND})",
                    [snapshot_id])
                # Fetch the FTS index rowid for the given snapshot_id
                id_res = cursor.execute(
                    f"SELECT rowid FROM {id_table} WHERE snapshot_id = {SQLITE_BIND}",
                    [snapshot_id])
                rowid = id_res.fetchone()[0]
                # (Re-)index the content
                cursor.execute(
                    "INSERT OR REPLACE INTO"
                    f" {table}(rowid, {column}) VALUES ({SQLITE_BIND}, {SQLITE_BIND})",
                    [rowid, text])
                # All statements succeeded; return
                return
            except Exception as e:
                if str(e).startswith("no such table:") and retries > 0:
                    _create_tables()
                else:
                    raise
    raise RuntimeError("Failed to create tables for SQLite FTS5 search")

@enforce_types
def search(text: str) -> List[str]:
    table = _escape_sqlite3_identifier(FTS_TABLE)
    id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)

    with get_connection() as cursor:
        try:
            res = cursor.execute(
                f"SELECT snapshot_id FROM {table}"
                f" INNER JOIN {id_table}"
                f" ON {id_table}.rowid = {table}.rowid"
                f" WHERE {table} MATCH {SQLITE_BIND}",
                [text])
        except Exception as e:
            _handle_query_exception(e)

        snap_ids = [row[0] for row in res.fetchall()]
    return snap_ids

@enforce_types
def flush(snapshot_ids: Generator[str, None, None]):
    snapshot_ids = list(snapshot_ids)  # type: ignore[assignment]

    id_table = _escape_sqlite3_identifier(FTS_ID_TABLE)

    with get_connection() as cursor:
        try:
            cursor.executemany(
                f"DELETE FROM {id_table} WHERE snapshot_id={SQLITE_BIND}",
                [snapshot_ids])
        except Exception as e:
            _handle_query_exception(e)


@@ -119,6 +119,7 @@ class SearchBackendConfig(BaseConfigSet):
     SEARCH_BACKEND_PORT: int = Field(default=1491)
     SEARCH_BACKEND_PASSWORD: str = Field(default='SecretPassword')
     SEARCH_PROCESS_HTML: bool = Field(default=True)
+    SEARCH_BACKEND_TIMEOUT: int = Field(default=10)

 SEARCH_BACKEND_CONFIG = SearchBackendConfig()


@@ -1,6 +1,5 @@
 from typing import List, Union
 from pathlib import Path
-from importlib import import_module

 from django.db.models import QuerySet
 from django.conf import settings
@@ -15,12 +14,10 @@ from .utils import get_indexable_content, log_index_started

 def import_backend():
-    backend_string = f'plugins_search.{settings.CONFIGS.SearchBackendConfig.SEARCH_BACKEND_ENGINE}.{settings.CONFIGS.SearchBackendConfig.SEARCH_BACKEND_ENGINE}'
-    try:
-        backend = import_module(backend_string)
-    except Exception as err:
-        raise Exception("Could not load '%s' as a backend: %s" % (backend_string, err))
-    return backend
+    for backend in settings.SEARCH_BACKENDS.values():   # registry is a dict keyed by hook id
+        if backend.name == settings.CONFIGS.SearchBackendConfig.SEARCH_BACKEND_ENGINE:
+            return backend
+    raise Exception(f'Could not load {settings.CONFIGS.SearchBackendConfig.SEARCH_BACKEND_ENGINE} as search backend')

 @enforce_types
 def write_search_index(link: Link, texts: Union[List[str], None]=None, out_dir: Path=settings.DATA_DIR, skip_text_index: bool=False) -> None:
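The old dynamic import_module() lookup is replaced by a scan of the settings.SEARCH_BACKENDS registry that the plugin hooks populate at startup. A self-contained sketch of the same selection pattern (the types and names below are simplified stand-ins, not ArchiveBox's real classes):

from typing import Dict, List

class StubBackend:                      # stand-in for a BaseSearchBackend hook
    def __init__(self, name: str):
        self.name = name
    def search(self, text: str) -> List[str]:
        return []

# plugins register themselves here, keyed by hook id
SEARCH_BACKENDS: Dict[str, StubBackend] = {'abc123': StubBackend('ripgrep')}

def import_backend(engine: str) -> StubBackend:
    # pick the registered backend whose name matches the configured engine
    for backend in SEARCH_BACKENDS.values():
        if backend.name == engine:
            return backend
    raise Exception(f'Could not load {engine} as search backend')

assert import_backend('ripgrep').search('query') == []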