working archivebox command inside django legacy folder

Nick Sweeting 2019-04-02 18:53:21 -04:00
parent 27708152d2
commit 68b4c01c6b
49 changed files with 222 additions and 673 deletions


@@ -1,210 +0,0 @@
#!/usr/bin/env python3
"""
ArchiveBox command line application.
./archive and ./bin/archivebox both point to this file,
but you can also run it directly using `python3 archive.py`
Usage & Documentation:
https://github.com/pirate/ArchiveBox/Wiki
"""
__package__ = 'archivebox'
import os
import sys
import shutil
from typing import List, Optional
from core.schema import Link
from core.links import links_after_timestamp
from core.index import write_links_index, load_links_index
from core.archive_methods import archive_link
from core.config import (
ONLY_NEW,
OUTPUT_DIR,
VERSION,
ANSI,
CURL_VERSION,
GIT_VERSION,
WGET_VERSION,
YOUTUBEDL_VERSION,
CHROME_VERSION,
USE_CURL,
USE_WGET,
USE_CHROME,
CURL_BINARY,
GIT_BINARY,
WGET_BINARY,
YOUTUBEDL_BINARY,
CHROME_BINARY,
FETCH_GIT,
FETCH_MEDIA,
)
from core.util import (
enforce_types,
handle_stdin_import,
handle_file_import,
)
from core.logs import (
log_archiving_started,
log_archiving_paused,
log_archiving_finished,
)
__AUTHOR__ = 'Nick Sweeting <git@nicksweeting.com>'
__VERSION__ = VERSION
__DESCRIPTION__ = 'ArchiveBox: The self-hosted internet archive.'
__DOCUMENTATION__ = 'https://github.com/pirate/ArchiveBox/wiki'
def print_help():
print('ArchiveBox: The self-hosted internet archive.\n')
print("Documentation:")
print(" https://github.com/pirate/ArchiveBox/wiki\n")
print("UI Usage:")
print(" Open output/index.html to view your archive.\n")
print("CLI Usage:")
print(" mkdir data; cd data/")
print(" archivebox init\n")
print(" echo 'https://example.com/some/page' | archivebox add")
print(" archivebox add https://example.com/some/other/page")
print(" archivebox add --depth=1 ~/Downloads/bookmarks_export.html")
print(" archivebox add --depth=1 https://example.com/feed.rss")
print(" archivebox update --resume=15109948213.123")
def print_version():
print('ArchiveBox v{}'.format(__VERSION__))
print()
print(
'[{}] CURL:'.format('√' if USE_CURL else 'X').ljust(14),
'{} --version\n'.format(shutil.which(CURL_BINARY)),
' '*13, CURL_VERSION, '\n',
)
print(
'[{}] GIT:'.format('√' if FETCH_GIT else 'X').ljust(14),
'{} --version\n'.format(shutil.which(GIT_BINARY)),
' '*13, GIT_VERSION, '\n',
)
print(
'[{}] WGET:'.format('√' if USE_WGET else 'X').ljust(14),
'{} --version\n'.format(shutil.which(WGET_BINARY)),
' '*13, WGET_VERSION, '\n',
)
print(
'[{}] YOUTUBEDL:'.format('√' if FETCH_MEDIA else 'X').ljust(14),
'{} --version\n'.format(shutil.which(YOUTUBEDL_BINARY)),
' '*13, YOUTUBEDL_VERSION, '\n',
)
print(
'[{}] CHROME:'.format('√' if USE_CHROME else 'X').ljust(14),
'{} --version\n'.format(shutil.which(CHROME_BINARY)),
' '*13, CHROME_VERSION, '\n',
)
def main(args=None) -> None:
if args is None:
args = sys.argv
if set(args).intersection(('-h', '--help', 'help')) or len(args) > 2:
print_help()
raise SystemExit(0)
if set(args).intersection(('--version', 'version')):
print_version()
raise SystemExit(0)
### Handle CLI arguments
# ./archive bookmarks.html
# ./archive 1523422111.234
import_path, resume = None, None
if len(args) == 2:
# if the argument is a string, it's a import_path file to import
# if it's a number, it's a timestamp to resume archiving from
if args[1].replace('.', '').isdigit():
import_path, resume = None, args[1]
else:
import_path, resume = args[1], None
### Set up output folder
if not os.path.exists(OUTPUT_DIR):
print('{green}[+] Created a new archive directory: {}{reset}'.format(OUTPUT_DIR, **ANSI))
os.makedirs(OUTPUT_DIR)
else:
not_empty = len(set(os.listdir(OUTPUT_DIR)) - {'.DS_Store'})
index_exists = os.path.exists(os.path.join(OUTPUT_DIR, 'index.json'))
if not_empty and not index_exists:
print(
("{red}[X] Could not find index.json in the OUTPUT_DIR: {reset}{}\n\n"
" If you're trying to update an existing archive, you must set OUTPUT_DIR to or run archivebox from inside the archive folder you're trying to update.\n"
" If you're trying to create a new archive, you must run archivebox inside a completely empty directory."
"\n\n"
" {lightred}Hint:{reset} To import a data folder created by an older version of ArchiveBox, \n"
" just cd into the folder and run the archivebox command to pick up where you left off.\n\n"
" (Always make sure your data folder is backed up first before updating ArchiveBox)"
).format(OUTPUT_DIR, **ANSI)
)
raise SystemExit(1)
### Handle ingesting urls piped in through stdin
# (e.g. if the user does: cat example_urls.txt | ./archive)
if not sys.stdin.isatty():
stdin_raw_text = sys.stdin.read()
if stdin_raw_text and import_path:
print(
'[X] You should pass either a path as an argument, '
'or pass a list of links via stdin, but not both.\n'
)
print_help()
raise SystemExit(1)
import_path = handle_stdin_import(stdin_raw_text)
### Handle ingesting url from a remote file/feed
# (e.g. if an RSS feed URL is used as the import path)
if import_path:
import_path = handle_file_import(import_path)
### Run the main archive update process
update_archive_data(import_path=import_path, resume=resume)
@enforce_types
def update_archive_data(import_path: Optional[str]=None, resume: Optional[float]=None) -> List[Link]:
"""The main ArchiveBox entrancepoint. Everything starts here."""
# Step 1: Load list of links from the existing index
# merge in and dedupe new links from import_path
all_links, new_links = load_links_index(out_dir=OUTPUT_DIR, import_path=import_path)
# Step 2: Write updated index with deduped old and new links back to disk
write_links_index(links=list(all_links), out_dir=OUTPUT_DIR)
# Step 3: Run the archive methods for each link
links = new_links if ONLY_NEW else all_links
log_archiving_started(len(links), resume)
idx: int = 0
link: Optional[Link] = None
try:
for idx, link in enumerate(links_after_timestamp(links, resume)):
archive_link(link, link_dir=link.link_dir)
except KeyboardInterrupt:
log_archiving_paused(len(links), idx, link.timestamp if link else '0')
raise SystemExit(0)
except:
print()
raise
log_archiving_finished(len(links))
# Step 4: Re-write links index with updated titles, icons, and resources
all_links, _ = load_links_index(out_dir=OUTPUT_DIR)
write_links_index(links=list(all_links), out_dir=OUTPUT_DIR, finished=True)
return all_links
if __name__ == '__main__':
main(sys.argv)
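
A sketch of driving this entrypoint programmatically instead of via the CLI (illustrative only; it assumes the module above is importable as archivebox.archive, that OUTPUT_DIR points at an initialized archive, and that a bookmarks export exists at the path shown):

    from archivebox.archive import update_archive_data

    # Import new links from a bookmarks export, then resume an interrupted run
    # from a known timestamp (mirroring the two CLI argument forms handled in main()).
    update_archive_data(import_path='output/sources/bookmarks_export.html')
    update_archive_data(resume=1523422111.234)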


@@ -1,694 +0,0 @@
import os
from typing import Dict, List, Tuple, Optional
from collections import defaultdict
from datetime import datetime
from core.schema import Link, ArchiveResult, ArchiveOutput
from core.index import (
write_link_index,
patch_links_index,
load_json_link_index,
)
from core.config import (
CURL_BINARY,
GIT_BINARY,
WGET_BINARY,
YOUTUBEDL_BINARY,
FETCH_FAVICON,
FETCH_TITLE,
FETCH_WGET,
FETCH_WGET_REQUISITES,
FETCH_PDF,
FETCH_SCREENSHOT,
FETCH_DOM,
FETCH_WARC,
FETCH_GIT,
FETCH_MEDIA,
SUBMIT_ARCHIVE_DOT_ORG,
TIMEOUT,
MEDIA_TIMEOUT,
GIT_DOMAINS,
VERSION,
WGET_USER_AGENT,
CHECK_SSL_VALIDITY,
COOKIES_FILE,
CURL_VERSION,
WGET_VERSION,
CHROME_VERSION,
GIT_VERSION,
YOUTUBEDL_VERSION,
WGET_AUTO_COMPRESSION,
)
from core.util import (
enforce_types,
domain,
extension,
without_query,
without_fragment,
fetch_page_title,
is_static_file,
TimedProgress,
chmod_file,
wget_output_path,
chrome_args,
run, PIPE, DEVNULL,
)
from core.logs import (
log_link_archiving_started,
log_link_archiving_finished,
log_archive_method_started,
log_archive_method_finished,
)
class ArchiveError(Exception):
def __init__(self, message, hints=None):
super().__init__(message)
self.hints = hints
@enforce_types
def archive_link(link: Link, link_dir: Optional[str]=None) -> Link:
"""download the DOM, PDF, and a screenshot into a folder named after the link's timestamp"""
ARCHIVE_METHODS = (
('title', should_fetch_title, fetch_title),
('favicon', should_fetch_favicon, fetch_favicon),
('wget', should_fetch_wget, fetch_wget),
('pdf', should_fetch_pdf, fetch_pdf),
('screenshot', should_fetch_screenshot, fetch_screenshot),
('dom', should_fetch_dom, fetch_dom),
('git', should_fetch_git, fetch_git),
('media', should_fetch_media, fetch_media),
('archive_org', should_fetch_archive_dot_org, archive_dot_org),
)
link_dir = link_dir or link.link_dir
try:
is_new = not os.path.exists(link_dir)
if is_new:
os.makedirs(link_dir)
link = load_json_link_index(link, link_dir=link_dir)
log_link_archiving_started(link, link_dir, is_new)
link = link.overwrite(updated=datetime.now())
stats = {'skipped': 0, 'succeeded': 0, 'failed': 0}
for method_name, should_run, method_function in ARCHIVE_METHODS:
try:
if method_name not in link.history:
link.history[method_name] = []
if should_run(link, link_dir):
log_archive_method_started(method_name)
result = method_function(link=link, link_dir=link_dir)
link.history[method_name].append(result)
stats[result.status] += 1
log_archive_method_finished(result)
else:
stats['skipped'] += 1
except Exception as e:
raise Exception('Exception in archive_methods.fetch_{}(Link(url={}))'.format(
method_name,
link.url,
)) from e
# print(' ', stats)
write_link_index(link, link_dir=link.link_dir)
patch_links_index(link)
# # If any changes were made, update the main links index json and html
# was_changed = stats['succeeded'] or stats['failed']
# if was_changed:
# patch_links_index(link)
log_link_archiving_finished(link, link.link_dir, is_new, stats)
except KeyboardInterrupt:
try:
write_link_index(link, link_dir=link.link_dir)
except:
pass
raise
except Exception as err:
print(' ! Failed to archive link: {}: {}'.format(err.__class__.__name__, err))
raise
return link
### Archive Method Functions
@enforce_types
def should_fetch_title(link: Link, link_dir: Optional[str]=None) -> bool:
# if link already has valid title, skip it
if link.title and not link.title.lower().startswith('http'):
return False
if is_static_file(link.url):
return False
return FETCH_TITLE
@enforce_types
def fetch_title(link: Link, link_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
"""try to guess the page's title from its content"""
output: ArchiveOutput = None
cmd = [
CURL_BINARY,
link.url,
'|',
'grep',
'<title',
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
output = fetch_page_title(link.url, timeout=timeout, progress=False)
if not output:
raise ArchiveError('Unable to detect page title')
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=link_dir,
cmd_version=CURL_VERSION,
output=output,
status=status,
**timer.stats,
)
@enforce_types
def should_fetch_favicon(link: Link, link_dir: Optional[str]=None) -> bool:
link_dir = link_dir or link.link_dir
if os.path.exists(os.path.join(link_dir, 'favicon.ico')):
return False
return FETCH_FAVICON
@enforce_types
def fetch_favicon(link: Link, link_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
"""download site favicon from google's favicon api"""
link_dir = link_dir or link.link_dir
output: ArchiveOutput = 'favicon.ico'
cmd = [
CURL_BINARY,
'--max-time', str(timeout),
'--location',
'--output', str(output),
*([] if CHECK_SSL_VALIDITY else ['--insecure']),
'https://www.google.com/s2/favicons?domain={}'.format(domain(link.url)),
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
run(cmd, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout)
chmod_file(output, cwd=link_dir)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=link_dir,
cmd_version=CURL_VERSION,
output=output,
status=status,
**timer.stats,
)
@enforce_types
def should_fetch_wget(link: Link, link_dir: Optional[str]=None) -> bool:
output_path = wget_output_path(link)
link_dir = link_dir or link.link_dir
if output_path and os.path.exists(os.path.join(link_dir, output_path)):
return False
return FETCH_WGET
@enforce_types
def fetch_wget(link: Link, link_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
"""download full site using wget"""
link_dir = link_dir or link.link_dir
if FETCH_WARC:
warc_dir = os.path.join(link_dir, 'warc')
os.makedirs(warc_dir, exist_ok=True)
warc_path = os.path.join('warc', str(int(datetime.now().timestamp())))
# WGET CLI Docs: https://www.gnu.org/software/wget/manual/wget.html
output: ArchiveOutput = None
cmd = [
WGET_BINARY,
# '--server-response', # print headers for better error parsing
'--no-verbose',
'--adjust-extension',
'--convert-links',
'--force-directories',
'--backup-converted',
'--span-hosts',
'--no-parent',
'-e', 'robots=off',
'--restrict-file-names=unix',
'--timeout={}'.format(timeout),
*([] if FETCH_WARC else ['--timestamping']),
*(['--warc-file={}'.format(warc_path)] if FETCH_WARC else []),
*(['--page-requisites'] if FETCH_WGET_REQUISITES else []),
*(['--user-agent={}'.format(WGET_USER_AGENT)] if WGET_USER_AGENT else []),
*(['--load-cookies', COOKIES_FILE] if COOKIES_FILE else []),
*(['--compression=auto'] if WGET_AUTO_COMPRESSION else []),
*([] if CHECK_SSL_VALIDITY else ['--no-check-certificate', '--no-hsts']),
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout)
output = wget_output_path(link)
# parse out number of files downloaded from last line of stderr:
# "Downloaded: 76 files, 4.0M in 1.6s (2.52 MB/s)"
output_tail = [
line.strip()
for line in (result.stdout + result.stderr).decode().rsplit('\n', 3)[-3:]
if line.strip()
]
files_downloaded = (
int(output_tail[-1].strip().split(' ', 2)[1] or 0)
if 'Downloaded:' in output_tail[-1]
else 0
)
# Check for common failure cases
if result.returncode > 0 and files_downloaded < 1:
hints = (
'Got wget response code: {}.'.format(result.returncode),
*output_tail,
)
if b'403: Forbidden' in result.stderr:
raise ArchiveError('403 Forbidden (try changing WGET_USER_AGENT)', hints)
if b'404: Not Found' in result.stderr:
raise ArchiveError('404 Not Found', hints)
if b'ERROR 500: Internal Server Error' in result.stderr:
raise ArchiveError('500 Internal Server Error', hints)
raise ArchiveError('Got an error from the server', hints)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=link_dir,
cmd_version=WGET_VERSION,
output=output,
status=status,
**timer.stats,
)
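# Illustrative aside (not part of the original file): the output_tail parsing above
# reduces wget's summary line to a downloaded-file count, e.g.:
#   int('Downloaded: 76 files, 4.0M in 1.6s (2.52 MB/s)'.split(' ', 2)[1])  # -> 76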
@enforce_types
def should_fetch_pdf(link: Link, link_dir: Optional[str]=None) -> bool:
link_dir = link_dir or link.link_dir
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'output.pdf')):
return False
return FETCH_PDF
@enforce_types
def fetch_pdf(link: Link, link_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
"""print PDF of site to file using chrome --headless"""
link_dir = link_dir or link.link_dir
output: ArchiveOutput = 'output.pdf'
cmd = [
*chrome_args(TIMEOUT=timeout),
'--print-to-pdf',
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout)
if result.returncode:
hints = (result.stderr or result.stdout).decode()
raise ArchiveError('Failed to print PDF', hints)
chmod_file('output.pdf', cwd=link_dir)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=link_dir,
cmd_version=CHROME_VERSION,
output=output,
status=status,
**timer.stats,
)
@enforce_types
def should_fetch_screenshot(link: Link, link_dir: Optional[str]=None) -> bool:
link_dir = link_dir or link.link_dir
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'screenshot.png')):
return False
return FETCH_SCREENSHOT
@enforce_types
def fetch_screenshot(link: Link, link_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
"""take screenshot of site using chrome --headless"""
link_dir = link_dir or link.link_dir
output: ArchiveOutput = 'screenshot.png'
cmd = [
*chrome_args(TIMEOUT=timeout),
'--screenshot',
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout)
if result.returncode:
hints = (result.stderr or result.stdout).decode()
raise ArchiveError('Failed to take screenshot', hints)
chmod_file(output, cwd=link_dir)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=link_dir,
cmd_version=CHROME_VERSION,
output=output,
status=status,
**timer.stats,
)
@enforce_types
def should_fetch_dom(link: Link, link_dir: Optional[str]=None) -> bool:
link_dir = link_dir or link.link_dir
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'output.html')):
return False
return FETCH_DOM
@enforce_types
def fetch_dom(link: Link, link_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
"""print HTML of site to file using chrome --dump-html"""
link_dir = link_dir or link.link_dir
output: ArchiveOutput = 'output.html'
output_path = os.path.join(link_dir, str(output))
cmd = [
*chrome_args(TIMEOUT=timeout),
'--dump-dom',
link.url
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
with open(output_path, 'w+') as f:
result = run(cmd, stdout=f, stderr=PIPE, cwd=link_dir, timeout=timeout)
if result.returncode:
hints = result.stderr.decode()
raise ArchiveError('Failed to fetch DOM', hints)
chmod_file(output, cwd=link_dir)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=link_dir,
cmd_version=CHROME_VERSION,
output=output,
status=status,
**timer.stats,
)
@enforce_types
def should_fetch_git(link: Link, link_dir: Optional[str]=None) -> bool:
link_dir = link_dir or link.link_dir
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'git')):
return False
is_clonable_url = (
(domain(link.url) in GIT_DOMAINS)
or (extension(link.url) == 'git')
)
if not is_clonable_url:
return False
return FETCH_GIT
@enforce_types
def fetch_git(link: Link, link_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
"""download full site using git"""
link_dir = link_dir or link.link_dir
output: ArchiveOutput = 'git'
output_path = os.path.join(link_dir, str(output))
os.makedirs(output_path, exist_ok=True)
cmd = [
GIT_BINARY,
'clone',
'--mirror',
'--recursive',
*([] if CHECK_SSL_VALIDITY else ['-c', 'http.sslVerify=false']),
without_query(without_fragment(link.url)),
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, stdout=PIPE, stderr=PIPE, cwd=output_path, timeout=timeout + 1)
if result.returncode == 128:
# ignore failed re-download when the folder already exists
pass
elif result.returncode > 0:
hints = 'Got git response code: {}.'.format(result.returncode)
raise ArchiveError('Failed git download', hints)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=link_dir,
cmd_version=GIT_VERSION,
output=output,
status=status,
**timer.stats,
)
@enforce_types
def should_fetch_media(link: Link, link_dir: Optional[str]=None) -> bool:
link_dir = link_dir or link.link_dir
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'media')):
return False
return FETCH_MEDIA
@enforce_types
def fetch_media(link: Link, link_dir: Optional[str]=None, timeout: int=MEDIA_TIMEOUT) -> ArchiveResult:
"""Download playlists or individual video, audio, and subtitles using youtube-dl"""
link_dir = link_dir or link.link_dir
output: ArchiveOutput = 'media'
output_path = os.path.join(link_dir, str(output))
os.makedirs(output_path, exist_ok=True)
cmd = [
YOUTUBEDL_BINARY,
'--write-description',
'--write-info-json',
'--write-annotations',
'--yes-playlist',
'--write-thumbnail',
'--no-call-home',
'--no-check-certificate',
'--user-agent',
'--all-subs',
'--extract-audio',
'--keep-video',
'--ignore-errors',
'--geo-bypass',
'--audio-format', 'mp3',
'--audio-quality', '320K',
'--embed-thumbnail',
'--add-metadata',
*([] if CHECK_SSL_VALIDITY else ['--no-check-certificate']),
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, stdout=PIPE, stderr=PIPE, cwd=output_path, timeout=timeout + 1)
chmod_file(output, cwd=link_dir)
if result.returncode:
if (b'ERROR: Unsupported URL' in result.stderr
or b'HTTP Error 404' in result.stderr
or b'HTTP Error 403' in result.stderr
or b'URL could be a direct video link' in result.stderr
or b'Unable to extract container ID' in result.stderr):
# These happen too frequently on non-media pages to warrant printing to console
pass
else:
hints = (
'Got youtube-dl response code: {}.'.format(result.returncode),
*result.stderr.decode().split('\n'),
)
raise ArchiveError('Failed to download media', hints)
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
return ArchiveResult(
cmd=cmd,
pwd=link_dir,
cmd_version=YOUTUBEDL_VERSION,
output=output,
status=status,
**timer.stats,
)
@enforce_types
def should_fetch_archive_dot_org(link: Link, link_dir: Optional[str]=None) -> bool:
link_dir = link_dir or link.link_dir
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'archive.org.txt')):
# if open(path, 'r').read().strip() != 'None':
return False
return SUBMIT_ARCHIVE_DOT_ORG
@enforce_types
def archive_dot_org(link: Link, link_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
"""submit site to archive.org for archiving via their service, save returned archive url"""
link_dir = link_dir or link.link_dir
output: ArchiveOutput = 'archive.org.txt'
archive_org_url = None
submit_url = 'https://web.archive.org/save/{}'.format(link.url)
cmd = [
CURL_BINARY,
'--location',
'--head',
'--user-agent', 'ArchiveBox/{} (+https://github.com/pirate/ArchiveBox/)'.format(VERSION), # be nice to the Archive.org people and show them where all this ArchiveBox traffic is coming from
'--max-time', str(timeout),
*([] if CHECK_SSL_VALIDITY else ['--insecure']),
submit_url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
result = run(cmd, stdout=PIPE, stderr=DEVNULL, cwd=link_dir, timeout=timeout)
content_location, errors = parse_archive_dot_org_response(result.stdout)
if content_location:
archive_org_url = 'https://web.archive.org{}'.format(content_location[0])
elif len(errors) == 1 and 'RobotAccessControlException' in errors[0]:
archive_org_url = None
# raise ArchiveError('Archive.org denied by {}/robots.txt'.format(domain(link.url)))
elif errors:
raise ArchiveError(', '.join(errors))
else:
raise ArchiveError('Failed to find "content-location" URL header in Archive.org response.')
except Exception as err:
status = 'failed'
output = err
finally:
timer.end()
if output and not isinstance(output, Exception):
# instead of writing None when archive.org rejects the url write the
# url to resubmit it to archive.org. This is so when the user visits
# the URL in person, it will attempt to re-archive it, and it'll show the
# nicer error message explaining why the url was rejected if it fails.
archive_org_url = archive_org_url or submit_url
with open(os.path.join(link_dir, str(output)), 'w', encoding='utf-8') as f:
f.write(archive_org_url)
chmod_file('archive.org.txt', cwd=link_dir)
output = archive_org_url
return ArchiveResult(
cmd=cmd,
pwd=link_dir,
cmd_version=CURL_VERSION,
output=output,
status=status,
**timer.stats,
)
@enforce_types
def parse_archive_dot_org_response(response: bytes) -> Tuple[List[str], List[str]]:
# Parse archive.org response headers
headers: Dict[str, List[str]] = defaultdict(list)
# lowercase all the header names and store in dict
for header in response.splitlines():
if b':' not in header or not header.strip():
continue
name, val = header.decode().split(':', 1)
headers[name.lower().strip()].append(val.strip())
# Get successful archive url in "content-location" header or any errors
content_location = headers['content-location']
errors = headers['x-archive-wayback-runtime-error']
return content_location, errors
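
A minimal worked example of the header parsing above, with the response bytes invented for illustration:

    sample = (
        b'HTTP/2 200\r\n'
        b'Content-Location: /web/20190402225321/https://example.com/\r\n'
        b'X-App-Server: wwwb-app21\r\n'
    )
    content_location, errors = parse_archive_dot_org_response(sample)
    # content_location == ['/web/20190402225321/https://example.com/'], errors == []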


@@ -1,272 +0,0 @@
import os
import re
import sys
import shutil
from typing import Optional
from subprocess import run, PIPE, DEVNULL
# ******************************************************************************
# Documentation: https://github.com/pirate/ArchiveBox/wiki/Configuration
# Use the 'env' command to pass config options to ArchiveBox. e.g.:
# env USE_COLOR=True CHROME_BINARY=google-chrome ./archive export.html
# ******************************************************************************
IS_TTY = sys.stdout.isatty()
USE_COLOR = os.getenv('USE_COLOR', str(IS_TTY) ).lower() == 'true'
SHOW_PROGRESS = os.getenv('SHOW_PROGRESS', str(IS_TTY) ).lower() == 'true'
OUTPUT_DIR = os.getenv('OUTPUT_DIR', '')
ONLY_NEW = os.getenv('ONLY_NEW', 'False' ).lower() == 'true'
TIMEOUT = int(os.getenv('TIMEOUT', '60'))
MEDIA_TIMEOUT = int(os.getenv('MEDIA_TIMEOUT', '3600'))
OUTPUT_PERMISSIONS = os.getenv('OUTPUT_PERMISSIONS', '755' )
FOOTER_INFO = os.getenv('FOOTER_INFO', 'Content is hosted for personal archiving purposes only. Contact server owner for any takedown requests.',)
URL_BLACKLIST = os.getenv('URL_BLACKLIST', None)
FETCH_WGET = os.getenv('FETCH_WGET', 'True' ).lower() == 'true'
FETCH_WGET_REQUISITES = os.getenv('FETCH_WGET_REQUISITES', 'True' ).lower() == 'true'
FETCH_PDF = os.getenv('FETCH_PDF', 'True' ).lower() == 'true'
FETCH_SCREENSHOT = os.getenv('FETCH_SCREENSHOT', 'True' ).lower() == 'true'
FETCH_DOM = os.getenv('FETCH_DOM', 'True' ).lower() == 'true'
FETCH_WARC = os.getenv('FETCH_WARC', 'True' ).lower() == 'true'
FETCH_GIT = os.getenv('FETCH_GIT', 'True' ).lower() == 'true'
FETCH_MEDIA = os.getenv('FETCH_MEDIA', 'True' ).lower() == 'true'
FETCH_FAVICON = os.getenv('FETCH_FAVICON', 'True' ).lower() == 'true'
FETCH_TITLE = os.getenv('FETCH_TITLE', 'True' ).lower() == 'true'
SUBMIT_ARCHIVE_DOT_ORG = os.getenv('SUBMIT_ARCHIVE_DOT_ORG', 'True' ).lower() == 'true'
CHECK_SSL_VALIDITY = os.getenv('CHECK_SSL_VALIDITY', 'True' ).lower() == 'true'
RESOLUTION = os.getenv('RESOLUTION', '1440,2000' )
GIT_DOMAINS = os.getenv('GIT_DOMAINS', 'github.com,bitbucket.org,gitlab.com').split(',')
WGET_USER_AGENT = os.getenv('WGET_USER_AGENT', 'ArchiveBox/{VERSION} (+https://github.com/pirate/ArchiveBox/) wget/{WGET_VERSION}')
COOKIES_FILE = os.getenv('COOKIES_FILE', None)
CHROME_USER_DATA_DIR = os.getenv('CHROME_USER_DATA_DIR', None)
CHROME_HEADLESS = os.getenv('CHROME_HEADLESS', 'True' ).lower() == 'true'
CHROME_USER_AGENT = os.getenv('CHROME_USER_AGENT', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36')
CHROME_SANDBOX = os.getenv('CHROME_SANDBOX', 'True' ).lower() == 'true'
USE_CURL = os.getenv('USE_CURL', 'True' ).lower() == 'true'
USE_WGET = os.getenv('USE_WGET', 'True' ).lower() == 'true'
USE_CHROME = os.getenv('USE_CHROME', 'True' ).lower() == 'true'
CURL_BINARY = os.getenv('CURL_BINARY', 'curl')
GIT_BINARY = os.getenv('GIT_BINARY', 'git')
WGET_BINARY = os.getenv('WGET_BINARY', 'wget')
YOUTUBEDL_BINARY = os.getenv('YOUTUBEDL_BINARY', 'youtube-dl')
CHROME_BINARY = os.getenv('CHROME_BINARY', None)
# ******************************************************************************
### Terminal Configuration
TERM_WIDTH = lambda: shutil.get_terminal_size((100, 10)).columns
ANSI = {
'reset': '\033[00;00m',
'lightblue': '\033[01;30m',
'lightyellow': '\033[01;33m',
'lightred': '\033[01;35m',
'red': '\033[01;31m',
'green': '\033[01;32m',
'blue': '\033[01;34m',
'white': '\033[01;37m',
'black': '\033[01;30m',
}
if not USE_COLOR:
# dont show colors if USE_COLOR is False
ANSI = {k: '' for k in ANSI.keys()}
REPO_DIR = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
if OUTPUT_DIR:
OUTPUT_DIR = os.path.abspath(OUTPUT_DIR)
else:
OUTPUT_DIR = os.path.abspath(os.curdir)
ARCHIVE_DIR_NAME = 'archive'
SOURCES_DIR_NAME = 'sources'
ARCHIVE_DIR = os.path.join(OUTPUT_DIR, ARCHIVE_DIR_NAME)
SOURCES_DIR = os.path.join(OUTPUT_DIR, SOURCES_DIR_NAME)
PYTHON_DIR = os.path.join(REPO_DIR, 'archivebox')
TEMPLATES_DIR = os.path.join(PYTHON_DIR, 'templates')
if COOKIES_FILE:
COOKIES_FILE = os.path.abspath(COOKIES_FILE)
URL_BLACKLIST_PTN = re.compile(URL_BLACKLIST, re.IGNORECASE) if URL_BLACKLIST else None
########################### Environment & Dependencies #########################
VERSION = open(os.path.join(PYTHON_DIR, 'VERSION'), 'r').read().strip()
GIT_SHA = VERSION.split('+')[1]
### Check Python environment
python_vers = float('{}.{}'.format(sys.version_info.major, sys.version_info.minor))
if python_vers < 3.5:
print('{}[X] Python version is not new enough: {} (>3.5 is required){}'.format(ANSI['red'], python_vers, ANSI['reset']))
print(' See https://github.com/pirate/ArchiveBox/wiki/Troubleshooting#python for help upgrading your Python installation.')
raise SystemExit(1)
if sys.stdout.encoding.upper() not in ('UTF-8', 'UTF8'):
print('[X] Your system is running python3 scripts with a bad locale setting: {} (it should be UTF-8).'.format(sys.stdout.encoding))
print(' To fix it, add the line "export PYTHONIOENCODING=UTF-8" to your ~/.bashrc file (without quotes)')
print('')
print(' Confirm that it\'s fixed by opening a new shell and running:')
print(' python3 -c "import sys; print(sys.stdout.encoding)" # should output UTF-8')
print('')
print(' Alternatively, run this script with:')
print(' env PYTHONIOENCODING=UTF-8 ./archive.py export.html')
# ******************************************************************************
# ***************************** Helper Functions *******************************
# ******************************************************************************
def bin_version(binary: str) -> str:
"""check the presence and return valid version line of a specified binary"""
if not shutil.which(binary):
print('{red}[X] Missing dependency: {bin}{reset}'.format(bin=binary, **ANSI))
print(' Install it, then confirm it works with: {} --version'.format(binary))
print(' See https://github.com/pirate/ArchiveBox/wiki/Install for help.')
raise SystemExit(1)
try:
version_str = run([binary, "--version"], stdout=PIPE, cwd=REPO_DIR).stdout.strip().decode()
return version_str.split('\n')[0].strip()
except Exception:
print('{red}[X] Unable to find a working version of {cmd}, is it installed and in your $PATH?'.format(cmd=binary, **ANSI))
raise SystemExit(1)
def find_chrome_binary() -> str:
"""find any installed chrome binaries in the default locations"""
# Precedence: Chromium, Chrome, Beta, Canary, Unstable, Dev
# make sure data dir finding precedence order always matches binary finding order
default_executable_paths = (
'chromium-browser',
'chromium',
'/Applications/Chromium.app/Contents/MacOS/Chromium',
'google-chrome',
'/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
'google-chrome-stable',
'google-chrome-beta',
'google-chrome-canary',
'/Applications/Google Chrome Canary.app/Contents/MacOS/Google Chrome Canary',
'google-chrome-unstable',
'google-chrome-dev',
)
for name in default_executable_paths:
full_path_exists = shutil.which(name)
if full_path_exists:
return name
print('{red}[X] Unable to find a working version of Chrome/Chromium, is it installed and in your $PATH?'.format(**ANSI))
raise SystemExit(1)
def find_chrome_data_dir() -> Optional[str]:
"""find any installed chrome user data directories in the default locations"""
# Precedence: Chromium, Chrome, Beta, Canary, Unstable, Dev
# make sure data dir finding precedence order always matches binary finding order
default_profile_paths = (
'~/.config/chromium',
'~/Library/Application Support/Chromium',
'~/AppData/Local/Chromium/User Data',
'~/.config/google-chrome',
'~/Library/Application Support/Google/Chrome',
'~/AppData/Local/Google/Chrome/User Data',
'~/.config/google-chrome-stable',
'~/.config/google-chrome-beta',
'~/Library/Application Support/Google/Chrome Canary',
'~/AppData/Local/Google/Chrome SxS/User Data',
'~/.config/google-chrome-unstable',
'~/.config/google-chrome-dev',
)
for path in default_profile_paths:
full_path = os.path.expanduser(path)
if os.path.exists(full_path):
return full_path
return None
# ******************************************************************************
# ************************ Environment & Dependencies **************************
# ******************************************************************************
try:
### Make sure curl is installed
if USE_CURL:
USE_CURL = FETCH_FAVICON or SUBMIT_ARCHIVE_DOT_ORG
else:
FETCH_FAVICON = SUBMIT_ARCHIVE_DOT_ORG = False
CURL_VERSION = None
if USE_CURL:
CURL_VERSION = bin_version(CURL_BINARY)
### Make sure wget is installed and calculate version
if USE_WGET:
USE_WGET = FETCH_WGET or FETCH_WARC
else:
FETCH_WGET = FETCH_WARC = False
WGET_VERSION = None
WGET_AUTO_COMPRESSION = False
if USE_WGET:
WGET_VERSION = bin_version(WGET_BINARY)
WGET_AUTO_COMPRESSION = not run([WGET_BINARY, "--compression=auto", "--help"], stdout=DEVNULL, stderr=DEVNULL).returncode
WGET_USER_AGENT = WGET_USER_AGENT.format(
VERSION=VERSION,
WGET_VERSION=WGET_VERSION or '',
)
### Make sure git is installed
GIT_VERSION = None
if FETCH_GIT:
GIT_VERSION = bin_version(GIT_BINARY)
### Make sure youtube-dl is installed
YOUTUBEDL_VERSION = None
if FETCH_MEDIA:
YOUTUBEDL_VERSION = bin_version(YOUTUBEDL_BINARY)
### Make sure chrome is installed and calculate version
if USE_CHROME:
USE_CHROME = FETCH_PDF or FETCH_SCREENSHOT or FETCH_DOM
else:
FETCH_PDF = FETCH_SCREENSHOT = FETCH_DOM = False
if not CHROME_BINARY:
CHROME_BINARY = find_chrome_binary() or 'chromium-browser'
CHROME_VERSION = None
if USE_CHROME:
if CHROME_BINARY:
CHROME_VERSION = bin_version(CHROME_BINARY)
# print('[i] Using Chrome binary: {}'.format(shutil.which(CHROME_BINARY) or CHROME_BINARY))
if CHROME_USER_DATA_DIR is None:
CHROME_USER_DATA_DIR = find_chrome_data_dir()
# print('[i] Using Chrome data dir: {}'.format(os.path.abspath(CHROME_USER_DATA_DIR)))
CHROME_OPTIONS = {
'TIMEOUT': TIMEOUT,
'RESOLUTION': RESOLUTION,
'CHECK_SSL_VALIDITY': CHECK_SSL_VALIDITY,
'CHROME_BINARY': CHROME_BINARY,
'CHROME_HEADLESS': CHROME_HEADLESS,
'CHROME_SANDBOX': CHROME_SANDBOX,
'CHROME_USER_AGENT': CHROME_USER_AGENT,
'CHROME_USER_DATA_DIR': CHROME_USER_DATA_DIR,
}
# PYPPETEER_ARGS = {
# 'headless': CHROME_HEADLESS,
# 'ignoreHTTPSErrors': not CHECK_SSL_VALIDITY,
# # 'executablePath': CHROME_BINARY,
# }
except KeyboardInterrupt:
raise SystemExit(1)
except:
print('[X] There was an error while reading configuration. Your archive data is unaffected.')
raise
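
Every boolean option above follows the same parsing rule: only the literal string 'true' (case-insensitive) enables a feature. A standalone sketch of that pattern:

    import os

    # Mirrors the FETCH_*/USE_* flags above: any other value disables the feature.
    FETCH_MEDIA = os.getenv('FETCH_MEDIA', 'True').lower() == 'true'
    print(FETCH_MEDIA)  # prints False when run as: env FETCH_MEDIA=False python3 sketch.py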


@@ -1,293 +0,0 @@
import os
import json
from datetime import datetime
from string import Template
from typing import List, Tuple, Iterator, Optional, Mapping
from core.schema import Link, ArchiveResult
from core.config import (
OUTPUT_DIR,
TEMPLATES_DIR,
VERSION,
GIT_SHA,
FOOTER_INFO,
TIMEOUT,
)
from core.util import (
ts_to_date,
merge_links,
urlencode,
htmlencode,
urldecode,
derived_link_info,
wget_output_path,
enforce_types,
TimedProgress,
copy_and_overwrite,
atomic_write,
)
from core.parse import parse_links
from core.links import validate_links
from core.logs import (
log_indexing_process_started,
log_indexing_started,
log_indexing_finished,
log_parsing_started,
log_parsing_finished,
)
TITLE_LOADING_MSG = 'Not yet archived...'
### Homepage index for all the links
@enforce_types
def write_links_index(links: List[Link], out_dir: str=OUTPUT_DIR, finished: bool=False) -> None:
"""create index.html file for a given list of links"""
log_indexing_process_started()
log_indexing_started(out_dir, 'index.json')
timer = TimedProgress(TIMEOUT * 2, prefix=' ')
write_json_links_index(links, out_dir=out_dir)
timer.end()
log_indexing_finished(out_dir, 'index.json')
log_indexing_started(out_dir, 'index.html')
timer = TimedProgress(TIMEOUT * 2, prefix=' ')
write_html_links_index(links, out_dir=out_dir, finished=finished)
timer.end()
log_indexing_finished(out_dir, 'index.html')
@enforce_types
def load_links_index(out_dir: str=OUTPUT_DIR, import_path: Optional[str]=None) -> Tuple[List[Link], List[Link]]:
"""parse and load existing index with any new links from import_path merged in"""
existing_links: List[Link] = []
if out_dir:
existing_links = list(parse_json_links_index(out_dir))
new_links: List[Link] = []
if import_path:
# parse and validate the import file
log_parsing_started(import_path)
raw_links, parser_name = parse_links(import_path)
new_links = list(validate_links(raw_links))
# merge existing links in out_dir and new links
all_links = list(validate_links(existing_links + new_links))
if import_path and parser_name:
num_parsed = len(raw_links)
num_new_links = len(all_links) - len(existing_links)
log_parsing_finished(num_parsed, num_new_links, parser_name)
return all_links, new_links
@enforce_types
def write_json_links_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
"""write the json link index to a given path"""
assert isinstance(links, List), 'Links must be a list, not a generator.'
assert isinstance(links[0].history, dict)
assert isinstance(links[0].sources, list)
if links[0].history.get('title'):
assert isinstance(links[0].history['title'][0], ArchiveResult)
if links[0].sources:
assert isinstance(links[0].sources[0], str)
path = os.path.join(out_dir, 'index.json')
index_json = {
'info': 'ArchiveBox Index',
'source': 'https://github.com/pirate/ArchiveBox',
'docs': 'https://github.com/pirate/ArchiveBox/wiki',
'version': VERSION,
'num_links': len(links),
'updated': datetime.now(),
'links': links,
}
atomic_write(index_json, path)
@enforce_types
def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
"""parse a archive index json file and return the list of links"""
index_path = os.path.join(out_dir, 'index.json')
if os.path.exists(index_path):
with open(index_path, 'r', encoding='utf-8') as f:
links = json.load(f)['links']
for link_json in links:
yield Link.from_json(link_json)
return ()
@enforce_types
def write_html_links_index(links: List[Link], out_dir: str=OUTPUT_DIR, finished: bool=False) -> None:
"""write the html link index to a given path"""
copy_and_overwrite(
os.path.join(TEMPLATES_DIR, 'static'),
os.path.join(out_dir, 'static'),
)
atomic_write('User-agent: *\nDisallow: /', os.path.join(out_dir, 'robots.txt'))
with open(os.path.join(TEMPLATES_DIR, 'index.html'), 'r', encoding='utf-8') as f:
index_html = f.read()
with open(os.path.join(TEMPLATES_DIR, 'index_row.html'), 'r', encoding='utf-8') as f:
link_row_html = f.read()
link_rows = []
for link in links:
template_row_vars: Mapping[str, str] = {
**derived_link_info(link),
'title': (
link.title
or (link.base_url if link.is_archived else TITLE_LOADING_MSG)
),
'tags': (link.tags or '') + (' {}'.format(link.extension) if link.is_static else ''),
'favicon_url': (
os.path.join('archive', link.timestamp, 'favicon.ico')
# if link['is_archived'] else 'data:image/gif;base64,R0lGODlhAQABAAD/ACwAAAAAAQABAAACADs='
),
'archive_url': urlencode(
wget_output_path(link) or 'index.html'
),
}
link_rows.append(Template(link_row_html).substitute(**template_row_vars))
template_vars: Mapping[str, str] = {
'num_links': str(len(links)),
'date_updated': datetime.now().strftime('%Y-%m-%d'),
'time_updated': datetime.now().strftime('%Y-%m-%d %H:%M'),
'footer_info': FOOTER_INFO,
'version': VERSION,
'git_sha': GIT_SHA,
'rows': '\n'.join(link_rows),
'status': 'finished' if finished else 'running',
}
template_html = Template(index_html).substitute(**template_vars)
atomic_write(template_html, os.path.join(out_dir, 'index.html'))
@enforce_types
def patch_links_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
"""hack to in-place update one row's info in the generated index html"""
title = link.title or link.latest_outputs()['title']
successful = link.num_outputs
# Patch JSON index
json_file_links = parse_json_links_index(out_dir)
patched_links = []
for saved_link in json_file_links:
if saved_link.url == link.url:
patched_links.append(saved_link.overwrite(
title=title,
history=link.history,
updated=link.updated,
))
else:
patched_links.append(saved_link)
write_json_links_index(patched_links, out_dir=out_dir)
# Patch HTML index
html_path = os.path.join(out_dir, 'index.html')
html = open(html_path, 'r').read().split('\n')
for idx, line in enumerate(html):
if title and ('<span data-title-for="{}"'.format(link.url) in line):
html[idx] = '<span>{}</span>'.format(title)
elif successful and ('<span data-number-for="{}"'.format(link.url) in line):
html[idx] = '<span>{}</span>'.format(successful)
break
atomic_write('\n'.join(html), html_path)
### Individual link index
@enforce_types
def write_link_index(link: Link, link_dir: Optional[str]=None) -> None:
link_dir = link_dir or link.link_dir
write_json_link_index(link, link_dir)
write_html_link_index(link, link_dir)
@enforce_types
def write_json_link_index(link: Link, link_dir: Optional[str]=None) -> None:
"""write a json file with some info about the link"""
link_dir = link_dir or link.link_dir
path = os.path.join(link_dir, 'index.json')
atomic_write(link._asdict(), path)
@enforce_types
def parse_json_link_index(link_dir: str) -> Optional[Link]:
"""load the json link index from a given directory"""
existing_index = os.path.join(link_dir, 'index.json')
if os.path.exists(existing_index):
with open(existing_index, 'r', encoding='utf-8') as f:
link_json = json.load(f)
return Link.from_json(link_json)
return None
@enforce_types
def load_json_link_index(link: Link, link_dir: Optional[str]=None) -> Link:
"""check for an existing link archive in the given directory,
and load+merge it into the given link dict
"""
link_dir = link_dir or link.link_dir
existing_link = parse_json_link_index(link_dir)
if existing_link:
return merge_links(existing_link, link)
return link
@enforce_types
def write_html_link_index(link: Link, link_dir: Optional[str]=None) -> None:
link_dir = link_dir or link.link_dir
with open(os.path.join(TEMPLATES_DIR, 'link_index.html'), 'r', encoding='utf-8') as f:
link_html = f.read()
path = os.path.join(link_dir, 'index.html')
template_vars: Mapping[str, str] = {
**derived_link_info(link),
'title': (
link.title
or (link.base_url if link.is_archived else TITLE_LOADING_MSG)
),
'url_str': htmlencode(urldecode(link.base_url)),
'archive_url': urlencode(
wget_output_path(link)
or (link.domain if link.is_archived else 'about:blank')
),
'extension': link.extension or 'html',
'tags': link.tags or 'untagged',
'status': 'archived' if link.is_archived else 'not yet archived',
'status_color': 'success' if link.is_archived else 'danger',
'oldest_archive_date': ts_to_date(link.oldest_archive_date),
}
html_index = Template(link_html).substitute(**template_vars)
atomic_write(html_index, path)
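
An illustrative round-trip of the main JSON index helpers above (assumes an existing data folder named output/ whose index.json contains at least one link):

    links = list(parse_json_links_index(out_dir='output'))
    write_json_links_index(links, out_dir='output')  # atomically rewrites output/index.json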


@@ -1,93 +0,0 @@
from typing import Iterable
from collections import OrderedDict
from core.schema import Link
from core.util import (
scheme,
fuzzy_url,
merge_links,
)
from core.config import URL_BLACKLIST_PTN
def validate_links(links: Iterable[Link]) -> Iterable[Link]:
links = archivable_links(links) # remove chrome://, about:, mailto: etc.
links = sorted_links(links) # deterministically sort the links based on timestamp, url
links = uniquefied_links(links) # merge/dedupe duplicate timestamps & urls
if not links:
print('[X] No links found :(')
raise SystemExit(1)
return links
def archivable_links(links: Iterable[Link]) -> Iterable[Link]:
"""remove chrome://, about:// or other schemed links that cant be archived"""
for link in links:
scheme_is_valid = scheme(link.url) in ('http', 'https', 'ftp')
not_blacklisted = (not URL_BLACKLIST_PTN.match(link.url)) if URL_BLACKLIST_PTN else True
if scheme_is_valid and not_blacklisted:
yield link
def uniquefied_links(sorted_links: Iterable[Link]) -> Iterable[Link]:
"""
ensures that all non-duplicate links have monotonically increasing timestamps
"""
unique_urls: OrderedDict[str, Link] = OrderedDict()
for link in sorted_links:
fuzzy = fuzzy_url(link.url)
if fuzzy in unique_urls:
# merge with any other links that share the same url
link = merge_links(unique_urls[fuzzy], link)
unique_urls[fuzzy] = link
unique_timestamps: OrderedDict[str, Link] = OrderedDict()
for link in unique_urls.values():
new_link = link.overwrite(
timestamp=lowest_uniq_timestamp(unique_timestamps, link.timestamp),
)
unique_timestamps[new_link.timestamp] = new_link
return unique_timestamps.values()
def sorted_links(links: Iterable[Link]) -> Iterable[Link]:
sort_func = lambda link: (link.timestamp.split('.', 1)[0], link.url)
return sorted(links, key=sort_func, reverse=True)
def links_after_timestamp(links: Iterable[Link], resume: float=None) -> Iterable[Link]:
if not resume:
yield from links
return
for link in links:
try:
if float(link.timestamp) <= resume:
yield link
except (ValueError, TypeError):
print('Resume value and all timestamp values must be valid numbers.')
def lowest_uniq_timestamp(used_timestamps: OrderedDict, timestamp: str) -> str:
"""resolve duplicate timestamps by appending a decimal 1234, 1234 -> 1234.1, 1234.2"""
timestamp = timestamp.split('.')[0]
nonce = 0
# first try 152323423 before 152323423.0
if timestamp not in used_timestamps:
return timestamp
new_timestamp = '{}.{}'.format(timestamp, nonce)
while new_timestamp in used_timestamps:
nonce += 1
new_timestamp = '{}.{}'.format(timestamp, nonce)
return new_timestamp
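
A worked example of the timestamp de-duplication above: the first occurrence keeps its integer form, and later duplicates receive increasing decimal suffixes.

    from collections import OrderedDict

    used = OrderedDict()
    for ts in ('1544212312', '1544212312', '1544212312'):
        used[lowest_uniq_timestamp(used, ts)] = None
    print(list(used))  # ['1544212312', '1544212312.0', '1544212312.1']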


@@ -1,206 +0,0 @@
import os
import sys
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
from core.schema import Link, ArchiveResult
from core.config import ANSI, OUTPUT_DIR
@dataclass
class RuntimeStats:
"""mutable stats counter for logging archiving timing info to CLI output"""
skipped: int = 0
succeeded: int = 0
failed: int = 0
parse_start_ts: Optional[datetime] = None
parse_end_ts: Optional[datetime] = None
index_start_ts: Optional[datetime] = None
index_end_ts: Optional[datetime] = None
archiving_start_ts: Optional[datetime] = None
archiving_end_ts: Optional[datetime] = None
# globals are bad, mmkay
_LAST_RUN_STATS = RuntimeStats()
def pretty_path(path: str) -> str:
"""convert paths like .../ArchiveBox/archivebox/../output/abc into output/abc"""
pwd = os.path.abspath('.')
# parent = os.path.abspath(os.path.join(pwd, os.path.pardir))
return path.replace(pwd + '/', './')
### Parsing Stage
def log_parsing_started(source_file: str):
start_ts = datetime.now()
_LAST_RUN_STATS.parse_start_ts = start_ts
print('{green}[*] [{}] Parsing new links from output/sources/{}...{reset}'.format(
start_ts.strftime('%Y-%m-%d %H:%M:%S'),
source_file.rsplit('/', 1)[-1],
**ANSI,
))
def log_parsing_finished(num_parsed: int, num_new_links: int, parser_name: str):
end_ts = datetime.now()
_LAST_RUN_STATS.parse_end_ts = end_ts
print(' > Parsed {} links as {} ({} new links added)'.format(num_parsed, parser_name, num_new_links))
### Indexing Stage
def log_indexing_process_started():
start_ts = datetime.now()
_LAST_RUN_STATS.index_start_ts = start_ts
print('{green}[*] [{}] Saving main index files...{reset}'.format(
start_ts.strftime('%Y-%m-%d %H:%M:%S'),
**ANSI,
))
def log_indexing_started(out_dir: str, out_file: str):
sys.stdout.write(' > {}/{}'.format(pretty_path(out_dir), out_file))
def log_indexing_finished(out_dir: str, out_file: str):
end_ts = datetime.now()
_LAST_RUN_STATS.index_end_ts = end_ts
print('\r{}/{}'.format(pretty_path(out_dir), out_file))
### Archiving Stage
def log_archiving_started(num_links: int, resume: Optional[float]):
start_ts = datetime.now()
_LAST_RUN_STATS.archiving_start_ts = start_ts
if resume:
print('{green}[▶] [{}] Resuming archive updating for {} pages starting from {}...{reset}'.format(
start_ts.strftime('%Y-%m-%d %H:%M:%S'),
num_links,
resume,
**ANSI,
))
else:
print('{green}[▶] [{}] Updating content for {} pages in archive...{reset}'.format(
start_ts.strftime('%Y-%m-%d %H:%M:%S'),
num_links,
**ANSI,
))
def log_archiving_paused(num_links: int, idx: int, timestamp: str):
end_ts = datetime.now()
_LAST_RUN_STATS.archiving_end_ts = end_ts
print()
print('\n{lightyellow}[X] [{now}] Downloading paused on link {timestamp} ({idx}/{total}){reset}'.format(
**ANSI,
now=end_ts.strftime('%Y-%m-%d %H:%M:%S'),
idx=idx+1,
timestamp=timestamp,
total=num_links,
))
print(' To view your archive, open:')
print(' {}/index.html'.format(OUTPUT_DIR))
print(' Continue archiving where you left off by running:')
print(' archivebox {}'.format(timestamp))
def log_archiving_finished(num_links: int):
end_ts = datetime.now()
_LAST_RUN_STATS.archiving_end_ts = end_ts
assert _LAST_RUN_STATS.archiving_start_ts is not None
seconds = end_ts.timestamp() - _LAST_RUN_STATS.archiving_start_ts.timestamp()
if seconds > 60:
duration = '{0:.2f} min'.format(seconds / 60, 2)
else:
duration = '{0:.2f} sec'.format(seconds, 2)
print('{}[√] [{}] Update of {} pages complete ({}){}'.format(
ANSI['green'],
end_ts.strftime('%Y-%m-%d %H:%M:%S'),
num_links,
duration,
ANSI['reset'],
))
print(' - {} links skipped'.format(_LAST_RUN_STATS.skipped))
print(' - {} links updated'.format(_LAST_RUN_STATS.succeeded))
print(' - {} links had errors'.format(_LAST_RUN_STATS.failed))
print(' To view your archive, open:')
print(' {}/index.html'.format(OUTPUT_DIR))
def log_link_archiving_started(link: Link, link_dir: str, is_new: bool):
# [*] [2019-03-22 13:46:45] "Log Structured Merge Trees - ben stopford"
# http://www.benstopford.com/2015/02/14/log-structured-merge-trees/
# > output/archive/1478739709
print('\n[{symbol_color}{symbol}{reset}] [{symbol_color}{now}{reset}] "{title}"'.format(
symbol_color=ANSI['green' if is_new else 'black'],
symbol='+' if is_new else '*',
now=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
title=link.title or link.base_url,
**ANSI,
))
print(' {blue}{url}{reset}'.format(url=link.url, **ANSI))
print(' {} {}'.format(
'>' if is_new else '√',
pretty_path(link_dir),
))
def log_link_archiving_finished(link: Link, link_dir: str, is_new: bool, stats: dict):
total = sum(stats.values())
if stats['failed'] > 0 :
_LAST_RUN_STATS.failed += 1
elif stats['skipped'] == total:
_LAST_RUN_STATS.skipped += 1
else:
_LAST_RUN_STATS.succeeded += 1
def log_archive_method_started(method: str):
print(' > {}'.format(method))
def log_archive_method_finished(result: ArchiveResult):
"""quote the argument with whitespace in a command so the user can
copy-paste the outputted string directly to run the cmd
"""
# Prettify CMD string and make it safe to copy-paste by quoting arguments
quoted_cmd = ' '.join(
'"{}"'.format(arg) if ' ' in arg else arg
for arg in result.cmd
)
if result.status == 'failed':
# Prettify error output hints string and limit to five lines
hints = getattr(result.output, 'hints', None) or ()
if hints:
hints = hints if isinstance(hints, (list, tuple)) else hints.split('\n')
hints = (
' {}{}{}'.format(ANSI['lightyellow'], line.strip(), ANSI['reset'])
for line in hints[:5] if line.strip()
)
# Collect and prefix output lines with indentation
output_lines = [
'{lightred}Failed:{reset}'.format(**ANSI),
' {reset}{} {red}{}{reset}'.format(
result.output.__class__.__name__.replace('ArchiveError', ''),
result.output,
**ANSI,
),
*hints,
'{}Run to see full output:{}'.format(ANSI['lightred'], ANSI['reset']),
*([' cd {};'.format(result.pwd)] if result.pwd else []),
' {}'.format(quoted_cmd),
]
print('\n'.join(
' {}'.format(line)
for line in output_lines
if line
))
print()
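
The copy-pasteable command quoting in log_archive_method_finished reduces to a single expression; an isolated sketch:

    cmd = ['wget', '--user-agent=ArchiveBox Bot', 'https://example.com']
    print(' '.join('"{}"'.format(arg) if ' ' in arg else arg for arg in cmd))
    # wget "--user-agent=ArchiveBox Bot" https://example.com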


@@ -1,10 +1,11 @@
 from django.core.management.base import BaseCommand
-from core.archive import main
+from legacy.archive import main
 class Command(BaseCommand):
     help = 'ArchiveBox test.bee'
     def handle(self, *args, **kwargs):
-        main()
+        main(*args)
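
With this change the legacy entrypoint is reachable through Django's management command machinery. A hedged usage sketch (the actual subcommand name comes from this module's filename under management/commands/, which is not shown in the hunk):

    python3 manage.py archivebox https://example.com/some/page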


@@ -1,331 +0,0 @@
"""
Everything related to parsing links from input sources.
For a list of supported services, see the README.md.
For examples of supported import formats see tests/.
Link: {
'url': 'https://example.com/example/?abc=123&xyc=345#lmnop',
'timestamp': '1544212312.4234',
'title': 'Example.com Page Title',
'tags': 'abc,def',
'sources': [
'output/sources/ril_export.html',
'output/sources/getpocket.com-1523422111.txt',
'output/sources/stdin-234234112312.txt'
]
}
"""
import re
import json
from typing import Tuple, List, IO, Iterable
from datetime import datetime
import xml.etree.ElementTree as etree
from core.config import TIMEOUT
from core.util import (
htmldecode,
str_between,
URL_REGEX,
check_url_parsing_invariants,
TimedProgress,
Link,
enforce_types,
)
@enforce_types
def parse_links(source_file: str) -> Tuple[List[Link], str]:
"""parse a list of URLs with their metadata from an
RSS feed, bookmarks export, or text file
"""
check_url_parsing_invariants()
PARSERS = (
# Specialized parsers
('Pocket HTML', parse_pocket_html_export),
('Pinboard RSS', parse_pinboard_rss_export),
('Shaarli RSS', parse_shaarli_rss_export),
('Medium RSS', parse_medium_rss_export),
# General parsers
('Netscape HTML', parse_netscape_html_export),
('Generic RSS', parse_rss_export),
('Generic JSON', parse_json_export),
# Fallback parser
('Plain Text', parse_plain_text_export),
)
timer = TimedProgress(TIMEOUT * 4)
with open(source_file, 'r', encoding='utf-8') as file:
for parser_name, parser_func in PARSERS:
try:
links = list(parser_func(file))
if links:
timer.end()
return links, parser_name
except Exception as err: # noqa
# Parsers are tried one by one down the list, and the first one
# that succeeds is used. To see why a certain parser was not used
# due to error or format incompatibility, uncomment this line:
# print('[!] Parser {} failed: {} {}'.format(parser_name, err.__class__.__name__, err))
pass
timer.end()
return [], 'Failed to parse'
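# Illustrative usage (not part of the original file; assumes a bookmarks export has
# already been copied into output/sources/): parser auto-detection in action:
#   links, parser_name = parse_links('output/sources/bookmarks_export.html')
#   print(parser_name)  # e.g. 'Netscape HTML'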
### Import Parser Functions
@enforce_types
def parse_pocket_html_export(html_file: IO[str]) -> Iterable[Link]:
"""Parse Pocket-format bookmarks export files (produced by getpocket.com/export/)"""
html_file.seek(0)
pattern = re.compile("^\\s*<li><a href=\"(.+)\" time_added=\"(\\d+)\" tags=\"(.*)\">(.+)</a></li>", re.UNICODE)
for line in html_file:
# example line
# <li><a href="http://example.com/ time_added="1478739709" tags="tag1,tag2">example title</a></li>
match = pattern.search(line)
if match:
url = match.group(1).replace('http://www.readability.com/read?url=', '') # remove old readability prefixes to get original url
time = datetime.fromtimestamp(float(match.group(2)))
tags = match.group(3)
title = match.group(4).replace(' — Readability', '').replace('http://www.readability.com/read?url=', '')
yield Link(
url=htmldecode(url),
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags=tags or '',
sources=[html_file.name],
)
@enforce_types
def parse_json_export(json_file: IO[str]) -> Iterable[Link]:
"""Parse JSON-format bookmarks export files (produced by pinboard.in/export/, or wallabag)"""
json_file.seek(0)
links = json.load(json_file)
json_date = lambda s: datetime.strptime(s, '%Y-%m-%dT%H:%M:%S%z')
for link in links:
# example line
# {"href":"http:\/\/www.reddit.com\/r\/example","description":"title here","extended":"","meta":"18a973f09c9cc0608c116967b64e0419","hash":"910293f019c2f4bb1a749fb937ba58e3","time":"2014-06-14T15:51:42Z","shared":"no","toread":"no","tags":"reddit android"}]
if link:
# Parse URL
url = link.get('href') or link.get('url') or link.get('URL')
if not url:
raise Exception('JSON must contain URL in each entry [{"url": "http://...", ...}, ...]')
# Parse the timestamp
ts_str = str(datetime.now().timestamp())
if link.get('timestamp'):
# chrome/ff histories use a very precise timestamp
ts_str = str(link['timestamp'] / 10000000)
elif link.get('time'):
ts_str = str(json_date(link['time'].split(',', 1)[0]).timestamp())
elif link.get('created_at'):
ts_str = str(json_date(link['created_at']).timestamp())
elif link.get('created'):
ts_str = str(json_date(link['created']).timestamp())
elif link.get('date'):
ts_str = str(json_date(link['date']).timestamp())
elif link.get('bookmarked'):
ts_str = str(json_date(link['bookmarked']).timestamp())
elif link.get('saved'):
ts_str = str(json_date(link['saved']).timestamp())
# Parse the title
title = None
if link.get('title'):
title = link['title'].strip()
elif link.get('description'):
title = link['description'].replace(' — Readability', '').strip()
elif link.get('name'):
title = link['name'].strip()
yield Link(
url=htmldecode(url),
timestamp=ts_str,
title=htmldecode(title) or None,
tags=htmldecode(link.get('tags')) or '',
sources=[json_file.name],
)
@enforce_types
def parse_rss_export(rss_file: IO[str]) -> Iterable[Link]:
"""Parse RSS XML-format files into links"""
rss_file.seek(0)
items = rss_file.read().split('<item>')
items = items[1:] if items else []
for item in items:
# example item:
# <item>
# <title><![CDATA[How JavaScript works: inside the V8 engine]]></title>
# <category>Unread</category>
# <link>https://blog.sessionstack.com/how-javascript-works-inside</link>
# <guid>https://blog.sessionstack.com/how-javascript-works-inside</guid>
# <pubDate>Mon, 21 Aug 2017 14:21:58 -0500</pubDate>
# </item>
trailing_removed = item.split('</item>', 1)[0]
leading_removed = trailing_removed.split('<item>', 1)[-1].strip()
rows = leading_removed.split('\n')
def get_row(key):
return [r for r in rows if r.strip().startswith('<{}>'.format(key))][0]
url = str_between(get_row('link'), '<link>', '</link>')
ts_str = str_between(get_row('pubDate'), '<pubDate>', '</pubDate>')
time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %z")
title = str_between(get_row('title'), '<![CDATA[', ']]').strip()
yield Link(
url=htmldecode(url),
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags=None,
sources=[rss_file.name],
)
@enforce_types
def parse_shaarli_rss_export(rss_file: IO[str]) -> Iterable[Link]:
"""Parse Shaarli-specific RSS XML-format files into links"""
rss_file.seek(0)
entries = rss_file.read().split('<entry>')[1:]
for entry in entries:
# example entry:
# <entry>
# <title>Aktuelle Trojaner-Welle: Emotet lauert in gefälschten Rechnungsmails | heise online</title>
# <link href="https://www.heise.de/security/meldung/Aktuelle-Trojaner-Welle-Emotet-lauert-in-gefaelschten-Rechnungsmails-4291268.html" />
# <id>https://demo.shaarli.org/?cEV4vw</id>
# <published>2019-01-30T06:06:01+00:00</published>
# <updated>2019-01-30T06:06:01+00:00</updated>
# <content type="html" xml:lang="en"><![CDATA[<div class="markdown"><p>&#8212; <a href="https://demo.shaarli.org/?cEV4vw">Permalink</a></p></div>]]></content>
# </entry>
trailing_removed = entry.split('</entry>', 1)[0]
leading_removed = trailing_removed.strip()
rows = leading_removed.split('\n')
def get_row(key):
return [r.strip() for r in rows if r.strip().startswith('<{}'.format(key))][0]
title = str_between(get_row('title'), '<title>', '</title>').strip()
url = str_between(get_row('link'), '<link href="', '" />')
ts_str = str_between(get_row('published'), '<published>', '</published>')
time = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")
yield Link(
url=htmldecode(url),
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags=None,
sources=[rss_file.name],
)
@enforce_types
def parse_netscape_html_export(html_file: IO[str]) -> Iterable[Link]:
"""Parse netscape-format bookmarks export files (produced by all browsers)"""
html_file.seek(0)
pattern = re.compile("<a href=\"(.+?)\" add_date=\"(\\d+)\"[^>]*>(.+)</a>", re.UNICODE | re.IGNORECASE)
for line in html_file:
# example line
# <DT><A HREF="https://example.com/?q=1+2" ADD_DATE="1497562974" LAST_MODIFIED="1497562974" ICON_URI="https://example.com/favicon.ico" ICON="data:image/png;base64,...">example bookmark title</A>
match = pattern.search(line)
if match:
url = match.group(1)
time = datetime.fromtimestamp(float(match.group(2)))
title = match.group(3).strip()
yield Link(
url=htmldecode(url),
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags=None,
sources=[html_file.name],
)
@enforce_types
def parse_pinboard_rss_export(rss_file: IO[str]) -> Iterable[Link]:
"""Parse Pinboard RSS feed files into links"""
rss_file.seek(0)
root = etree.parse(rss_file).getroot()
items = root.findall("{http://purl.org/rss/1.0/}item")
for item in items:
find = lambda p: item.find(p).text.strip() if item.find(p) is not None else None # type: ignore
url = find("{http://purl.org/rss/1.0/}link")
tags = find("{http://purl.org/dc/elements/1.1/}subject")
title = find("{http://purl.org/rss/1.0/}title")
ts_str = find("{http://purl.org/dc/elements/1.1/}date")
# Pinboard includes a colon in its date stamp timezone offsets, which
# Python can't parse. Remove it:
if ts_str and ts_str[-3:-2] == ":":
ts_str = ts_str[:-3]+ts_str[-2:]
if ts_str:
time = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")
else:
time = datetime.now()
yield Link(
url=htmldecode(url),
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags=htmldecode(tags) or None,
sources=[rss_file.name],
)
@enforce_types
def parse_medium_rss_export(rss_file: IO[str]) -> Iterable[Link]:
"""Parse Medium RSS feed files into links"""
rss_file.seek(0)
root = etree.parse(rss_file).getroot()
items = root.find("channel").findall("item") # type: ignore
for item in items:
url = item.find("link").text # type: ignore
title = item.find("title").text.strip() # type: ignore
ts_str = item.find("pubDate").text # type: ignore
time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %Z") # type: ignore
yield Link(
url=htmldecode(url),
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags=None,
sources=[rss_file.name],
)
@enforce_types
def parse_plain_text_export(text_file: IO[str]) -> Iterable[Link]:
"""Parse raw links from each line in a text file"""
text_file.seek(0)
for line in text_file.readlines():
urls = re.findall(URL_REGEX, line) if line.strip() else ()
for url in urls: # type: ignore
yield Link(
url=htmldecode(url),
timestamp=str(datetime.now().timestamp()),
title=None,
tags=None,
sources=[text_file.name],
)
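# A hedged sketch of how these parsers could be tried in order against an
# import file of unknown format (the real dispatch logic lives elsewhere in
# the codebase; this is illustrative only). Because each parser rewinds the
# file with seek(0), falling through to the next parser is safe:
#
#   PARSERS = (
#       parse_pocket_html_export,
#       parse_json_export,
#       parse_rss_export,
#       parse_shaarli_rss_export,
#       parse_netscape_html_export,
#       parse_pinboard_rss_export,
#       parse_medium_rss_export,
#       parse_plain_text_export,
#   )
#
#   def guess_and_parse(path):
#       with open(path, 'r', encoding='utf-8') as file:
#           for parser in PARSERS:
#               try:
#                   links = list(parser(file))
#                   if links:
#                       return links
#               except Exception:
#                   continue  # fall through to the next parser
#       return []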

View file

@ -1,85 +0,0 @@
#!/usr/bin/env python3
import re
from argparse import ArgumentParser
from os.path import exists, join
from shutil import rmtree
from typing import List
from core.config import ARCHIVE_DIR, OUTPUT_DIR
from core.index import parse_json_links_index, write_html_links_index, write_json_links_index
def cleanup_index(regexes: List[str], proceed: bool, delete: bool) -> None:
if not exists(join(OUTPUT_DIR, 'index.json')):
exit('index.json is missing; nothing to do')
compiled = [re.compile(r) for r in regexes]
links = parse_json_links_index(OUTPUT_DIR)
filtered = []
remaining = []
for link in links:
url = link.url
for r in compiled:
if r.search(url):
filtered.append((link, r))
break
else:
remaining.append(link)
if not filtered:
exit('Search did not match any entries.')
print('Filtered out {}/{} urls:'.format(len(filtered), len(links)))
for link, regex in filtered:
url = link.url
print(' {url} via {regex}'.format(url=url, regex=regex.pattern))
if not proceed:
answer = input('Remove {} entries from index? [y/n] '.format(
len(filtered)))
proceed = answer.strip().lower() in ('y', 'yes')
if not proceed:
exit('Aborted')
write_json_links_index(OUTPUT_DIR, remaining)
write_html_links_index(OUTPUT_DIR, remaining)
if delete:
for link, _ in filtered:
data_dir = join(ARCHIVE_DIR, link.timestamp)
if exists(data_dir):
rmtree(data_dir)
if __name__ == '__main__':
p = ArgumentParser('Index purging tool')
p.add_argument(
'--regex',
'-r',
action='append',
help='Regular expression matching URLs to purge',
)
p.add_argument(
'--delete',
'-d',
action='store_true',
default=False,
help='Delete webpage files from archive',
)
p.add_argument(
'--yes',
'-y',
action='store_true',
default=False,
help='Do not prompt for confirmation',
)
args = p.parse_args()
if args.regex:
cleanup_index(args.regex, proceed=args.yes, delete=args.delete)
else:
p.print_help()
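# Example invocations (assuming this script is saved as purge.py; adjust the
# name/path to wherever it actually lives in your checkout):
#
#   python3 purge.py --regex '^https?://(www\.)?example\.com'   # prompts before removing
#   python3 purge.py -r 'example\.com' -r 'example\.org' --yes --delete
#
# --regex/-r may be passed multiple times, --yes skips the confirmation
# prompt, and --delete also removes the matching archive data folders.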

View file

@ -1,318 +0,0 @@
import os
from datetime import datetime
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass, asdict, field, fields
class ArchiveError(Exception):
def __init__(self, message, hints=None):
super().__init__(message)
self.hints = hints
LinkDict = Dict[str, Any]
ArchiveOutput = Union[str, Exception, None]
@dataclass(frozen=True)
class ArchiveResult:
cmd: List[str]
pwd: Optional[str]
cmd_version: Optional[str]
output: ArchiveOutput
status: str
start_ts: datetime
end_ts: datetime
schema: str = 'ArchiveResult'
def __post_init__(self):
self.typecheck()
def _asdict(self):
return asdict(self)
def typecheck(self) -> None:
assert self.schema == self.__class__.__name__
assert isinstance(self.status, str) and self.status
assert isinstance(self.start_ts, datetime)
assert isinstance(self.end_ts, datetime)
assert isinstance(self.cmd, list)
assert all(isinstance(arg, str) and arg for arg in self.cmd)
assert self.pwd is None or isinstance(self.pwd, str) and self.pwd
assert self.cmd_version is None or isinstance(self.cmd_version, str) and self.cmd_version
assert self.output is None or isinstance(self.output, (str, Exception))
if isinstance(self.output, str):
assert self.output
@classmethod
def from_json(cls, json_info):
from .util import parse_date
allowed_fields = {f.name for f in fields(cls)}
info = {
key: val
for key, val in json_info.items()
if key in allowed_fields
}
info['start_ts'] = parse_date(info['start_ts'])
info['end_ts'] = parse_date(info['end_ts'])
return cls(**info)
@property
def duration(self) -> int:
return (self.end_ts - self.start_ts).seconds
@dataclass(frozen=True)
class Link:
timestamp: str
url: str
title: Optional[str]
tags: Optional[str]
sources: List[str]
history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {})
updated: Optional[datetime] = None
schema: str = 'Link'
def __post_init__(self):
self.typecheck()
def overwrite(self, **kwargs):
"""pure functional version of dict.update that returns a new instance"""
return Link(**{**self._asdict(), **kwargs})
def __eq__(self, other):
if not isinstance(other, Link):
return NotImplemented
return self.url == other.url
def __gt__(self, other):
if not isinstance(other, Link):
return NotImplemented
if not self.timestamp or not other.timestamp:
return
return float(self.timestamp) > float(other.timestamp)
def typecheck(self) -> None:
assert self.schema == self.__class__.__name__
assert isinstance(self.timestamp, str) and self.timestamp
assert self.timestamp.replace('.', '').isdigit()
assert isinstance(self.url, str) and '://' in self.url
assert self.updated is None or isinstance(self.updated, datetime)
assert self.title is None or isinstance(self.title, str) and self.title
assert self.tags is None or isinstance(self.tags, str) and self.tags
assert isinstance(self.sources, list)
assert all(isinstance(source, str) and source for source in self.sources)
assert isinstance(self.history, dict)
for method, results in self.history.items():
assert isinstance(method, str) and method
assert isinstance(results, list)
assert all(isinstance(result, ArchiveResult) for result in results)
def _asdict(self, extended=False):
info = {
'schema': 'Link',
'url': self.url,
'title': self.title or None,
'timestamp': self.timestamp,
'updated': self.updated or None,
'tags': self.tags or None,
'sources': self.sources or [],
'history': self.history or {},
}
if extended:
info.update({
'link_dir': self.link_dir,
'archive_path': self.archive_path,
'bookmarked_date': self.bookmarked_date,
'updated_date': self.updated_date,
'domain': self.domain,
'path': self.path,
'basename': self.basename,
'extension': self.extension,
'base_url': self.base_url,
'is_static': self.is_static,
'is_archived': self.is_archived,
'num_outputs': self.num_outputs,
'num_failures': self.num_failures,
'oldest_archive_date': self.oldest_archive_date,
'newest_archive_date': self.newest_archive_date,
})
return info
@classmethod
def from_json(cls, json_info):
from .util import parse_date
allowed_fields = {f.name for f in fields(cls)}
info = {
key: val
for key, val in json_info.items()
if key in allowed_fields
}
info['updated'] = parse_date(info['updated'])
json_history = info['history']
cast_history = {}
for method, method_history in json_history.items():
cast_history[method] = []
for json_result in method_history:
assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts'
cast_result = ArchiveResult.from_json(json_result)
cast_history[method].append(cast_result)
info['history'] = cast_history
return cls(**info)
@property
def link_dir(self) -> str:
from .config import ARCHIVE_DIR
return os.path.join(ARCHIVE_DIR, self.timestamp)
@property
def archive_path(self) -> str:
from .config import ARCHIVE_DIR_NAME
return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp)
### URL Helpers
@property
def urlhash(self):
from .util import hashurl
return hashurl(self.url)
@property
def extension(self) -> str:
from .util import extension
return extension(self.url)
@property
def domain(self) -> str:
from .util import domain
return domain(self.url)
@property
def path(self) -> str:
from .util import path
return path(self.url)
@property
def basename(self) -> str:
from .util import basename
return basename(self.url)
@property
def base_url(self) -> str:
from .util import base_url
return base_url(self.url)
### Pretty Printing Helpers
@property
def bookmarked_date(self) -> Optional[str]:
from .util import ts_to_date
return ts_to_date(self.timestamp) if self.timestamp else None
@property
def updated_date(self) -> Optional[str]:
from .util import ts_to_date
return ts_to_date(self.updated) if self.updated else None
@property
def archive_dates(self) -> List[datetime]:
return [
result.start_ts
for method in self.history.keys()
for result in self.history[method]
]
@property
def oldest_archive_date(self) -> Optional[datetime]:
return min(self.archive_dates, default=None)
@property
def newest_archive_date(self) -> Optional[datetime]:
return max(self.archive_dates, default=None)
### Archive Status Helpers
@property
def num_outputs(self) -> int:
return len(tuple(filter(None, self.latest_outputs().values())))
@property
def num_failures(self) -> int:
return sum(1
for method in self.history.keys()
for result in self.history[method]
if result.status == 'failed')
@property
def is_static(self) -> bool:
from .util import is_static_file
return is_static_file(self.url)
@property
def is_archived(self) -> bool:
from .config import ARCHIVE_DIR
from .util import domain
return os.path.exists(os.path.join(
ARCHIVE_DIR,
self.timestamp,
domain(self.url),
))
def latest_outputs(self, status: str=None) -> Dict[str, ArchiveOutput]:
"""get the latest output that each archive method produced for link"""
ARCHIVE_METHODS = (
'title', 'favicon', 'wget', 'warc', 'pdf',
'screenshot', 'dom', 'git', 'media', 'archive_org',
)
latest: Dict[str, ArchiveOutput] = {}
for archive_method in ARCHIVE_METHODS:
# get most recent successful result in history for each archive method
history = self.history.get(archive_method) or []
history = list(filter(lambda result: result.output, reversed(history)))
if status is not None:
history = list(filter(lambda result: result.status == status, history))
history = list(history)
if history:
latest[archive_method] = history[0].output
else:
latest[archive_method] = None
return latest
def canonical_outputs(self) -> Dict[str, Optional[str]]:
from .util import wget_output_path
canonical = {
'index_url': 'index.html',
'favicon_url': 'favicon.ico',
'google_favicon_url': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain),
'archive_url': wget_output_path(self),
'warc_url': 'warc',
'pdf_url': 'output.pdf',
'screenshot_url': 'screenshot.png',
'dom_url': 'output.html',
'archive_org_url': 'https://web.archive.org/web/{}'.format(self.base_url),
'git_url': 'git',
'media_url': 'media',
}
if self.is_static:
# static binary files like PDF and images are handled slightly differently.
# they're just downloaded once and aren't archived separately multiple times,
# so the wget, screenshot, & pdf urls should all point to the same file
static_url = wget_output_path(self)
canonical.update({
'title': self.basename,
'archive_url': static_url,
'pdf_url': static_url,
'screenshot_url': static_url,
'dom_url': static_url,
})
return canonical
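# A hedged sketch of constructing a Link by hand (all values illustrative):
#
#   link = Link(
#       url='https://example.com/some/page',
#       timestamp='1554308953.0',
#       title='Example page',
#       tags='example,test',
#       sources=['output/sources/stdin-1554308953.txt'],
#   )
#   link.base_url     # 'example.com/some/page'
#   link.is_static    # False (no static-file extension)
#   link.is_archived  # True only if archive/<timestamp>/example.com/ already exists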

View file

@ -0,0 +1,78 @@
import os
from legacy.config import (
REPO_DIR,
OUTPUT_DIR,
TEMPLATES_DIR,
DATABASE_DIR,
)
SECRET_KEY = '---------------- not a valid secret key ! ----------------'
DEBUG = True
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'core',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'core.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [TEMPLATES_DIR],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'core.wsgi.application'
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(DATABASE_DIR, 'database.sqlite3'),
}
}
AUTH_PASSWORD_VALIDATORS = [
{'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator'},
{'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator'},
{'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator'},
{'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator'},
]
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_L10N = True
USE_TZ = True
STATIC_URL = '/static/'
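# With this settings module active (the wsgi config later in this commit points
# DJANGO_SETTINGS_MODULE at 'archivebox.settings'), the usual Django workflow
# should apply, assuming a manage.py exists at the project root:
#
#   python3 manage.py migrate      # creates database.sqlite3 under DATABASE_DIR
#   python3 manage.py runserver    # serves the admin UI at http://127.0.0.1:8000/admin/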

21
archivebox/core/urls.py Normal file
View file

@ -0,0 +1,21 @@
"""archivebox URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/2.1/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path
urlpatterns = [
path('admin/', admin.site.urls),
]

View file

@ -1,707 +0,0 @@
import os
import re
import sys
import json
import time
import shutil
from json import JSONEncoder
from typing import List, Optional, Any, Union
from inspect import signature
from functools import wraps
from hashlib import sha256
from urllib.request import Request, urlopen
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
from datetime import datetime
from multiprocessing import Process
from subprocess import (
Popen,
PIPE,
DEVNULL,
CompletedProcess,
TimeoutExpired,
CalledProcessError,
)
from base32_crockford import encode as base32_encode # type: ignore
from core.schema import Link
from core.config import (
ANSI,
TERM_WIDTH,
SOURCES_DIR,
OUTPUT_PERMISSIONS,
TIMEOUT,
SHOW_PROGRESS,
FETCH_TITLE,
CHECK_SSL_VALIDITY,
WGET_USER_AGENT,
CHROME_OPTIONS,
PYTHON_DIR,
)
from core.logs import pretty_path
### Parsing Helpers
# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
scheme = lambda url: urlparse(url).scheme.lower()
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
without_path = lambda url: urlparse(url)._replace(path='', fragment='', query='').geturl().strip('//')
path = lambda url: urlparse(url).path
basename = lambda url: urlparse(url).path.rsplit('/', 1)[-1]
domain = lambda url: urlparse(url).netloc
query = lambda url: urlparse(url).query
fragment = lambda url: urlparse(url).fragment
extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
base_url = lambda url: without_scheme(url) # uniq base url used to dedupe links
without_www = lambda url: url.replace('://www.', '://', 1)
without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
fuzzy_url = lambda url: without_trailing_slash(without_www(without_scheme(url.lower())))
hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]
urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: s and unquote(s)
htmlencode = lambda s: s and escape(s, quote=True)
htmldecode = lambda s: s and unescape(s)
short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
ts_to_date = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()
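# Hedged examples of the URL helpers above (inputs are illustrative):
#
#   domain('https://example.com/path/page?q=1')          # -> 'example.com'
#   basename('https://example.com/path/page?q=1')        # -> 'page'
#   extension('https://example.com/files/report.PDF')    # -> 'pdf'
#   base_url('https://example.com/path/page?q=1')        # -> 'example.com/path/page?q=1'
#   without_query('https://example.com/path/page?q=1')   # -> 'https://example.com/path/page'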
URL_REGEX = re.compile(
r'http[s]?://' # start matching from allowed schemes
r'(?:[a-zA-Z]|[0-9]' # followed by allowed alphanum characters
r'|[$-_@.&+]|[!*\(\),]' # or allowed symbols
r'|(?:%[0-9a-fA-F][0-9a-fA-F]))' # or allowed unicode bytes
r'[^\]\[\(\)<>\""\'\s]+', # stop parsing at these symbols
re.IGNORECASE,
)
HTML_TITLE_REGEX = re.compile(
r'<title.*?>' # start matching text after <title> tag
r'(.[^<>]+)', # get everything up to these symbols
re.IGNORECASE | re.MULTILINE | re.DOTALL | re.UNICODE,
)
STATICFILE_EXTENSIONS = {
# 99.999% of the time, URLs ending in these extensions are static files
# that can be downloaded as-is, not html pages that need to be rendered
'gif', 'jpeg', 'jpg', 'png', 'tif', 'tiff', 'wbmp', 'ico', 'jng', 'bmp',
'svg', 'svgz', 'webp', 'ps', 'eps', 'ai',
'mp3', 'mp4', 'm4a', 'mpeg', 'mpg', 'mkv', 'mov', 'webm', 'm4v',
'flv', 'wmv', 'avi', 'ogg', 'ts', 'm3u8',
'pdf', 'txt', 'rtf', 'rtfd', 'doc', 'docx', 'ppt', 'pptx', 'xls', 'xlsx',
'atom', 'rss', 'css', 'js', 'json',
'dmg', 'iso', 'img',
'rar', 'war', 'hqx', 'zip', 'gz', 'bz2', '7z',
# Less common extensions to consider adding later
# jar, swf, bin, com, exe, dll, deb
# ear, hqx, eot, wmlc, kml, kmz, cco, jardiff, jnlp, run, msi, msp, msm,
# pl pm, prc pdb, rar, rpm, sea, sit, tcl tk, der, pem, crt, xpi, xspf,
# ra, mng, asx, asf, 3gpp, 3gp, mid, midi, kar, jad, wml, htc, mml
# These are always treated as pages, not static files; never add them:
# html, htm, shtml, xhtml, xml, aspx, php, cgi
}
### Checks & Tests
def enforce_types(func):
"""
Enforce function arg and kwarg types at runtime using its python3 type hints
"""
# TODO: check return type as well
@wraps(func)
def typechecked_function(*args, **kwargs):
sig = signature(func)
def check_argument_type(arg_key, arg_val):
try:
annotation = sig.parameters[arg_key].annotation
except KeyError:
annotation = None
if annotation is not None and annotation.__class__ is type:
if not isinstance(arg_val, annotation):
raise TypeError(
'{}(..., {}: {}) got unexpected {} argument {}={}'.format(
func.__name__,
arg_key,
annotation.__name__,
type(arg_val).__name__,
arg_key,
str(arg_val)[:64],
)
)
# check args
for arg_val, arg_key in zip(args, sig.parameters):
check_argument_type(arg_key, arg_val)
# check kwargs
for arg_key, arg_val in kwargs.items():
check_argument_type(arg_key, arg_val)
return func(*args, **kwargs)
return typechecked_function
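# A hedged example of what @enforce_types catches at call time (the decorated
# function here is hypothetical):
#
#   @enforce_types
#   def add_one(n: int) -> int:
#       return n + 1
#
#   add_one(1)    # -> 2
#   add_one('1')  # raises TypeError: add_one(..., n: int) got unexpected str argument n=1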
def check_url_parsing_invariants() -> None:
"""Check that plain text regex URL parsing works as expected"""
# this is the last line of defense to make sure the URL_REGEX isn't
# misbehaving, as the consequences could be disastrous and lead to many
# incorrect/badly parsed links being added to the archive
test_urls = '''
https://example1.com/what/is/happening.html?what=1#how-about-this=1
https://example2.com/what/is/happening/?what=1#how-about-this=1
HTtpS://example3.com/what/is/happening/?what=1#how-about-this=1f
https://example4.com/what/is/happening.html
https://example5.com/
https://example6.com
<test>http://example7.com</test>
[https://example8.com/what/is/this.php?what=1]
[and http://example9.com?what=1&other=3#and-thing=2]
<what>https://example10.com#and-thing=2 "</about>
abc<this["https://example11.com/what/is#and-thing=2?whoami=23&where=1"]that>def
sdflkf[what](https://example12.com/who/what.php?whoami=1#whatami=2)?am=hi
example13.bada
and example14.badb
<or>htt://example15.badc</that>
'''
# print('\n'.join(re.findall(URL_REGEX, test_urls)))
assert len(re.findall(URL_REGEX, test_urls)) == 12
### Random Helpers
@enforce_types
def handle_stdin_import(raw_text: str) -> str:
if not os.path.exists(SOURCES_DIR):
os.makedirs(SOURCES_DIR)
ts = str(datetime.now().timestamp()).split('.', 1)[0]
source_path = os.path.join(SOURCES_DIR, '{}-{}.txt'.format('stdin', ts))
atomic_write(raw_text, source_path)
return source_path
@enforce_types
def handle_file_import(path: str, timeout: int=TIMEOUT) -> str:
"""download a given url's content into output/sources/domain-<timestamp>.txt"""
if not os.path.exists(SOURCES_DIR):
os.makedirs(SOURCES_DIR)
ts = str(datetime.now().timestamp()).split('.', 1)[0]
source_path = os.path.join(SOURCES_DIR, '{}-{}.txt'.format(basename(path), ts))
if any(path.startswith(s) for s in ('http://', 'https://', 'ftp://')):
source_path = os.path.join(SOURCES_DIR, '{}-{}.txt'.format(domain(path), ts))
print('{}[*] [{}] Downloading {}{}'.format(
ANSI['green'],
datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
path,
ANSI['reset'],
))
timer = TimedProgress(timeout, prefix=' ')
try:
raw_source_text = download_url(path, timeout=timeout)
timer.end()
except Exception as e:
timer.end()
print('{}[!] Failed to download {}{}\n'.format(
ANSI['red'],
path,
ANSI['reset'],
))
print(' ', e)
raise SystemExit(1)
else:
with open(path, 'r') as f:
raw_source_text = f.read()
atomic_write(raw_source_text, source_path)
print(' > {}'.format(pretty_path(source_path)))
return source_path
@enforce_types
def fetch_page_title(url: str, timeout: int=10, progress: bool=SHOW_PROGRESS) -> Optional[str]:
"""Attempt to guess a page's title by downloading the html"""
if not FETCH_TITLE:
return None
try:
html = download_url(url, timeout=timeout)
match = re.search(HTML_TITLE_REGEX, html)
return htmldecode(match.group(1).strip()) if match else None
except Exception as err: # noqa
# print('[!] Failed to fetch title because of {}: {}'.format(
# err.__class__.__name__,
# err,
# ))
return None
@enforce_types
def wget_output_path(link: Link) -> Optional[str]:
"""calculate the path to the wgetted .html file, since wget may
adjust some paths to be different than the base_url path.
See docs on wget --adjust-extension (-E)
"""
if is_static_file(link.url):
return without_scheme(without_fragment(link.url))
# Wget downloads can save in a number of different ways depending on the url:
# https://example.com
# > output/archive/<timestamp>/example.com/index.html
# https://example.com?v=zzVa_tX1OiI
# > output/archive/<timestamp>/example.com/index.html?v=zzVa_tX1OiI.html
# https://www.example.com/?v=zzVa_tX1OiI
# > output/archive/<timestamp>/example.com/index.html?v=zzVa_tX1OiI.html
# https://example.com/abc
# > output/archive/<timestamp>/example.com/abc.html
# https://example.com/abc/
# > output/archive/<timestamp>/example.com/abc/index.html
# https://example.com/abc?v=zzVa_tX1OiI.html
# > output/archive/<timestamp>/example.com/abc?v=zzVa_tX1OiI.html
# https://example.com/abc/?v=zzVa_tX1OiI.html
# > output/archive/<timestamp>/example.com/abc/index.html?v=zzVa_tX1OiI.html
# https://example.com/abc/test.html
# > output/archive/<timestamp>/example.com/abc/test.html
# https://example.com/abc/test?v=zzVa_tX1OiI
# > output/archive/<timestamp>/example.com/abc/test?v=zzVa_tX1OiI.html
# https://example.com/abc/test/?v=zzVa_tX1OiI
# > output/archive/<timestamp>/example.com/abc/test/index.html?v=zzVa_tX1OiI.html
# There's also lots of complexity around how the urlencoding and renaming
# is done for pages with query and hash fragments or extensions like shtml / htm / php / etc
# Since the wget algorithm for -E (appending .html) is incredibly complex,
# and there's no way to get the computed output path from wget itself,
# rather than reverse-engineering how wget calculates it,
# we just look in the output folder and read the filename wget actually used from the filesystem
full_path = without_fragment(without_query(path(link.url))).strip('/')
search_dir = os.path.join(
link.link_dir,
domain(link.url),
urldecode(full_path),
)
for _ in range(4):
if os.path.exists(search_dir):
if os.path.isdir(search_dir):
html_files = [
f for f in os.listdir(search_dir)
if re.search(".+\\.[Hh][Tt][Mm][Ll]?$", f, re.I | re.M)
]
if html_files:
path_from_link_dir = search_dir.split(link.link_dir)[-1].strip('/')
return os.path.join(path_from_link_dir, html_files[0])
# Move up one directory level
search_dir = search_dir.rsplit('/', 1)[0]
if search_dir == link.link_dir:
break
return None
@enforce_types
def read_js_script(script_name: str) -> str:
script_path = os.path.join(PYTHON_DIR, 'scripts', script_name)
with open(script_path, 'r') as f:
return f.read().split('// INFO BELOW HERE')[0].strip()
### String Manipulation & Logging Helpers
@enforce_types
def str_between(string: str, start: str, end: str=None) -> str:
"""(<abc>12345</def>, <abc>, </def>) -> 12345"""
content = string.split(start, 1)[-1]
if end is not None:
content = content.rsplit(end, 1)[0]
return content
@enforce_types
def parse_date(date: Any) -> Optional[datetime]:
"""Parse unix timestamps, iso format, and human-readable strings"""
if date is None:
return None
if isinstance(date, datetime):
return date
if isinstance(date, (float, int)):
date = str(date)
if isinstance(date, str):
if date.replace('.', '').isdigit():
# this is a brittle attempt at unix timestamp parsing (which is
# notoriously hard to do). It may lead to dates being off by
# anything from hours to decades, depending on which app, OS,
# and system time configuration was used for the original timestamp
# more info: https://github.com/pirate/ArchiveBox/issues/119
# Note: always always always store the original timestamp string
# somewhere independently of the parsed datetime, so that later
# bugs don't repeatedly misparse and rewrite increasingly worse dates.
# the correct date can always be re-derived from the timestamp str
timestamp = float(date)
EARLIEST_POSSIBLE = 473403600.0 # 1985
LATEST_POSSIBLE = 1735707600.0 # 2025
if EARLIEST_POSSIBLE < timestamp < LATEST_POSSIBLE:
# number is seconds
return datetime.fromtimestamp(timestamp)
elif EARLIEST_POSSIBLE * 1000 < timestamp < LATEST_POSSIBLE * 1000:
# number is milliseconds
return datetime.fromtimestamp(timestamp / 1000)
elif EARLIEST_POSSIBLE * 1000*1000 < timestamp < LATEST_POSSIBLE * 1000*1000:
# number is microseconds
return datetime.fromtimestamp(timestamp / (1000*1000))
else:
# continue to the end and raise a parsing-failed error.
# we don't want to even attempt parsing timestamp strings that
# aren't within these ranges
pass
if '-' in date:
try:
return datetime.fromisoformat(date)
except Exception:
try:
return datetime.strptime(date, '%Y-%m-%d %H:%M')
except Exception:
pass
raise ValueError('Tried to parse invalid date! {}'.format(date))
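# Hedged examples of the heuristics above (exact datetimes depend on the local
# timezone, so only the interpretation is shown):
#
#   parse_date('1478739709')          # treated as unix seconds (~2016)
#   parse_date('1478739709123')       # same instant, treated as milliseconds
#   parse_date('2017-08-21 14:21')    # parsed via fromisoformat / strptime fallback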
### Link Helpers
@enforce_types
def merge_links(a: Link, b: Link) -> Link:
"""deterministially merge two links, favoring longer field values over shorter,
and "cleaner" values over worse ones.
"""
assert a.base_url == b.base_url, 'Cannot merge two links with different URLs'
url = a.url if len(a.url) > len(b.url) else b.url
possible_titles = [
title
for title in (a.title, b.title)
if title and title.strip() and '://' not in title
]
title = None
if len(possible_titles) == 2:
title = max(possible_titles, key=lambda t: len(t))
elif len(possible_titles) == 1:
title = possible_titles[0]
timestamp = (
a.timestamp
if float(a.timestamp or 0) < float(b.timestamp or 0) else
b.timestamp
)
tags_set = (
set(tag.strip() for tag in (a.tags or '').split(','))
| set(tag.strip() for tag in (b.tags or '').split(','))
)
tags = ','.join(tags_set) or None
sources = list(set(a.sources + b.sources))
all_methods = set(list(a.history.keys()) + list(b.history.keys()))
history = {
method: (a.history.get(method) or []) + (b.history.get(method) or [])
for method in all_methods
}
return Link(
url=url,
timestamp=timestamp,
title=title,
tags=tags,
sources=sources,
history=history,
)
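# A hedged example of the merge behavior (values illustrative; both links must
# share the same base_url, i.e. the same URL minus the scheme):
#
#   a = Link(url='http://example.com/page', timestamp='1554308953.0',
#            title='Example', tags='news', sources=['a.txt'])
#   b = Link(url='https://example.com/page', timestamp='1554308999.0',
#            title='Example page title', tags='tech', sources=['b.txt'])
#   merged = merge_links(a, b)
#   # keeps the longer url ('https://...') and title, the earlier timestamp,
#   # and the union of tags and sources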
@enforce_types
def is_static_file(url: str) -> bool:
"""Certain URLs just point to a single static file, and
don't need to be re-archived in many formats
"""
# TODO: the proper way is with MIME type detection, not using extension
return extension(url) in STATICFILE_EXTENSIONS
@enforce_types
def derived_link_info(link: Link) -> dict:
"""extend link info with the archive urls and other derived data"""
info = link._asdict(extended=True)
info.update(link.canonical_outputs())
return info
### Python / System Helpers
def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs):
"""Patched of subprocess.run to fix blocking io making timeout=innefective"""
if input is not None:
if 'stdin' in kwargs:
raise ValueError('stdin and input arguments may not both be used.')
kwargs['stdin'] = PIPE
if capture_output:
if ('stdout' in kwargs) or ('stderr' in kwargs):
raise ValueError('stdout and stderr arguments may not be used '
'with capture_output.')
kwargs['stdout'] = PIPE
kwargs['stderr'] = PIPE
with Popen(*popenargs, **kwargs) as process:
try:
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired:
process.kill()
try:
stdout, stderr = process.communicate(input, timeout=2)
except:
pass
raise TimeoutExpired(popenargs[0][0], timeout)
except BaseException:
process.kill()
# We don't call process.wait() as .__exit__ does that for us.
raise
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args,
output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)
class TimedProgress:
"""Show a progress bar and measure elapsed time until .end() is called"""
def __init__(self, seconds, prefix=''):
if SHOW_PROGRESS:
self.p = Process(target=progress_bar, args=(seconds, prefix))
self.p.start()
self.stats = {'start_ts': datetime.now(), 'end_ts': None}
def end(self):
"""immediately end progress, clear the progressbar line, and save end_ts"""
end_ts = datetime.now()
self.stats['end_ts'] = end_ts
if SHOW_PROGRESS:
# protect from double termination
#if p is None or not hasattr(p, 'kill'):
# return
if self.p is not None:
self.p.terminate()
self.p = None
sys.stdout.write('\r{}{}\r'.format((' ' * TERM_WIDTH()), ANSI['reset'])) # clear whole terminal line
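# Hedged usage sketch (mirrors how handle_file_import uses it above):
#
#   timer = TimedProgress(TIMEOUT, prefix='      ')
#   try:
#       result = download_url('https://example.com')  # any long-running step
#   finally:
#       timer.end()  # stops the bar and records start/end timestamps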
@enforce_types
def progress_bar(seconds: int, prefix: str='') -> None:
"""show timer in the form of progress bar, with percentage and seconds remaining"""
chunk = '█' if sys.stdout.encoding == 'UTF-8' else '#'
chunks = TERM_WIDTH() - len(prefix) - 20 # number of progress chunks to show (aka max bar width)
try:
for s in range(seconds * chunks):
chunks = TERM_WIDTH() - len(prefix) - 20
progress = s / chunks / seconds * 100
bar_width = round(progress/(100/chunks))
# ████████████████████ 0.9% (1/60sec)
sys.stdout.write('\r{0}{1}{2}{3} {4}% ({5}/{6}sec)'.format(
prefix,
ANSI['green'],
(chunk * bar_width).ljust(chunks),
ANSI['reset'],
round(progress, 1),
round(s/chunks),
seconds,
))
sys.stdout.flush()
time.sleep(1 / chunks)
# ██████████████████████████████████ 100.0% (60/60sec)
sys.stdout.write('\r{0}{1}{2}{3} {4}% ({5}/{6}sec)\n'.format(
prefix,
ANSI['red'],
chunk * chunks,
ANSI['reset'],
100.0,
seconds,
seconds,
))
sys.stdout.flush()
except KeyboardInterrupt:
print()
pass
@enforce_types
def download_url(url: str, timeout: int=TIMEOUT) -> str:
"""Download the contents of a remote url and return the text"""
req = Request(url, headers={'User-Agent': WGET_USER_AGENT})
if CHECK_SSL_VALIDITY:
resp = urlopen(req, timeout=timeout)
else:
import ssl
insecure = ssl._create_unverified_context()
resp = urlopen(req, timeout=timeout, context=insecure)
encoding = resp.headers.get_content_charset() or 'utf-8' # type: ignore
return resp.read().decode(encoding)
@enforce_types
def chmod_file(path: str, cwd: str='.', permissions: str=OUTPUT_PERMISSIONS, timeout: int=30) -> None:
"""chmod -R <permissions> <cwd>/<path>"""
if not os.path.exists(os.path.join(cwd, path)):
raise Exception('Failed to chmod: {} does not exist (did the previous step fail?)'.format(path))
chmod_result = run(['chmod', '-R', permissions, path], cwd=cwd, stdout=DEVNULL, stderr=PIPE, timeout=timeout)
if chmod_result.returncode == 1:
print(' ', chmod_result.stderr.decode())
raise Exception('Failed to chmod {}/{}'.format(cwd, path))
@enforce_types
def copy_and_overwrite(from_path: str, to_path: str):
if os.path.exists(to_path):
shutil.rmtree(to_path)
shutil.copytree(from_path, to_path)
@enforce_types
def chrome_args(**options) -> List[str]:
"""helper to build up a chrome shell command with arguments"""
options = {**CHROME_OPTIONS, **options}
cmd_args = [options['CHROME_BINARY']]
if options['CHROME_HEADLESS']:
cmd_args += ('--headless',)
if not options['CHROME_SANDBOX']:
# don't use the GPU or sandbox when running inside a docker container
cmd_args += ('--no-sandbox', '--disable-gpu')
if not options['CHECK_SSL_VALIDITY']:
cmd_args += ('--disable-web-security', '--ignore-certificate-errors')
if options['CHROME_USER_AGENT']:
cmd_args += ('--user-agent={}'.format(options['CHROME_USER_AGENT']),)
if options['RESOLUTION']:
cmd_args += ('--window-size={}'.format(options['RESOLUTION']),)
if options['TIMEOUT']:
cmd_args += ('--timeout={}'.format((options['TIMEOUT']) * 1000),)
if options['CHROME_USER_DATA_DIR']:
cmd_args.append('--user-data-dir={}'.format(options['CHROME_USER_DATA_DIR']))
return cmd_args
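# A hedged example of the kind of command this builds (the binary path and the
# exact flag set depend entirely on the CHROME_OPTIONS config; values shown are
# illustrative):
#
#   chrome_args(TIMEOUT=60, RESOLUTION='1440,900')
#   # -> ['chromium-browser', '--headless', '--window-size=1440,900', '--timeout=60000', ...]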
class ExtendedEncoder(JSONEncoder):
"""
Extended json serializer that supports serializing several model
fields and objects
"""
def default(self, obj):
cls_name = obj.__class__.__name__
if hasattr(obj, '_asdict'):
return obj._asdict()
elif isinstance(obj, bytes):
return obj.decode()
elif isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, Exception):
return '{}: {}'.format(obj.__class__.__name__, obj)
elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
return tuple(obj)
return JSONEncoder.default(self, obj)
def atomic_write(contents: Union[dict, str], path: str) -> None:
"""Safe atomic write to filesystem by writing to temp file + atomic rename"""
try:
tmp_file = '{}.tmp'.format(path)
with open(tmp_file, 'w+', encoding='utf-8') as f:
if isinstance(contents, dict):
json.dump(contents, f, indent=4, cls=ExtendedEncoder)
else:
f.write(contents)
os.fsync(f.fileno())
os.rename(tmp_file, path)
chmod_file(path)
finally:
if os.path.exists(tmp_file):
os.remove(tmp_file)
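# Hedged usage examples (paths illustrative):
#
#   atomic_write('one url per line\n', '/tmp/example-source.txt')
#   atomic_write({'schema': 'Link'}, '/tmp/example-index.json')
#
# Strings are written verbatim; dicts are serialized as indented JSON via
# ExtendedEncoder. The temp file is fsynced, renamed into place, then chmodded.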

16
archivebox/core/wsgi.py Normal file
View file

@ -0,0 +1,16 @@
"""
WSGI config for archivebox project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/2.1/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.settings')
application = get_wsgi_application()