move abx plugins inside vendor dir

Nick Sweeting 2024-10-28 04:07:35 -07:00
parent 5d9a32c364
commit b3c1cb716e
242 changed files with 2153 additions and 2700 deletions


@@ -27,43 +27,29 @@ from ..logging_util import (
log_archive_method_finished,
)
from .title import should_save_title, save_title
from .favicon import should_save_favicon, save_favicon
from .wget import should_save_wget, save_wget
from .singlefile import should_save_singlefile, save_singlefile
from .readability import should_save_readability, save_readability
from .mercury import should_save_mercury, save_mercury
from .htmltotext import should_save_htmltotext, save_htmltotext
from .pdf import should_save_pdf, save_pdf
from .screenshot import should_save_screenshot, save_screenshot
from .dom import should_save_dom, save_dom
from .git import should_save_git, save_git
from .media import should_save_media, save_media
from .archive_org import should_save_archive_dot_org, save_archive_dot_org
from .headers import should_save_headers, save_headers
ShouldSaveFunction = Callable[[Link, Optional[Path], Optional[bool]], bool]
SaveFunction = Callable[[Link, Optional[Path], int], ArchiveResult]
ArchiveMethodEntry = tuple[str, ShouldSaveFunction, SaveFunction]
def get_default_archive_methods() -> List[ArchiveMethodEntry]:
# TODO: move to abx.pm.hook.get_EXTRACTORS()
return [
('favicon', should_save_favicon, save_favicon),
('headers', should_save_headers, save_headers),
('singlefile', should_save_singlefile, save_singlefile),
('pdf', should_save_pdf, save_pdf),
('screenshot', should_save_screenshot, save_screenshot),
('dom', should_save_dom, save_dom),
('wget', should_save_wget, save_wget),
# keep title, readability, and htmltotext below wget and singlefile, as they depend on them
('title', should_save_title, save_title),
('readability', should_save_readability, save_readability),
('mercury', should_save_mercury, save_mercury),
('htmltotext', should_save_htmltotext, save_htmltotext),
('git', should_save_git, save_git),
('media', should_save_media, save_media),
('archive_org', should_save_archive_dot_org, save_archive_dot_org),
# ('favicon', should_save_favicon, save_favicon),
# ('headers', should_save_headers, save_headers),
# ('singlefile', should_save_singlefile, save_singlefile),
# ('pdf', should_save_pdf, save_pdf),
# ('screenshot', should_save_screenshot, save_screenshot),
# ('dom', should_save_dom, save_dom),
# ('wget', should_save_wget, save_wget),
# # keep title, readability, and htmltotext below wget and singlefile, as they depend on them
# ('title', should_save_title, save_title),
# ('readability', should_save_readability, save_readability),
# ('mercury', should_save_mercury, save_mercury),
# ('htmltotext', should_save_htmltotext, save_htmltotext),
# ('git', should_save_git, save_git),
# ('media', should_save_media, save_media),
# ('archive_org', should_save_archive_dot_org, save_archive_dot_org),
]
ARCHIVE_METHODS_INDEXING_PRECEDENCE = [
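
The TODO above points at replacing this hard-coded registry with a plugin hook lookup (abx.pm.hook.get_EXTRACTORS()). For reference, a minimal sketch of the general pluggy pattern such a hook registry is usually built on; the hook name is taken from the TODO, but the spec signature and plugin layout here are assumptions, not the actual ABX API:

import pluggy

hookspec = pluggy.HookspecMarker("abx")   # marker for declaring hook signatures
hookimpl = pluggy.HookimplMarker("abx")   # marker for plugin implementations of those hooks

class ExtractorSpec:
    @hookspec
    def get_EXTRACTORS(self):
        """Each plugin returns a dict of {name: (should_save_fn, save_fn)}."""

class FaviconPlugin:
    @hookimpl
    def get_EXTRACTORS(self):
        # stand-in callables; a real plugin would return its should_save/save functions
        return {'favicon': (lambda link, out_dir=None, overwrite=False: True,
                            lambda link, out_dir=None, timeout=60: None)}

pm = pluggy.PluginManager("abx")
pm.add_hookspecs(ExtractorSpec)
pm.register(FaviconPlugin())

# each registered plugin contributes one dict; merge them into a single registry
extractors = {}
for plugin_result in pm.hook.get_EXTRACTORS():
    extractors.update(plugin_result)
print(sorted(extractors))   # ['favicon']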


@@ -1,115 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from collections import defaultdict
from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from archivebox.misc.system import run, chmod_file
from archivebox.misc.util import enforce_types, is_static_file, dedupe
from archivebox.plugins_extractor.archivedotorg.config import ARCHIVEDOTORG_CONFIG
from archivebox.plugins_extractor.curl.config import CURL_CONFIG
from archivebox.plugins_extractor.curl.binaries import CURL_BINARY
from ..logging_util import TimedProgress
def get_output_path():
return 'archive.org.txt'
@enforce_types
def should_save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
out_dir = out_dir or Path(link.link_dir)
if not overwrite and (out_dir / get_output_path()).exists():
# if open(path, 'r', encoding='utf-8').read().strip() != 'None':
return False
return ARCHIVEDOTORG_CONFIG.SAVE_ARCHIVE_DOT_ORG
@enforce_types
def save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, timeout: int=CURL_CONFIG.CURL_TIMEOUT) -> ArchiveResult:
"""submit site to archive.org for archiving via their service, save returned archive url"""
curl_binary = CURL_BINARY.load()
assert curl_binary.abspath and curl_binary.version
out_dir = out_dir or Path(link.link_dir)
output: ArchiveOutput = get_output_path()
archive_org_url = None
submit_url = 'https://web.archive.org/save/{}'.format(link.url)
# later options take precedence
options = [
*CURL_CONFIG.CURL_ARGS,
*CURL_CONFIG.CURL_EXTRA_ARGS,
'--head',
'--max-time', str(timeout),
*(['--user-agent', '{}'.format(CURL_CONFIG.CURL_USER_AGENT)] if CURL_CONFIG.CURL_USER_AGENT else []),
*([] if CURL_CONFIG.CURL_CHECK_SSL_VALIDITY else ['--insecure']),
]
cmd = [
str(curl_binary.abspath),
*dedupe(options),
submit_url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, cwd=str(out_dir), timeout=timeout, text=True)
content_location, errors = parse_archive_dot_org_response(result.stdout)
if content_location:
archive_org_url = content_location[0]
elif len(errors) == 1 and 'RobotAccessControlException' in errors[0]:
archive_org_url = None
# raise ArchiveError('Archive.org denied by {}/robots.txt'.format(domain(link.url)))
elif errors:
raise ArchiveError(', '.join(errors))
else:
raise ArchiveError('Failed to find "content-location" URL header in Archive.org response.')
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
if output and not isinstance(output, Exception):
# instead of writing None when archive.org rejects the url write the
# url to resubmit it to archive.org. This is so when the user visits
# the URL in person, it will attempt to re-archive it, and it'll show the
# nicer error message explaining why the url was rejected if it fails.
archive_org_url = archive_org_url or submit_url
with open(str(out_dir / output), 'w', encoding='utf-8') as f:
f.write(archive_org_url)
chmod_file(str(out_dir / output), cwd=str(out_dir))
output = archive_org_url
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(curl_binary.version),
output=output,
status=status,
**timer.stats,
)
@enforce_types
def parse_archive_dot_org_response(response: str) -> Tuple[List[str], List[str]]:
# Parse archive.org response headers
headers: Dict[str, List[str]] = defaultdict(list)
# lowercase all the header names and store in dict
for header in response.splitlines():
if ':' not in header or not header.strip():
continue
name, val = header.split(':', 1)
headers[name.lower().strip()].append(val.strip())
# Get successful archive url in "content-location" header or any errors
content_location = headers.get('content-location', headers['location'])
errors = headers['x-archive-wayback-runtime-error']
return content_location, errors
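
As a quick illustration of what parse_archive_dot_org_response pulls out of the Save Page Now HEAD response, here is a made-up response (the header values are hypothetical; only the header names match what the code looks for):

sample_response = '\n'.join([
    'HTTP/2 302',
    'Server: nginx',
    'Content-Location: /web/20241028120000/https://example.com/',
    'X-App-Server: wwwb-app57',
])

content_location, errors = parse_archive_dot_org_response(sample_response)
print(content_location)   # ['/web/20241028120000/https://example.com/']
print(errors)             # [] -> no x-archive-wayback-runtime-error header was present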


@@ -1,76 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from typing import Optional
from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from archivebox.misc.system import run, chmod_file, atomic_write
from archivebox.misc.util import (
enforce_types,
is_static_file,
)
from ..logging_util import TimedProgress
from plugins_extractor.chrome.config import CHROME_CONFIG
from plugins_extractor.chrome.binaries import CHROME_BINARY
def get_output_path():
return 'output.html'
@enforce_types
def should_save_dom(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
out_dir = out_dir or Path(link.link_dir)
if not overwrite and (out_dir / get_output_path()).exists():
if (out_dir / get_output_path()).stat().st_size > 1:
return False
return CHROME_CONFIG.SAVE_DOM
@enforce_types
def save_dom(link: Link, out_dir: Optional[Path]=None, timeout: int=60) -> ArchiveResult:
"""print HTML of site to file using chrome --dump-html"""
CHROME_BIN = CHROME_BINARY.load()
assert CHROME_BIN.abspath and CHROME_BIN.version
out_dir = out_dir or Path(link.link_dir)
output: ArchiveOutput = get_output_path()
output_path = out_dir / output
cmd = [
str(CHROME_BIN.abspath),
*CHROME_CONFIG.chrome_args(),
'--dump-dom',
link.url
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, cwd=str(out_dir), timeout=timeout, text=True)
atomic_write(output_path, result.stdout)
if result.returncode:
hints = result.stderr
raise ArchiveError('Failed to save DOM', hints)
chmod_file(output, cwd=str(out_dir))
except Exception as err:
status = 'failed'
output = err
CHROME_BINARY.chrome_cleanup_lockfile()
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(CHROME_BIN.version),
output=output,
status=status,
**timer.stats,
)


@@ -1,71 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from archivebox.misc.system import chmod_file, run
from archivebox.misc.util import enforce_types, domain, dedupe
from archivebox.plugins_extractor.favicon.config import FAVICON_CONFIG
from archivebox.plugins_extractor.curl.config import CURL_CONFIG
from archivebox.plugins_extractor.curl.binaries import CURL_BINARY
from ..index.schema import Link, ArchiveResult, ArchiveOutput
from ..logging_util import TimedProgress
@enforce_types
def should_save_favicon(link: Link, out_dir: str | Path | None=None, overwrite: bool=False) -> bool:
assert link.link_dir
out_dir = Path(out_dir or link.link_dir)
if not overwrite and (out_dir / 'favicon.ico').exists():
return False
return FAVICON_CONFIG.SAVE_FAVICON
@enforce_types
def get_output_path():
return 'favicon.ico'
@enforce_types
def save_favicon(link: Link, out_dir: str | Path | None=None, timeout: int=CURL_CONFIG.CURL_TIMEOUT) -> ArchiveResult:
"""download site favicon from google's favicon api"""
curl_binary = CURL_BINARY.load()
assert curl_binary.abspath and curl_binary.version
out_dir = Path(out_dir or link.link_dir)
assert out_dir.exists()
output: ArchiveOutput = 'favicon.ico'
# later options take precedence
options = [
*CURL_CONFIG.CURL_ARGS,
*CURL_CONFIG.CURL_EXTRA_ARGS,
'--max-time', str(timeout),
'--output', str(output),
*(['--user-agent', '{}'.format(CURL_CONFIG.CURL_USER_AGENT)] if CURL_CONFIG.CURL_USER_AGENT else []),
*([] if CURL_CONFIG.CURL_CHECK_SSL_VALIDITY else ['--insecure']),
]
cmd = [
str(curl_binary.abspath),
*dedupe(options),
FAVICON_CONFIG.FAVICON_PROVIDER.format(domain(link.url)),
]
status = 'failed'
timer = TimedProgress(timeout, prefix=' ')
try:
run(cmd, cwd=str(out_dir), timeout=timeout)
chmod_file(output, cwd=str(out_dir))
status = 'succeeded'
except Exception as err:
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(curl_binary.version),
output=output,
status=status,
**timer.stats,
)
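
The "later options take precedence" pattern above (CURL_ARGS, then CURL_EXTRA_ARGS, then per-call flags, all run through dedupe()) recurs in most of these extractors. A standalone sketch of the idea; this simplified dedupe keys options on the text before '=' and is only an illustration of the intended behavior, not the actual archivebox.misc.util implementation:

def dedupe(options):
    """Keep only the last occurrence of each flag, keyed on the part before '='."""
    deduped = {}
    for option in options:
        deduped[option.split('=', 1)[0]] = option
    return list(deduped.values())

base_args  = ['--silent', '--location', '--max-time=30']   # hypothetical built-in defaults
extra_args = ['--max-time=60', '--compressed']              # user-supplied extra args
per_call   = ['--max-time=120']                             # computed for this invocation

print(dedupe([*base_args, *extra_args, *per_call]))
# ['--silent', '--location', '--max-time=120', '--compressed']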


@@ -1,100 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from typing import Optional
from archivebox.misc.system import run, chmod_file
from archivebox.misc.util import (
enforce_types,
is_static_file,
domain,
extension,
without_query,
without_fragment,
)
from ..logging_util import TimedProgress
from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from archivebox.plugins_extractor.git.config import GIT_CONFIG
from archivebox.plugins_extractor.git.binaries import GIT_BINARY
def get_output_path():
return 'git/'
def get_embed_path(archiveresult=None):
if not archiveresult:
return get_output_path()
try:
return get_output_path() + list((archiveresult.snapshot_dir / get_output_path()).glob('*'))[0].name + '/'
except IndexError:
pass
return get_output_path()
@enforce_types
def should_save_git(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
out_dir = out_dir or Path(link.link_dir)
if not overwrite and (out_dir / get_output_path()).exists():
return False
is_clonable_url = (
(domain(link.url) in GIT_CONFIG.GIT_DOMAINS)
or (extension(link.url) == 'git')
)
if not is_clonable_url:
return False
return GIT_CONFIG.SAVE_GIT
@enforce_types
def save_git(link: Link, out_dir: Optional[Path]=None, timeout: int=GIT_CONFIG.GIT_TIMEOUT) -> ArchiveResult:
"""download full site using git"""
git_binary = GIT_BINARY.load()
assert git_binary.abspath and git_binary.version
out_dir = out_dir or Path(link.link_dir)
output: ArchiveOutput = get_output_path()
output_path = out_dir / output
output_path.mkdir(exist_ok=True)
cmd = [
str(git_binary.abspath),
'clone',
*GIT_CONFIG.GIT_ARGS,
*([] if GIT_CONFIG.GIT_CHECK_SSL_VALIDITY else ['-c', 'http.sslVerify=false']),
without_query(without_fragment(link.url)),
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, cwd=str(output_path), timeout=timeout + 1)
if result.returncode == 128:
# ignore failed re-download when the folder already exists
pass
elif result.returncode > 0:
hints = 'Got git response code: {}.'.format(result.returncode)
raise ArchiveError('Failed to save git clone', hints)
chmod_file(output, cwd=str(out_dir))
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(git_binary.version),
output=output,
status=status,
**timer.stats,
)
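
should_save_git only fires for URLs that look clonable: either the domain is in GIT_CONFIG.GIT_DOMAINS or the URL's extension is 'git'. A minimal standalone version of that check using urllib.parse (the domain set below is a hypothetical example, not the real GIT_DOMAINS default):

from urllib.parse import urlparse

GIT_DOMAINS = {'github.com', 'gitlab.com', 'bitbucket.org'}   # hypothetical example set

def looks_clonable(url: str) -> bool:
    parsed = urlparse(url)
    domain = parsed.netloc.split('@')[-1].split(':')[0].lower()   # strip userinfo and port
    extension = parsed.path.rsplit('.', 1)[-1].lower() if '.' in parsed.path else ''
    return domain in GIT_DOMAINS or extension == 'git'

assert looks_clonable('https://github.com/ArchiveBox/ArchiveBox')
assert looks_clonable('https://example.com/some/repo.git')
assert not looks_clonable('https://example.com/blog/post.html')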


@@ -1,76 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from typing import Optional
from archivebox.misc.system import atomic_write
from archivebox.misc.util import (
enforce_types,
get_headers,
dedupe,
)
from archivebox.plugins_extractor.curl.config import CURL_CONFIG
from archivebox.plugins_extractor.curl.binaries import CURL_BINARY
from ..index.schema import Link, ArchiveResult, ArchiveOutput
from ..logging_util import TimedProgress
def get_output_path():
return 'headers.json'
@enforce_types
def should_save_headers(link: Link, out_dir: Optional[str]=None, overwrite: Optional[bool]=False) -> bool:
out_dir_path = Path(out_dir or link.link_dir)
assert out_dir_path
if not overwrite and (out_dir_path / get_output_path()).exists():
return False
return CURL_CONFIG.SAVE_HEADERS
@enforce_types
def save_headers(link: Link, out_dir: Optional[str]=None, timeout: int=CURL_CONFIG.CURL_TIMEOUT) -> ArchiveResult:
"""Download site headers"""
curl_binary = CURL_BINARY.load()
assert curl_binary.abspath and curl_binary.version
out_dir_path = Path(out_dir or link.link_dir)
output_folder = out_dir_path.absolute()
output: ArchiveOutput = get_output_path()
status = 'succeeded'
timer = TimedProgress(timeout + 1, prefix=' ')
# later options take precedence
options = [
*CURL_CONFIG.CURL_ARGS,
*CURL_CONFIG.CURL_EXTRA_ARGS,
'--head',
'--max-time', str(timeout),
*(['--user-agent', '{}'.format(CURL_CONFIG.CURL_USER_AGENT)] if CURL_CONFIG.CURL_USER_AGENT else []),
*([] if CURL_CONFIG.CURL_CHECK_SSL_VALIDITY else ['--insecure']),
]
cmd = [
str(curl_binary.abspath),
*dedupe(options),
link.url,
]
try:
json_headers = get_headers(link.url, timeout=timeout)
output_folder.mkdir(exist_ok=True)
atomic_write(str(output_folder / get_output_path()), json_headers)
except (Exception, OSError) as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir_path),
cmd_version=str(curl_binary.version),
output=output,
status=status,
**timer.stats,
)
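
Conceptually, save_headers just records the response headers for the URL as JSON in headers.json (the real code goes through archivebox.misc.util.get_headers and the curl config shown above). A rough standard-library-only sketch of the same idea:

import json
import urllib.request

def fetch_headers_json(url: str, timeout: int = 30) -> str:
    """Return the response headers for url as pretty-printed JSON."""
    request = urllib.request.Request(url, method='HEAD')
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.dumps(dict(response.headers), indent=4)

# e.g. fetch_headers_json('https://example.com') -> the JSON blob that would land in headers.json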


@@ -1,158 +0,0 @@
__package__ = 'archivebox.extractors'
from html.parser import HTMLParser
import io
from pathlib import Path
from typing import Optional
from archivebox.config import VERSION
from archivebox.config.common import ARCHIVING_CONFIG
from archivebox.misc.system import atomic_write
from archivebox.misc.util import enforce_types, is_static_file
from archivebox.plugins_extractor.htmltotext.config import HTMLTOTEXT_CONFIG
from ..logging_util import TimedProgress
from ..index.schema import Link, ArchiveResult, ArchiveError
from .title import get_html
def get_output_path():
return "htmltotext.txt"
class HTMLTextExtractor(HTMLParser):
TEXT_ATTRS = [
"alt", "cite", "href", "label",
"list", "placeholder", "title", "value"
]
NOTEXT_TAGS = ["script", "style", "template"]
NOTEXT_HREF = ["data:", "javascript:", "#"]
def __init__(self):
super().__init__()
self.output = io.StringIO()
self._tag_stack = []
def _is_text_attr(self, name, value):
if not isinstance(value, str):
return False
if name == "href" and any(map(lambda p: value.startswith(p), self.NOTEXT_HREF)):
return False
if name in self.TEXT_ATTRS:
return True
return False
def _parent_tag(self):
try:
return self._tag_stack[-1]
except IndexError:
return None
def _in_notext_tag(self):
return any([t in self._tag_stack for t in self.NOTEXT_TAGS])
def handle_starttag(self, tag, attrs):
self._tag_stack.append(tag)
# Don't write out attribute values if any ancestor
# is in NOTEXT_TAGS
if self._in_notext_tag():
return
for name, value in attrs:
if self._is_text_attr(name, value):
self.output.write(f"({value.strip()}) ")
def handle_endtag(self, tag):
orig_stack = self._tag_stack.copy()
try:
# Keep popping tags until we find the nearest
# ancestor matching this end tag
while tag != self._tag_stack.pop():
pass
# Write a space after every tag, to ensure that tokens
# in tag text aren't concatenated. This may result in
# excess spaces, which should be ignored by search tokenizers.
if not self._in_notext_tag() and tag not in self.NOTEXT_TAGS:
self.output.write(" ")
except IndexError:
# Got to the top of the stack, but somehow missed
# this end tag -- maybe malformed markup -- restore the
# stack
self._tag_stack = orig_stack
def handle_data(self, data):
# Don't output text data if any ancestor is in NOTEXT_TAGS
if self._in_notext_tag():
return
data = data.lstrip()
len_before_rstrip = len(data)
data = data.rstrip()
spaces_rstripped = len_before_rstrip - len(data)
if data:
self.output.write(data)
if spaces_rstripped:
# Add back a single space if 1 or more
# whitespace characters were stripped
self.output.write(' ')
def __str__(self):
return self.output.getvalue()
@enforce_types
def should_save_htmltotext(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
out_dir = out_dir or Path(link.link_dir)
if not overwrite and (out_dir / get_output_path()).exists():
return False
return HTMLTOTEXT_CONFIG.SAVE_HTMLTOTEXT
@enforce_types
def save_htmltotext(link: Link, out_dir: Optional[Path]=None, timeout: int=ARCHIVING_CONFIG.TIMEOUT) -> ArchiveResult:
"""extract search-indexing-friendly text from an HTML document"""
out_dir = Path(out_dir or link.link_dir)
output = get_output_path()
cmd = ['(internal) archivebox.extractors.htmltotext', './{singlefile,dom}.html']
timer = TimedProgress(timeout, prefix=' ')
extracted_text = None
status = 'failed'
try:
extractor = HTMLTextExtractor()
document = get_html(link, out_dir)
if not document:
raise ArchiveError('htmltotext could not find HTML to parse for article text')
extractor.feed(document)
extractor.close()
extracted_text = str(extractor)
atomic_write(str(out_dir / output), extracted_text)
status = 'succeeded'
except (Exception, OSError) as err:
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=VERSION,
output=output,
status=status,
index_texts=[extracted_text] if extracted_text else [],
**timer.stats,
)
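
HTMLTextExtractor above is a plain html.parser.HTMLParser subclass, so it can be exercised directly (assuming the class definition is in scope); a small example with made-up markup:

extractor = HTMLTextExtractor()
extractor.feed(
    '<html><head><title>Example Page</title><style>body { color: red }</style></head>'
    '<body><p>Hello <a href="https://example.com" title="a link">world</a>.</p>'
    '<script>console.log("ignored")</script></body></html>'
)
extractor.close()
print(str(extractor))
# the title and paragraph text survive (along with the (href)/(title) attribute values),
# while everything inside <style> and <script> is dropped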


@@ -1,118 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from typing import Optional
from archivebox.misc.system import run, chmod_file
from archivebox.misc.util import enforce_types, is_static_file, dedupe
from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..logging_util import TimedProgress
from plugins_extractor.ytdlp.config import YTDLP_CONFIG
from plugins_extractor.ytdlp.binaries import YTDLP_BINARY
def get_output_path():
return 'media/'
def get_embed_path(archiveresult=None):
if not archiveresult:
return get_output_path()
out_dir = archiveresult.snapshot_dir / get_output_path()
try:
return get_output_path() + list(out_dir.glob('*.mp4'))[0].name
except IndexError:
return get_output_path()
@enforce_types
def should_save_media(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
out_dir = out_dir or Path(link.link_dir)
if not overwrite and (out_dir / get_output_path()).exists():
return False
return YTDLP_CONFIG.USE_YTDLP
@enforce_types
def save_media(link: Link, out_dir: Optional[Path]=None, timeout: int=0) -> ArchiveResult:
"""Download playlists or individual video, audio, and subtitles using youtube-dl or yt-dlp"""
YTDLP_BIN = YTDLP_BINARY.load()
assert YTDLP_BIN.abspath and YTDLP_BIN.version
timeout = timeout or YTDLP_CONFIG.YTDLP_TIMEOUT
out_dir = out_dir or Path(link.link_dir)
output: ArchiveOutput = get_output_path()
output_path = out_dir / output
output_path.mkdir(exist_ok=True)
# later options take precedence
options = [
*YTDLP_CONFIG.YTDLP_EXTRA_ARGS,
*([] if YTDLP_CONFIG.YTDLP_CHECK_SSL_VALIDITY else ['--no-check-certificate']),
# TODO: add --cookies-from-browser={CHROME_USER_DATA_DIR}
]
cmd = [
str(YTDLP_BIN.abspath),
*dedupe(options),
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, cwd=str(output_path), timeout=timeout + 1, text=True)
chmod_file(output, cwd=str(out_dir))
if result.returncode:
if ('ERROR: Unsupported URL' in result.stderr
or 'HTTP Error 404' in result.stderr
or 'HTTP Error 403' in result.stderr
or 'URL could be a direct video link' in result.stderr
or 'Unable to extract container ID' in result.stderr):
# These happen too frequently on non-media pages to warrant printing to console
pass
else:
hints = (
'Got yt-dlp response code: {}.'.format(result.returncode),
*result.stderr.split('\n'),
)
raise ArchiveError('Failed to save media', hints)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
# add video description and subtitles to full-text index
# Let's try a few different
index_texts = [
# errors:
# * 'strict' to raise a ValueError exception if there is an
# encoding error. The default value of None has the same effect.
# * 'ignore' ignores errors. Note that ignoring encoding errors
# can lead to data loss.
# * 'xmlcharrefreplace' is only supported when writing to a
# file. Characters not supported by the encoding are replaced with
# the appropriate XML character reference &#nnn;.
# There are a few more options described in https://docs.python.org/3/library/functions.html#open
text_file.read_text(encoding='utf-8', errors='xmlcharrefreplace').strip()
for text_file in (
*output_path.glob('*.description'),
*output_path.glob('*.srt'),
*output_path.glob('*.vtt'),
*output_path.glob('*.lrc'),
)
]
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(YTDLP_BIN.version),
output=output,
status=status,
index_texts=index_texts,
**timer.stats,
)


@@ -1,122 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from subprocess import CompletedProcess
from typing import Optional, List
import json
from ..index.schema import Link, ArchiveResult, ArchiveError
from archivebox.misc.system import run, atomic_write
from archivebox.misc.util import (
enforce_types,
is_static_file,
)
from archivebox.plugins_extractor.mercury.config import MERCURY_CONFIG
from archivebox.plugins_extractor.mercury.binaries import MERCURY_BINARY
from ..logging_util import TimedProgress
def get_output_path():
return 'mercury/'
def get_embed_path(archiveresult=None):
return get_output_path() + 'content.html'
@enforce_types
def ShellError(cmd: List[str], result: CompletedProcess, lines: int=20) -> ArchiveError:
# parse out last line of stderr
return ArchiveError(
f'Got {cmd[0]} response code: {result.returncode}.',
" ".join(
line.strip()
for line in (result.stdout + result.stderr).decode().rsplit('\n', lines)[-lines:]
if line.strip()
),
)
@enforce_types
def should_save_mercury(link: Link, out_dir: Optional[str]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
out_dir = Path(out_dir or link.link_dir)
if not overwrite and (out_dir / get_output_path()).exists():
return False
return MERCURY_CONFIG.SAVE_MERCURY
@enforce_types
def save_mercury(link: Link, out_dir: Optional[Path]=None, timeout: int=MERCURY_CONFIG.MERCURY_TIMEOUT) -> ArchiveResult:
"""download reader friendly version using @postlight/mercury-parser"""
out_dir = Path(out_dir or link.link_dir)
output_folder = out_dir.absolute() / get_output_path()
output = get_output_path()
mercury_binary = MERCURY_BINARY.load()
assert mercury_binary.abspath and mercury_binary.version
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
output_folder.mkdir(exist_ok=True)
# later options take precedence
# By default, get plain text version of article
cmd = [
str(mercury_binary.abspath),
*MERCURY_CONFIG.MERCURY_EXTRA_ARGS,
'--format=text',
link.url,
]
result = run(cmd, cwd=out_dir, timeout=timeout)
try:
article_text = json.loads(result.stdout)
except json.JSONDecodeError:
raise ShellError(cmd, result)
if article_text.get('failed'):
raise ArchiveError('Mercury was not able to get article text from the URL')
atomic_write(str(output_folder / "content.txt"), article_text["content"])
# Get HTML version of article
cmd = [
str(mercury_binary.abspath),
*MERCURY_CONFIG.MERCURY_EXTRA_ARGS,
link.url
]
result = run(cmd, cwd=out_dir, timeout=timeout)
try:
article_json = json.loads(result.stdout)
except json.JSONDecodeError:
raise ShellError(cmd, result)
if article_json.get('failed'):
raise ArchiveError('Mercury was not able to get article HTML from the URL')
atomic_write(str(output_folder / "content.html"), article_json.pop("content"))
atomic_write(str(output_folder / "article.json"), article_json)
# Check for common failure cases
if (result.returncode > 0):
raise ShellError(cmd, result)
except (ArchiveError, Exception, OSError) as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(mercury_binary.version),
output=output,
status=status,
**timer.stats,
)


@@ -1,75 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from typing import Optional
from archivebox.misc.system import run, chmod_file
from archivebox.misc.util import (
enforce_types,
is_static_file,
)
from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..logging_util import TimedProgress
from plugins_extractor.chrome.config import CHROME_CONFIG
from plugins_extractor.chrome.binaries import CHROME_BINARY
def get_output_path():
return 'output.pdf'
@enforce_types
def should_save_pdf(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
out_dir = out_dir or Path(link.link_dir)
if not overwrite and (out_dir / get_output_path()).exists():
return False
return CHROME_CONFIG.SAVE_PDF
@enforce_types
def save_pdf(link: Link, out_dir: Optional[Path]=None, timeout: int=60) -> ArchiveResult:
"""print PDF of site to file using chrome --headless"""
CHROME_BIN = CHROME_BINARY.load()
assert CHROME_BIN.abspath and CHROME_BIN.version
out_dir = out_dir or Path(link.link_dir)
output: ArchiveOutput = get_output_path()
cmd = [
str(CHROME_BIN.abspath),
*CHROME_CONFIG.chrome_args(),
'--print-to-pdf',
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, cwd=str(out_dir), timeout=timeout, text=True)
if result.returncode:
hints = (result.stderr or result.stdout)
raise ArchiveError('Failed to save PDF', hints)
chmod_file(get_output_path(), cwd=str(out_dir))
except Exception as err:
status = 'failed'
output = err
CHROME_BINARY.chrome_cleanup_lockfile()
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(CHROME_BIN.version),
output=output,
status=status,
**timer.stats,
)


@@ -1,118 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Optional
import json
from archivebox.misc.system import run, atomic_write
from archivebox.misc.util import enforce_types, is_static_file
from ..index.schema import Link, ArchiveResult, ArchiveError
from ..logging_util import TimedProgress
from .title import get_html
from plugins_extractor.readability.config import READABILITY_CONFIG
from plugins_extractor.readability.binaries import READABILITY_BINARY
def get_output_path():
return 'readability/'
def get_embed_path(archiveresult=None):
return get_output_path() + 'content.html'
@enforce_types
def should_save_readability(link: Link, out_dir: Optional[str]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
output_subdir = (Path(out_dir or link.link_dir) / get_output_path())
if not overwrite and output_subdir.exists():
return False
return READABILITY_CONFIG.SAVE_READABILITY
@enforce_types
def save_readability(link: Link, out_dir: Optional[str]=None, timeout: int=0) -> ArchiveResult:
"""download reader friendly version using @mozilla/readability"""
READABILITY_BIN = READABILITY_BINARY.load()
assert READABILITY_BIN.abspath and READABILITY_BIN.version
timeout = timeout or READABILITY_CONFIG.READABILITY_TIMEOUT
output_subdir = Path(out_dir or link.link_dir).absolute() / get_output_path()
output = get_output_path()
# Readability Docs: https://github.com/mozilla/readability
status = 'succeeded'
# fake command to show the user so they have something to try debugging if get_html fails
cmd = [
str(READABILITY_BIN.abspath),
'{dom,singlefile}.html',
link.url,
]
readability_content = None
timer = TimedProgress(timeout, prefix=' ')
try:
document = get_html(link, Path(out_dir or link.link_dir))
temp_doc = NamedTemporaryFile(delete=False)
temp_doc.write(document.encode("utf-8"))
temp_doc.close()
if not document or len(document) < 10:
raise ArchiveError('Readability could not find HTML to parse for article text')
cmd = [
str(READABILITY_BIN.abspath),
temp_doc.name,
link.url,
]
result = run(cmd, cwd=out_dir, timeout=timeout, text=True)
try:
result_json = json.loads(result.stdout)
assert result_json and 'content' in result_json, 'Readability output is not valid JSON'
except json.JSONDecodeError:
raise ArchiveError('Readability was not able to archive the page (invalid JSON)', result.stdout + result.stderr)
output_subdir.mkdir(exist_ok=True)
readability_content = result_json.pop("textContent")
atomic_write(str(output_subdir / "content.html"), result_json.pop("content"))
atomic_write(str(output_subdir / "content.txt"), readability_content)
atomic_write(str(output_subdir / "article.json"), result_json)
output_tail = [
line.strip()
for line in (result.stdout + result.stderr).rsplit('\n', 5)[-5:]
if line.strip()
]
hints = (
'Got readability response code: {}.'.format(result.returncode),
*output_tail,
)
# Check for common failure cases
if (result.returncode > 0):
raise ArchiveError(f'Readability was not able to archive the page (status={result.returncode})', hints)
except (Exception, OSError) as err:
status = 'failed'
output = err
# prefer Chrome dom output to singlefile because singlefile often contains huge url(data:image/...base64) strings that make the html too long to parse with readability
cmd = [cmd[0], './{dom,singlefile}.html']
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(READABILITY_BIN.version),
output=output,
status=status,
index_texts=[readability_content] if readability_content else [],
**timer.stats,
)


@@ -1,70 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from typing import Optional
from archivebox.misc.system import run, chmod_file
from archivebox.misc.util import enforce_types, is_static_file
from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..logging_util import TimedProgress
from plugins_extractor.chrome.config import CHROME_CONFIG
from plugins_extractor.chrome.binaries import CHROME_BINARY
def get_output_path():
return 'screenshot.png'
@enforce_types
def should_save_screenshot(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
out_dir = out_dir or Path(link.link_dir)
if not overwrite and (out_dir / get_output_path()).exists():
return False
return CHROME_CONFIG.SAVE_SCREENSHOT
@enforce_types
def save_screenshot(link: Link, out_dir: Optional[Path]=None, timeout: int=60) -> ArchiveResult:
"""take screenshot of site using chrome --headless"""
CHROME_BIN = CHROME_BINARY.load()
assert CHROME_BIN.abspath and CHROME_BIN.version
out_dir = out_dir or Path(link.link_dir)
output: ArchiveOutput = get_output_path()
cmd = [
str(CHROME_BIN.abspath),
*CHROME_CONFIG.chrome_args(),
'--screenshot',
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, cwd=str(out_dir), timeout=timeout, text=True)
if result.returncode:
hints = (result.stderr or result.stdout)
raise ArchiveError('Failed to save screenshot', hints)
chmod_file(output, cwd=str(out_dir))
except Exception as err:
status = 'failed'
output = err
CHROME_BINARY.chrome_cleanup_lockfile()
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(CHROME_BIN.version),
output=output,
status=status,
**timer.stats,
)


@@ -1,103 +0,0 @@
__package__ = 'archivebox.extractors'
from pathlib import Path
from typing import Optional
import json
from ..index.schema import Link, ArchiveResult, ArchiveError
from archivebox.misc.system import run, chmod_file
from archivebox.misc.util import enforce_types, is_static_file, dedupe
from ..logging_util import TimedProgress
from plugins_extractor.chrome.config import CHROME_CONFIG
from plugins_extractor.chrome.binaries import CHROME_BINARY
from plugins_extractor.singlefile.config import SINGLEFILE_CONFIG
from plugins_extractor.singlefile.binaries import SINGLEFILE_BINARY
def get_output_path():
return 'singlefile.html'
@enforce_types
def should_save_singlefile(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
if is_static_file(link.url):
return False
out_dir = out_dir or Path(link.link_dir)
if not overwrite and (out_dir / get_output_path()).exists():
return False
return CHROME_CONFIG.USE_CHROME and SINGLEFILE_CONFIG.SAVE_SINGLEFILE
@enforce_types
def save_singlefile(link: Link, out_dir: Optional[Path]=None, timeout: int=60) -> ArchiveResult:
"""download full site using single-file"""
CHROME_BIN = CHROME_BINARY.load()
assert CHROME_BIN.abspath and CHROME_BIN.version
SINGLEFILE_BIN = SINGLEFILE_BINARY.load()
assert SINGLEFILE_BIN.abspath and SINGLEFILE_BIN.version
out_dir = out_dir or Path(link.link_dir)
output = get_output_path()
browser_args = CHROME_CONFIG.chrome_args(CHROME_TIMEOUT=0)
# SingleFile CLI Docs: https://github.com/gildas-lormeau/SingleFile/tree/master/cli
options = [
'--browser-executable-path={}'.format(CHROME_BIN.abspath),
*(["--browser-cookies-file={}".format(SINGLEFILE_CONFIG.SINGLEFILE_COOKIES_FILE)] if SINGLEFILE_CONFIG.SINGLEFILE_COOKIES_FILE else []),
'--browser-args={}'.format(json.dumps(browser_args)),
*SINGLEFILE_CONFIG.SINGLEFILE_EXTRA_ARGS,
]
cmd = [
str(SINGLEFILE_BIN.abspath),
*dedupe(options),
link.url,
output,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
result = None
try:
result = run(cmd, cwd=str(out_dir), timeout=timeout, text=True, capture_output=True)
# parse out number of files downloaded from last line of stderr:
# "Downloaded: 76 files, 4.0M in 1.6s (2.52 MB/s)"
output_tail = [
line.strip()
for line in (result.stdout + result.stderr).rsplit('\n', 5)[-5:]
if line.strip()
]
hints = (
'Got single-file response code: {}.'.format(result.returncode),
*output_tail,
)
# Check for common failure cases
if (result.returncode > 0) or not (out_dir / output).is_file():
raise ArchiveError(f'SingleFile was not able to archive the page (status={result.returncode})', hints)
chmod_file(output, cwd=str(out_dir))
except (Exception, OSError) as err:
status = 'failed'
# TODO: Make this prettier. This is necessary to run the command (escape JSON internal quotes).
cmd[2] = cmd[2].replace('"', "\\\"")
if result:
err.hints = (result.stdout + result.stderr).split('\n')
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(SINGLEFILE_BIN.version),
output=output,
status=status,
**timer.stats,
)
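
The --browser-args value above is a JSON array of Chrome flags, which is why the except branch re-escapes the double quotes in cmd[2] before the command is surfaced to the user: a raw JSON-encoded list can't be pasted back into a shell. A small illustration of the quoting problem, using shlex.quote as one standard-library way to make the option shell-safe (an aside, not what the extractor itself does):

import json
import shlex

browser_args = ['--headless=new', '--window-size=1440,2000']   # hypothetical chrome_args() output
option = '--browser-args={}'.format(json.dumps(browser_args))

print(option)
# --browser-args=["--headless=new", "--window-size=1440,2000"]

print(shlex.quote(option))   # single-quoted so the embedded double quotes survive a shell
# '--browser-args=["--headless=new", "--window-size=1440,2000"]'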


@@ -1,164 +0,0 @@
__package__ = 'archivebox.extractors'
import re
from html.parser import HTMLParser
from pathlib import Path
from typing import Optional
from archivebox.misc.util import (
enforce_types,
download_url,
htmldecode,
dedupe,
)
from archivebox.plugins_extractor.curl.config import CURL_CONFIG
from archivebox.plugins_extractor.curl.binaries import CURL_BINARY
from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..logging_util import TimedProgress
HTML_TITLE_REGEX = re.compile(
r'<title.*?>' # start matching text after <title> tag
r'([^<>]+)', # get everything up to these symbols
re.IGNORECASE | re.MULTILINE | re.DOTALL | re.UNICODE,
)
class TitleParser(HTMLParser):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.title_tag = ""
self.title_og = ""
self.inside_title_tag = False
@property
def title(self):
return self.title_tag or self.title_og or None
def handle_starttag(self, tag, attrs):
if tag.lower() == "title" and not self.title_tag:
self.inside_title_tag = True
elif tag.lower() == "meta" and not self.title_og:
attrs = dict(attrs)
if attrs.get("property") == "og:title" and attrs.get("content"):
self.title_og = attrs.get("content")
def handle_data(self, data):
if self.inside_title_tag and data:
self.title_tag += data.strip()
def handle_endtag(self, tag):
if tag.lower() == "title":
self.inside_title_tag = False
@enforce_types
def get_html(link: Link, path: Path, timeout: int=CURL_CONFIG.CURL_TIMEOUT) -> str:
"""
Try to find wget, singlefile and then dom files.
If none is found, download the url again.
"""
canonical = link.canonical_outputs()
abs_path = path.absolute()
# prefer chrome-generated DOM dump to singlefile as singlefile output often includes HUGE url(data:image/...base64) strings that crash parsers
sources = [canonical["dom_path"], canonical["singlefile_path"], canonical["wget_path"]]
document = None
for source in sources:
try:
with open(abs_path / source, "r", encoding="utf-8") as f:
document = f.read()
break
except (FileNotFoundError, TypeError, UnicodeDecodeError):
continue
if document is None:
return download_url(link.url, timeout=timeout)
else:
return document
def get_output_path():
# TODO: actually save title to this file
# (currently only saved in ArchiveResult.output as charfield value, not saved to filesystem)
return 'title.json'
@enforce_types
def should_save_title(link: Link, out_dir: Optional[str]=None, overwrite: Optional[bool]=False) -> bool:
# if link already has valid title, skip it
if not overwrite and link.title and not link.title.lower().startswith('http'):
return False
return CURL_CONFIG.SAVE_TITLE
def extract_title_with_regex(html):
match = re.search(HTML_TITLE_REGEX, html)
output = htmldecode(match.group(1).strip()) if match else None
return output
@enforce_types
def save_title(link: Link, out_dir: Optional[Path]=None, timeout: int=CURL_CONFIG.CURL_TIMEOUT) -> ArchiveResult:
"""try to guess the page's title from its content"""
from core.models import Snapshot
curl_binary = CURL_BINARY.load()
assert curl_binary.abspath and curl_binary.version
output: ArchiveOutput = None
# later options take precedence
options = [
*CURL_CONFIG.CURL_ARGS,
*CURL_CONFIG.CURL_EXTRA_ARGS,
'--max-time', str(timeout),
*(['--user-agent', '{}'.format(CURL_CONFIG.CURL_USER_AGENT)] if CURL_CONFIG.CURL_USER_AGENT else []),
*([] if CURL_CONFIG.CURL_CHECK_SSL_VALIDITY else ['--insecure']),
]
cmd = [
str(curl_binary.abspath),
*dedupe(options),
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
html = get_html(link, out_dir, timeout=timeout)
try:
# try using relatively strict html parser first
parser = TitleParser()
parser.feed(html)
output = parser.title
if output is None:
raise
except Exception:
# fallback to regex that can handle broken/malformed html
output = extract_title_with_regex(html)
# if title is better than the one in the db, update db with new title
if isinstance(output, str) and output:
if not link.title or len(output) >= len(link.title):
Snapshot.objects.filter(url=link.url,
timestamp=link.timestamp)\
.update(title=output)
else:
# if no content was returned, dont save a title (because it might be a temporary error)
if not html:
raise ArchiveError('Unable to detect page title')
# output = html[:128] # use first bit of content as the title
output = link.base_url # use the filename as the title (better UX)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(curl_binary.version),
output=output,
status=status,
**timer.stats,
)
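
TitleParser and extract_title_with_regex can be exercised on their own (assuming the definitions above are in scope); a small example with made-up HTML, showing the parser preferring the <title> tag and the regex serving as the fallback for broken markup:

html = (
    '<html><head><meta property="og:title" content="OG Title"/>'
    '<title>Example Domain</title></head><body>hi</body></html>'
)

parser = TitleParser()
parser.feed(html)
print(parser.title)                            # Example Domain

broken_html = '<TITLE>Still findable</TITLE><p>no closing tags'
print(extract_title_with_regex(broken_html))   # Still findable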


@@ -1,289 +0,0 @@
__package__ = 'archivebox.extractors'
import re
import os
from pathlib import Path
from typing import Optional
from datetime import datetime, timezone
from archivebox.misc.system import run, chmod_file
from archivebox.misc.util import (
enforce_types,
without_fragment,
without_query,
path,
domain,
urldecode,
dedupe,
)
from archivebox.plugins_extractor.wget.config import WGET_CONFIG
from archivebox.plugins_extractor.wget.binaries import WGET_BINARY
from ..logging_util import TimedProgress
from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
def get_output_path():
# TODO: actually save output into this folder, instead of do {domain}/**/index.html
return 'wget/'
def get_embed_path(archiveresult=None):
if not archiveresult:
return get_output_path()
link = archiveresult.snapshot.as_link()
return wget_output_path(link)
@enforce_types
def should_save_wget(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
output_path = wget_output_path(link)
out_dir = out_dir or Path(link.link_dir)
if not overwrite and output_path and (out_dir / output_path).exists():
return False
return WGET_CONFIG.SAVE_WGET
@enforce_types
def save_wget(link: Link, out_dir: Optional[Path]=None, timeout: int=WGET_CONFIG.WGET_TIMEOUT) -> ArchiveResult:
"""download full site using wget"""
out_dir = Path(out_dir or link.link_dir)
assert out_dir.exists()
if WGET_CONFIG.SAVE_WARC:
warc_dir = out_dir / "warc"
warc_dir.mkdir(exist_ok=True)
warc_path = warc_dir / str(int(datetime.now(timezone.utc).timestamp()))
wget_binary = WGET_BINARY.load()
assert wget_binary.abspath and wget_binary.version
# WGET CLI Docs: https://www.gnu.org/software/wget/manual/wget.html
output: ArchiveOutput = None
# later options take precedence
options = [
*WGET_CONFIG.WGET_ARGS,
*WGET_CONFIG.WGET_EXTRA_ARGS,
'--timeout={}'.format(timeout),
*(['--restrict-file-names={}'.format(WGET_CONFIG.WGET_RESTRICT_FILE_NAMES)] if WGET_CONFIG.WGET_RESTRICT_FILE_NAMES else []),
*(['--warc-file={}'.format(str(warc_path))] if WGET_CONFIG.SAVE_WARC else []),
*(['--page-requisites'] if WGET_CONFIG.SAVE_WGET_REQUISITES else []),
*(['--user-agent={}'.format(WGET_CONFIG.WGET_USER_AGENT)] if WGET_CONFIG.WGET_USER_AGENT else []),
*(['--load-cookies', str(WGET_CONFIG.WGET_COOKIES_FILE)] if WGET_CONFIG.WGET_COOKIES_FILE else []),
*(['--compression=auto'] if WGET_CONFIG.WGET_AUTO_COMPRESSION else []),
*([] if WGET_CONFIG.SAVE_WARC else ['--timestamping']),
*([] if WGET_CONFIG.WGET_CHECK_SSL_VALIDITY else ['--no-check-certificate', '--no-hsts']),
# '--server-response', # print headers for better error parsing
]
cmd = [
str(wget_binary.abspath),
*dedupe(options),
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, cwd=str(out_dir), timeout=timeout)
output = wget_output_path(link)
# parse out number of files downloaded from last line of stderr:
# "Downloaded: 76 files, 4.0M in 1.6s (2.52 MB/s)"
output_tail = [
line.strip()
for line in (result.stdout + result.stderr).decode().rsplit('\n', 3)[-3:]
if line.strip()
]
files_downloaded = (
int(output_tail[-1].strip().split(' ', 2)[1] or 0)
if 'Downloaded:' in output_tail[-1]
else 0
)
hints = (
'Got wget response code: {}.'.format(result.returncode),
*output_tail,
)
# Check for common failure cases
if (result.returncode > 0 and files_downloaded < 1) or output is None:
if b'403: Forbidden' in result.stderr:
raise ArchiveError('403 Forbidden (try changing WGET_USER_AGENT)', hints)
if b'404: Not Found' in result.stderr:
raise ArchiveError('404 Not Found', hints)
if b'ERROR 500: Internal Server Error' in result.stderr:
raise ArchiveError('500 Internal Server Error', hints)
raise ArchiveError('Wget failed or got an error from the server', hints)
if (out_dir / output).exists():
chmod_file(output, cwd=str(out_dir))
else:
print(f' {out_dir}/{output}')
raise ArchiveError('Failed to find wget output after running', hints)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=str(out_dir),
cmd_version=str(wget_binary.version),
output=output,
status=status,
**timer.stats,
)
@enforce_types
def unsafe_wget_output_path(link: Link) -> Optional[str]:
# There used to be a bunch of complex reverse-engineering path mapping logic here,
# but it was removed in favor of just walking through the output folder recursively to try to find the
# html file that wget produced. It's *much much much* slower than deriving it statically, and is currently
# one of the main bottlenecks of ArchiveBox's performance (the output data is often on a slow HDD or network mount).
# But it's STILL better than trying to figure out URL -> html filepath mappings ourselves from first principles.
full_path = without_fragment(without_query(path(link.url))).strip('/')
search_dir = Path(link.link_dir) / domain(link.url).replace(":", "+") / urldecode(full_path)
for _ in range(4):
try:
if os.access(search_dir, os.R_OK) and search_dir.is_dir():
html_files = [
f for f in search_dir.iterdir()
if re.search(".+\\.[Ss]?[Hh][Tt][Mm][Ll]?$", str(f), re.I | re.M)
]
if html_files:
return str(html_files[0].relative_to(link.link_dir))
# sometimes wget'd URLs have no ext and return non-html
# e.g. /some/example/rss/all -> some RSS XML content
# /some/other/url.o4g -> some binary unrecognized ext
# test this with archivebox add --depth=1 https://getpocket.com/users/nikisweeting/feed/all
last_part_of_url = urldecode(full_path.rsplit('/', 1)[-1])
for file_present in search_dir.iterdir():
if file_present.name == last_part_of_url:
return str(file_present.relative_to(link.link_dir))
except OSError:
# OSError 36 and others can happen here, caused by trying to check for impossible paths
# (paths derived from URLs can often contain illegal unicode characters or be too long,
# causing the OS / filesystem to reject trying to open them with a system-level error)
pass
# Move up one directory level
search_dir = search_dir.parent
if str(search_dir) == link.link_dir:
break
# check for literally any file present that isnt an empty folder
domain_dir = Path(domain(link.url).replace(":", "+"))
files_within = [path for path in (Path(link.link_dir) / domain_dir).glob('**/*.*') if not str(path).endswith('.orig')]
if files_within:
return str((domain_dir / files_within[-1]).relative_to(link.link_dir))
# abandon all hope, wget either never downloaded, or it produced an output path so horribly mutilated
# that it's better we just pretend it doesnt exist
# this is why ArchiveBox's specializes in REDUNDANTLY saving copies of sites with multiple different tools
return None
@enforce_types
def wget_output_path(link: Link, nocache: bool=False) -> Optional[str]:
"""calculate the path to the wgetted .html file, since wget may
adjust some paths to be different than the base_url path.
See docs on: wget --adjust-extension (-E), --restrict-file-names=windows|unix|ascii, --convert-links
WARNING: this function is extremely error prone because mapping URLs to filesystem paths deterministically
is basically impossible. Every OS and filesystem have different requirements on what special characters are
allowed, and URLs are *full* of all kinds of special characters, illegal unicode, and generally unsafe strings
that you dont want anywhere near your filesystem. Also URLs can be obscenely long, but most filesystems dont
accept paths longer than 250 characters. On top of all that, this function only exists to try to reverse engineer
wget's approach to solving this problem, so this is a shittier, less tested version of their already insanely
complicated attempt to do this. Here be dragons:
- https://github.com/ArchiveBox/ArchiveBox/issues/549
- https://github.com/ArchiveBox/ArchiveBox/issues/1373
- https://stackoverflow.com/questions/9532499/check-whether-a-path-is-valid-in-python-without-creating-a-file-at-the-paths-ta
- and probably many more that I didn't realize were caused by this...
The only constructive thing we could possibly do to this function is to figure out how to remove it.
Preach loudly to anyone who will listen: never attempt to map URLs to filesystem paths,
and pray you never have to deal with the aftermath of someone else's attempt to do so...
"""
# Wget downloads can save in a number of different ways depending on the url:
# https://example.com
# > example.com/index.html
# https://example.com?v=zzVa_tX1OiI
# > example.com/index.html@v=zzVa_tX1OiI.html
# https://www.example.com/?v=zzVa_tX1OiI
# > example.com/index.html@v=zzVa_tX1OiI.html
# https://example.com/abc
# > example.com/abc.html
# https://example.com/abc/
# > example.com/abc/index.html
# https://example.com/abc?v=zzVa_tX1OiI.html
# > example.com/abc@v=zzVa_tX1OiI.html
# https://example.com/abc/?v=zzVa_tX1OiI.html
# > example.com/abc/index.html@v=zzVa_tX1OiI.html
# https://example.com/abc/test.html
# > example.com/abc/test.html
# https://example.com/abc/test?v=zzVa_tX1OiI
# > example.com/abc/test@v=zzVa_tX1OiI.html
# https://example.com/abc/test/?v=zzVa_tX1OiI
# > example.com/abc/test/index.html@v=zzVa_tX1OiI.html
cache_key = f'{link.url_hash}:{link.timestamp}-{link.downloaded_at and link.downloaded_at.timestamp()}-wget-output-path'
if not nocache:
from django.core.cache import cache
cached_result = cache.get(cache_key)
if cached_result:
return cached_result
# There's also lots of complexity around how the urlencoding and renaming
# is done for pages with query and hash fragments, extensions like shtml / htm / php / etc,
# unicode escape sequences, punycode domain names, unicode double-width characters, extensions longer than
# 4 characters, paths with multiple extensions, etc. the list goes on...
output_path = None
try:
output_path = unsafe_wget_output_path(link)
except Exception as err:
pass # better to pretend it just failed to download than expose gnarly OSErrors to users
# check for unprintable unicode characters
# https://github.com/ArchiveBox/ArchiveBox/issues/1373
if output_path:
safe_path = output_path.encode('utf-8', 'replace').decode()
if output_path != safe_path:
# contains unprintable unicode characters that will break other parts of archivebox
# better to pretend it doesnt exist and fallback to parent dir than crash archivebox
output_path = None
# check for a path that is just too long to safely handle across different OS's
# https://github.com/ArchiveBox/ArchiveBox/issues/549
if output_path and len(output_path) > 250:
output_path = None
if output_path:
if not nocache:
cache.set(cache_key, output_path)
return output_path
# fallback to just the domain dir
search_dir = Path(link.link_dir) / domain(link.url).replace(":", "+")
if os.access(search_dir, os.R_OK) and search_dir.is_dir():
return domain(link.url).replace(":", "+")
# fallback to just the domain dir without port
search_dir = Path(link.link_dir) / domain(link.url).split(":", 1)[0]
if os.access(search_dir, os.R_OK) and search_dir.is_dir():
return domain(link.url).split(":", 1)[0]
return None
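
The core heuristic in unsafe_wget_output_path is: start at <link_dir>/<domain>/<url path>, look for any .html/.htm/.shtml file, and walk up one directory at a time if nothing is found. A stripped-down standalone version of that walk (no Link object, no caching, made-up example paths), just to make the shape of the search clearer:

import re
from pathlib import Path
from typing import Optional

HTML_FILE_RE = re.compile(r'.+\.[Ss]?[Hh][Tt][Mm][Ll]?$')

def find_wget_html(link_dir: Path, domain: str, url_path: str, max_levels: int = 4) -> Optional[str]:
    """Walk upward from <link_dir>/<domain>/<url_path> looking for an HTML file wget produced."""
    search_dir = link_dir / domain.replace(':', '+') / url_path.strip('/')
    for _ in range(max_levels):
        if search_dir.is_dir():
            html_files = [f for f in search_dir.iterdir() if HTML_FILE_RE.search(f.name)]
            if html_files:
                return str(html_files[0].relative_to(link_dir))
        search_dir = search_dir.parent
        if search_dir == link_dir:
            break
    return None

# e.g. find_wget_html(Path('archive/1730000000.0'), 'example.com', 'abc/test')
# might return 'example.com/abc/test.html', depending on what wget actually wrote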