split up utils into separate files

Nick Sweeting 2019-04-30 23:13:04 -04:00
parent daf5951897
commit 95007d9137
23 changed files with 820 additions and 759 deletions


@ -1,14 +1,28 @@
__package__ = 'archivebox.cli' __package__ = 'archivebox.cli'
import re
import os import os
import sys import sys
import time
import argparse
from datetime import datetime from datetime import datetime
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, List from multiprocessing import Process
from typing import Optional, List, Dict, Union, IO
from ..index.schema import Link, ArchiveResult from ..index.schema import Link, ArchiveResult
from ..config import ANSI, OUTPUT_DIR, IS_TTY from ..index.json import to_json
from ..index.csv import links_to_csv
from ..util import enforce_types
from ..config import (
ConfigDict,
ANSI,
OUTPUT_DIR,
IS_TTY,
SHOW_PROGRESS,
TERM_WIDTH,
)
@dataclass @dataclass
@ -32,11 +46,104 @@ class RuntimeStats:
_LAST_RUN_STATS = RuntimeStats() _LAST_RUN_STATS = RuntimeStats()
def pretty_path(path: str) -> str:
"""convert paths like .../ArchiveBox/archivebox/../output/abc into output/abc"""
pwd = os.path.abspath('.')
# parent = os.path.abspath(os.path.join(pwd, os.path.pardir))
return path.replace(pwd + '/', './')
class SmartFormatter(argparse.HelpFormatter):
"""Patched formatter that prints newlines in argparse help strings"""
def _split_lines(self, text, width):
if '\n' in text:
return text.splitlines()
return argparse.HelpFormatter._split_lines(self, text, width)
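A small sketch of how the new SmartFormatter gets wired into an argparse parser; the parser and help text below are hypothetical, not part of this commit:

import argparse

from archivebox.cli.logging import SmartFormatter

parser = argparse.ArgumentParser(formatter_class=SmartFormatter)
parser.add_argument(
    '--filter-type',
    help='how to match patterns:\n  exact\n  substring\n  regex',
)
parser.print_help()   # newlines in the help string are preserved instead of re-wrapped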
def reject_stdin(caller: str, stdin: Optional[IO]=sys.stdin) -> None:
"""Tell the user they passed stdin to a command that doesn't accept it"""
if stdin and not stdin.isatty():
stdin_raw_text = stdin.read().strip()
if stdin_raw_text:
print(
'{red}[X] The "{}" command does not accept stdin.{reset}\n'.format(
caller,
**ANSI,
)
)
print(' Run archivebox "{} --help" to see usage and examples.'.format(
caller,
))
print()
raise SystemExit(1)
def accept_stdin(stdin: Optional[IO]=sys.stdin) -> Optional[str]:
if stdin and not stdin.isatty():
return stdin.read()
return None
class TimedProgress:
"""Show a progress bar and measure elapsed time until .end() is called"""
def __init__(self, seconds, prefix=''):
if SHOW_PROGRESS:
self.p = Process(target=progress_bar, args=(seconds, prefix))
self.p.start()
self.stats = {'start_ts': datetime.now(), 'end_ts': None}
def end(self):
"""immediately end progress, clear the progressbar line, and save end_ts"""
end_ts = datetime.now()
self.stats['end_ts'] = end_ts
if SHOW_PROGRESS:
# protect from double termination
#if p is None or not hasattr(p, 'kill'):
# return
if self.p is not None:
self.p.terminate()
self.p = None
sys.stdout.write('\r{}{}\r'.format((' ' * TERM_WIDTH()), ANSI['reset'])) # clear whole terminal line
@enforce_types
def progress_bar(seconds: int, prefix: str='') -> None:
"""show timer in the form of progress bar, with percentage and seconds remaining"""
chunk = '█' if sys.stdout.encoding == 'UTF-8' else '#'
chunks = TERM_WIDTH() - len(prefix) - 20 # number of progress chunks to show (aka max bar width)
try:
for s in range(seconds * chunks):
chunks = TERM_WIDTH() - len(prefix) - 20
progress = s / chunks / seconds * 100
bar_width = round(progress/(100/chunks))
# ████████████████████ 0.9% (1/60sec)
sys.stdout.write('\r{0}{1}{2}{3} {4}% ({5}/{6}sec)'.format(
prefix,
ANSI['green'],
(chunk * bar_width).ljust(chunks),
ANSI['reset'],
round(progress, 1),
round(s/chunks),
seconds,
))
sys.stdout.flush()
time.sleep(1 / chunks)
# ██████████████████████████████████ 100.0% (60/60sec)
sys.stdout.write('\r{0}{1}{2}{3} {4}% ({5}/{6}sec)\n'.format(
prefix,
ANSI['red'],
chunk * chunks,
ANSI['reset'],
100.0,
seconds,
seconds,
))
sys.stdout.flush()
except KeyboardInterrupt:
print()
pass
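For context, a minimal sketch of how the new TimedProgress helper is meant to be used; the sleep is a stand-in for a real archiving step:

import time

from archivebox.cli.logging import TimedProgress

timer = TimedProgress(60, prefix='      ')   # show up to a 60-second bar
try:
    time.sleep(3)                            # stand-in for a slow archiving step
finally:
    timer.end()                              # clear the bar and record end_ts
print('took', timer.stats['end_ts'] - timer.stats['start_ts'])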
### Parsing Stage ### Parsing Stage
@ -223,10 +330,9 @@ def log_list_started(filter_patterns: Optional[List[str]], filter_type: str):
print(' {}'.format(' '.join(filter_patterns or ()))) print(' {}'.format(' '.join(filter_patterns or ())))
def log_list_finished(links): def log_list_finished(links):
from ..util import links_to_csv
print() print()
print('---------------------------------------------------------------------------------------------------') print('---------------------------------------------------------------------------------------------------')
print(links_to_csv(links, csv_cols=['timestamp', 'is_archived', 'num_outputs', 'url'], header=True, ljust=16, separator=' | ')) print(links_to_csv(links, cols=['timestamp', 'is_archived', 'num_outputs', 'url'], header=True, ljust=16, separator=' | '))
print('---------------------------------------------------------------------------------------------------') print('---------------------------------------------------------------------------------------------------')
print() print()
@ -266,3 +372,129 @@ def log_removal_finished(all_links: int, to_keep: int):
**ANSI, **ANSI,
)) ))
print(' Index now contains {} links.'.format(to_keep)) print(' Index now contains {} links.'.format(to_keep))
def log_shell_welcome_msg():
from . import list_subcommands
print('{green}# ArchiveBox Imports{reset}'.format(**ANSI))
print('{green}from archivebox.core.models import Page, User{reset}'.format(**ANSI))
print('{green}from archivebox import *\n {}{reset}'.format("\n ".join(list_subcommands().keys()), **ANSI))
print()
print('[i] Welcome to the ArchiveBox Shell!')
print(' https://github.com/pirate/ArchiveBox/wiki/Usage#Shell-Usage')
print()
print(' {lightred}Hint:{reset} Example use:'.format(**ANSI))
print(' print(Page.objects.filter(is_archived=True).count())')
print(' Page.objects.get(url="https://example.com").as_json()')
print(' add("https://example.com/some/new/url")')
### Helpers
@enforce_types
def pretty_path(path: str) -> str:
"""convert paths like .../ArchiveBox/archivebox/../output/abc into output/abc"""
pwd = os.path.abspath('.')
# parent = os.path.abspath(os.path.join(pwd, os.path.pardir))
return path.replace(pwd + '/', './')
@enforce_types
def printable_filesize(num_bytes: Union[int, float]) -> str:
for count in ['Bytes','KB','MB','GB']:
if num_bytes > -1024.0 and num_bytes < 1024.0:
return '%3.1f %s' % (num_bytes, count)
num_bytes /= 1024.0
return '%3.1f %s' % (num_bytes, 'TB')
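Expected behaviour of printable_filesize, going by the loop above:

from archivebox.cli.logging import printable_filesize

print(printable_filesize(123))          # '123.0 Bytes'
print(printable_filesize(2_500_000))    # '2.4 MB'
print(printable_filesize(3 * 1024**4))  # '3.0 TB'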
@enforce_types
def printable_folders(folders: Dict[str, Optional[Link]],
json: bool=False,
csv: Optional[str]=None) -> str:
if json:
return to_json(folders.values(), indent=4, sort_keys=True)
elif csv:
return links_to_csv(folders.values(), cols=csv.split(','), header=True)
return '\n'.join(f'{folder} {link}' for folder, link in folders.items())
@enforce_types
def printable_config(config: ConfigDict, prefix: str='') -> str:
return f'\n{prefix}'.join(
f'{key}={val}'
for key, val in config.items()
if not (isinstance(val, dict) or callable(val))
)
@enforce_types
def printable_folder_status(name: str, folder: Dict) -> str:
if folder['enabled']:
if folder['is_valid']:
color, symbol, note = 'green', '√', 'valid'
else:
color, symbol, note, num_files = 'red', 'X', 'invalid', '?'
else:
color, symbol, note, num_files = 'lightyellow', '-', 'disabled', '-'
if folder['path']:
if os.path.exists(folder['path']):
num_files = (
f'{len(os.listdir(folder["path"]))} files'
if os.path.isdir(folder['path']) else
printable_filesize(os.path.getsize(folder['path']))
)
else:
num_files = 'missing'
if ' ' in folder['path']:
folder['path'] = f'"{folder["path"]}"'
return ' '.join((
ANSI[color],
symbol,
ANSI['reset'],
name.ljust(22),
(folder["path"] or '').ljust(76),
num_files.ljust(14),
ANSI[color],
note,
ANSI['reset'],
))
@enforce_types
def printable_dependency_version(name: str, dependency: Dict) -> str:
if dependency['enabled']:
if dependency['is_valid']:
color, symbol, note, version = 'green', '√', 'valid', ''
parsed_version_num = re.search(r'[\d\.]+', dependency['version'])
if parsed_version_num:
version = f'v{parsed_version_num[0]}'
if not version:
color, symbol, note, version = 'red', 'X', 'invalid', '?'
else:
color, symbol, note, version = 'lightyellow', '-', 'disabled', '-'
if ' ' in dependency["path"]:
dependency["path"] = f'"{dependency["path"]}"'
return ' '.join((
ANSI[color],
symbol,
ANSI['reset'],
name.ljust(22),
(dependency["path"] or '').ljust(76),
version.ljust(14),
ANSI[color],
note,
ANSI['reset'],
))


@ -119,6 +119,27 @@ DEFAULT_CLI_COLORS = {
} }
ANSI = {k: '' for k in DEFAULT_CLI_COLORS.keys()} ANSI = {k: '' for k in DEFAULT_CLI_COLORS.keys()}
STATICFILE_EXTENSIONS = {
# 99.999% of the time, URLs ending in these extensions are static files
# that can be downloaded as-is, not html pages that need to be rendered
'gif', 'jpeg', 'jpg', 'png', 'tif', 'tiff', 'wbmp', 'ico', 'jng', 'bmp',
'svg', 'svgz', 'webp', 'ps', 'eps', 'ai',
'mp3', 'mp4', 'm4a', 'mpeg', 'mpg', 'mkv', 'mov', 'webm', 'm4v',
'flv', 'wmv', 'avi', 'ogg', 'ts', 'm3u8',
'pdf', 'txt', 'rtf', 'rtfd', 'doc', 'docx', 'ppt', 'pptx', 'xls', 'xlsx',
'atom', 'rss', 'css', 'js', 'json',
'dmg', 'iso', 'img',
'rar', 'war', 'hqx', 'zip', 'gz', 'bz2', '7z',
# Less common extensions to consider adding later
# jar, swf, bin, com, exe, dll, deb
# ear, hqx, eot, wmlc, kml, kmz, cco, jardiff, jnlp, run, msi, msp, msm,
# pl pm, prc pdb, rar, rpm, sea, sit, tcl tk, der, pem, crt, xpi, xspf,
# ra, mng, asx, asf, 3gpp, 3gp, mid, midi, kar, jad, wml, htc, mml
# These are always treated as pages, not as static files, never add them:
# html, htm, shtml, xhtml, xml, aspx, php, cgi
}
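This set backs the is_static_file() check in util.py; roughly, the test looks like this (the url is just an example, and the rsplit is a crude stand-in for the util.extension() helper):

from archivebox.config import STATICFILE_EXTENSIONS

url = 'https://example.com/files/report.pdf'
ext = url.rsplit('.', 1)[-1].lower()         # crude stand-in for util.extension()
print(ext in STATICFILE_EXTENSIONS)          # True -> download as-is, skip rendering extractors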
VERSION_FILENAME = 'VERSION' VERSION_FILENAME = 'VERSION'
PYTHON_DIR_NAME = 'archivebox' PYTHON_DIR_NAME = 'archivebox'


@ -64,3 +64,7 @@ class Page(models.Model):
@property @property
def base_url(self): def base_url(self):
return self.as_link().base_url return self.as_link().base_url
@property
def link_dir(self):
return self.as_link().link_dir


@ -4,17 +4,19 @@ import os
import sys import sys
SECRET_KEY = '---------------- not a valid secret key ! ----------------' SECRET_KEY = '---------------- not a valid secret key ! ----------------'
DEBUG = True DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
ALLOWED_HOSTS = ['*'] ALLOWED_HOSTS = ['*']
REPO_DIR = os.path.abspath(os.path.join(os.path.abspath(__file__), os.path.pardir, os.path.pardir)) REPO_DIR = os.path.abspath(os.path.join(os.path.abspath(__file__), os.path.pardir, os.path.pardir))
OUTPUT_DIR = os.path.abspath(os.getenv('OUTPUT_DIR', os.curdir)) OUTPUT_DIR = os.path.abspath(os.getenv('OUTPUT_DIR', os.curdir))
ARCHIVE_DIR = os.path.join(OUTPUT_DIR, 'archive')
DATABASE_FILE = os.path.join(OUTPUT_DIR, 'index.sqlite3') DATABASE_FILE = os.path.join(OUTPUT_DIR, 'index.sqlite3')
ACTIVE_THEME = 'default' ACTIVE_THEME = 'default'
IS_SHELL = 'shell' in sys.argv[:3] or 'shell_plus' in sys.argv[:3] IS_SHELL = 'shell' in sys.argv[:3] or 'shell_plus' in sys.argv[:3]
APPEND_SLASH = True
INSTALLED_APPS = [ INSTALLED_APPS = [
'django.contrib.auth', 'django.contrib.auth',


@ -1,17 +1,6 @@
from cli import list_subcommands
from .config import ANSI
if __name__ == '__main__':
print('{green}# ArchiveBox Imports{reset}'.format(**ANSI))
# print('from archivebox.core.models import Page, User')
print('{green}from archivebox.cli import\narchivebox_{}{reset}'.format("\narchivebox_".join(list_subcommands().keys()), **ANSI))
print()
print('[i] Welcome to the ArchiveBox Shell! Example use:')
print(' print(Page.objects.filter(is_archived=True).count())')
print(' Page.objects.get(url="https://example.com").as_json()')
print(' Page.objects.get(url="https://example.com").as_json()')
print(' from archivebox.main import get_invalid_folders')
from cli.logging import log_shell_welcome_msg
if __name__ == '__main__':
from main import *
log_shell_welcome_msg()


@ -5,16 +5,11 @@ import os
from typing import Optional, List, Dict, Tuple from typing import Optional, List, Dict, Tuple
from collections import defaultdict from collections import defaultdict
from ..index.schema import Link, ArchiveResult, ArchiveOutput from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..system import run, PIPE, DEVNULL, chmod_file
from ..util import ( from ..util import (
enforce_types, enforce_types,
TimedProgress,
run,
PIPE,
DEVNULL,
is_static_file, is_static_file,
ArchiveError,
chmod_file,
) )
from ..config import ( from ..config import (
VERSION, VERSION,
@ -24,6 +19,7 @@ from ..config import (
CURL_VERSION, CURL_VERSION,
CHECK_SSL_VALIDITY CHECK_SSL_VALIDITY
) )
from ..cli.logging import TimedProgress


@ -4,22 +4,19 @@ import os
from typing import Optional from typing import Optional
from ..index.schema import Link, ArchiveResult, ArchiveOutput from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..system import run, PIPE, chmod_file
from ..util import ( from ..util import (
enforce_types, enforce_types,
TimedProgress,
run,
PIPE,
is_static_file, is_static_file,
ArchiveError,
chrome_args, chrome_args,
chmod_file,
) )
from ..config import ( from ..config import (
TIMEOUT, TIMEOUT,
SAVE_DOM, SAVE_DOM,
CHROME_VERSION, CHROME_VERSION,
) )
from ..cli.logging import TimedProgress


@ -5,14 +5,8 @@ import os
from typing import Optional from typing import Optional
from ..index.schema import Link, ArchiveResult, ArchiveOutput from ..index.schema import Link, ArchiveResult, ArchiveOutput
from ..util import ( from ..system import chmod_file, run, PIPE
enforce_types, from ..util import enforce_types, domain
TimedProgress,
domain,
run,
PIPE,
chmod_file,
)
from ..config import ( from ..config import (
TIMEOUT, TIMEOUT,
SAVE_FAVICON, SAVE_FAVICON,
@ -20,6 +14,7 @@ from ..config import (
CURL_VERSION, CURL_VERSION,
CHECK_SSL_VALIDITY, CHECK_SSL_VALIDITY,
) )
from ..cli.logging import TimedProgress
@enforce_types @enforce_types


@ -4,15 +4,11 @@ import os
from typing import Optional from typing import Optional
from ..index.schema import Link, ArchiveResult, ArchiveOutput from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..system import run, PIPE, chmod_file
from ..util import ( from ..util import (
enforce_types, enforce_types,
TimedProgress,
run,
PIPE,
is_static_file, is_static_file,
ArchiveError,
chmod_file,
domain, domain,
extension, extension,
without_query, without_query,
@ -26,6 +22,7 @@ from ..config import (
GIT_DOMAINS, GIT_DOMAINS,
CHECK_SSL_VALIDITY CHECK_SSL_VALIDITY
) )
from ..cli.logging import TimedProgress


@ -4,15 +4,11 @@ import os
from typing import Optional from typing import Optional
from ..index.schema import Link, ArchiveResult, ArchiveOutput from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..system import run, PIPE, chmod_file
from ..util import ( from ..util import (
enforce_types, enforce_types,
TimedProgress,
run,
PIPE,
is_static_file, is_static_file,
ArchiveError,
chmod_file,
) )
from ..config import ( from ..config import (
MEDIA_TIMEOUT, MEDIA_TIMEOUT,
@ -21,6 +17,7 @@ from ..config import (
YOUTUBEDL_VERSION, YOUTUBEDL_VERSION,
CHECK_SSL_VALIDITY CHECK_SSL_VALIDITY
) )
from ..cli.logging import TimedProgress
@enforce_types @enforce_types


@ -4,23 +4,19 @@ import os
from typing import Optional from typing import Optional
from ..index.schema import Link, ArchiveResult, ArchiveOutput from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..system import run, PIPE, chmod_file
from ..util import ( from ..util import (
enforce_types, enforce_types,
TimedProgress,
run,
PIPE,
is_static_file, is_static_file,
ArchiveError,
chrome_args, chrome_args,
chmod_file,
) )
from ..config import ( from ..config import (
TIMEOUT, TIMEOUT,
SAVE_PDF, SAVE_PDF,
CHROME_VERSION, CHROME_VERSION,
) )
from ..cli.logging import TimedProgress
@enforce_types @enforce_types


@ -4,22 +4,19 @@ import os
from typing import Optional from typing import Optional
from ..index.schema import Link, ArchiveResult, ArchiveOutput from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..system import run, PIPE, chmod_file
from ..util import ( from ..util import (
enforce_types, enforce_types,
TimedProgress,
run,
PIPE,
is_static_file, is_static_file,
ArchiveError,
chrome_args, chrome_args,
chmod_file,
) )
from ..config import ( from ..config import (
TIMEOUT, TIMEOUT,
SAVE_SCREENSHOT, SAVE_SCREENSHOT,
CHROME_VERSION, CHROME_VERSION,
) )
from ..cli.logging import TimedProgress


@ -1,14 +1,14 @@
__package__ = 'archivebox.extractors' __package__ = 'archivebox.extractors'
import re
from typing import Optional from typing import Optional
from ..index.schema import Link, ArchiveResult, ArchiveOutput from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..util import ( from ..util import (
enforce_types, enforce_types,
TimedProgress,
is_static_file, is_static_file,
ArchiveError, download_url,
fetch_page_title, htmldecode,
) )
from ..config import ( from ..config import (
TIMEOUT, TIMEOUT,
@ -16,6 +16,14 @@ from ..config import (
CURL_BINARY, CURL_BINARY,
CURL_VERSION, CURL_VERSION,
) )
from ..cli.logging import TimedProgress
HTML_TITLE_REGEX = re.compile(
r'<title.*?>' # start matching text after <title> tag
r'(.[^<>]+)', # get everything up to these symbols
re.IGNORECASE | re.MULTILINE | re.DOTALL | re.UNICODE,
)
@enforce_types @enforce_types
@ -44,7 +52,9 @@ def save_title(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) ->
status = 'succeeded' status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ') timer = TimedProgress(timeout, prefix=' ')
try: try:
output = fetch_page_title(link.url, timeout=timeout, progress=False) html = download_url(link.url, timeout=timeout)
match = re.search(HTML_TITLE_REGEX, html)
output = htmldecode(match.group(1).strip()) if match else None
if not output: if not output:
raise ArchiveError('Unable to detect page title') raise ArchiveError('Unable to detect page title')
except Exception as err: except Exception as err:


@ -1,18 +1,22 @@
__package__ = 'archivebox.extractors' __package__ = 'archivebox.extractors'
import os import os
import re
from typing import Optional from typing import Optional
from datetime import datetime from datetime import datetime
from ..index.schema import Link, ArchiveResult, ArchiveOutput from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
from ..system import run, PIPE
from ..util import ( from ..util import (
enforce_types, enforce_types,
TimedProgress, is_static_file,
run, without_scheme,
PIPE, without_fragment,
wget_output_path, without_query,
ArchiveError, path,
domain,
urldecode,
) )
from ..config import ( from ..config import (
TIMEOUT, TIMEOUT,
@ -26,7 +30,7 @@ from ..config import (
WGET_USER_AGENT, WGET_USER_AGENT,
COOKIES_FILE, COOKIES_FILE,
) )
from ..cli.logging import TimedProgress
@enforce_types @enforce_types
@ -121,3 +125,72 @@ def save_wget(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) ->
status=status, status=status,
**timer.stats, **timer.stats,
) )
@enforce_types
def wget_output_path(link: Link) -> Optional[str]:
"""calculate the path to the wgetted .html file, since wget may
adjust some paths to be different than the base_url path.
See docs on wget --adjust-extension (-E)
"""
if is_static_file(link.url):
return without_scheme(without_fragment(link.url))
# Wget downloads can save in a number of different ways depending on the url:
# https://example.com
# > example.com/index.html
# https://example.com?v=zzVa_tX1OiI
# > example.com/index.html?v=zzVa_tX1OiI.html
# https://www.example.com/?v=zzVa_tX1OiI
# > example.com/index.html?v=zzVa_tX1OiI.html
# https://example.com/abc
# > example.com/abc.html
# https://example.com/abc/
# > example.com/abc/index.html
# https://example.com/abc?v=zzVa_tX1OiI.html
# > example.com/abc?v=zzVa_tX1OiI.html
# https://example.com/abc/?v=zzVa_tX1OiI.html
# > example.com/abc/index.html?v=zzVa_tX1OiI.html
# https://example.com/abc/test.html
# > example.com/abc/test.html
# https://example.com/abc/test?v=zzVa_tX1OiI
# > example.com/abc/test?v=zzVa_tX1OiI.html
# https://example.com/abc/test/?v=zzVa_tX1OiI
# > example.com/abc/test/index.html?v=zzVa_tX1OiI.html
# There's also lots of complexity around how the urlencoding and renaming
# is done for pages with query and hash fragments or extensions like shtml / htm / php / etc
# Since the wget algorithm for -E (appending .html) is incredibly complex
# and there's no way to get the computed output path from wget
# in order to avoid having to reverse-engineer how they calculate it,
# we just look in the output folder and read the filename wget used from the filesystem
full_path = without_fragment(without_query(path(link.url))).strip('/')
search_dir = os.path.join(
link.link_dir,
domain(link.url),
urldecode(full_path),
)
for _ in range(4):
if os.path.exists(search_dir):
if os.path.isdir(search_dir):
html_files = [
f for f in os.listdir(search_dir)
if re.search(".+\\.[Ss]?[Hh][Tt][Mm][Ll]?$", f, re.I | re.M)
]
if html_files:
path_from_link_dir = search_dir.split(link.link_dir)[-1].strip('/')
return os.path.join(path_from_link_dir, html_files[0])
# Move up one directory level
search_dir = search_dir.rsplit('/', 1)[0]
if search_dir == link.link_dir:
break
return None
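A sketch of calling the relocated wget_output_path() directly; the Link fields shown are assumptions about the dataclass in index/schema.py, not something this diff pins down:

from archivebox.index.schema import Link
from archivebox.extractors.wget import wget_output_path

link = Link(
    url='https://example.com/abc/?v=1',
    timestamp='1556600000.0',
    title=None,
    tags=None,
    sources=[],
)
# returns the html filename wget actually wrote, relative to link.link_dir,
# e.g. 'example.com/abc/index.html?v=1.html', or None if nothing was saved yet
print(wget_output_path(link))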


@ -10,12 +10,10 @@ from typing import List, Tuple, Dict, Optional, Iterable
from collections import OrderedDict from collections import OrderedDict
from contextlib import contextmanager from contextlib import contextmanager
from ..parsers import parse_links from ..system import atomic_write
from ..util import ( from ..util import (
scheme, scheme,
enforce_types, enforce_types,
TimedProgress,
atomic_write,
ExtendedEncoder, ExtendedEncoder,
) )
from ..config import ( from ..config import (
@ -30,6 +28,7 @@ from ..config import (
stderr, stderr,
) )
from ..cli.logging import ( from ..cli.logging import (
TimedProgress,
log_indexing_process_started, log_indexing_process_started,
log_indexing_process_finished, log_indexing_process_finished,
log_indexing_started, log_indexing_started,
@ -278,6 +277,8 @@ def import_new_links(existing_links: List[Link],
import_path: str, import_path: str,
out_dir: str=OUTPUT_DIR) -> Tuple[List[Link], List[Link]]: out_dir: str=OUTPUT_DIR) -> Tuple[List[Link], List[Link]]:
from ..parsers import parse_links
new_links: List[Link] = [] new_links: List[Link] = []
# parse and validate the import file # parse and validate the import file
@ -584,9 +585,9 @@ def fix_invalid_folder_locations(out_dir: str=OUTPUT_DIR) -> Tuple[List[str], Li
else: else:
shutil.move(entry.path, dest) shutil.move(entry.path, dest)
fixed.append(dest) fixed.append(dest)
timestamp = entry.path.rsplit('/', 1)[-1]
if link.link_dir != entry.path: assert link.link_dir == entry.path
link = link.overwrite(link_dir=entry.path) assert link.timestamp == timestamp
write_json_link_details(link, out_dir=entry.path) write_json_link_details(link, out_dir=entry.path)
return fixed, cant_fix return fixed, cant_fix

archivebox/index/csv.py (new file, 37 lines)

@ -0,0 +1,37 @@
__package__ = 'archivebox.index'
from typing import List, Optional, Any
from ..util import enforce_types
from .schema import Link
@enforce_types
def links_to_csv(links: List[Link],
cols: Optional[List[str]]=None,
header: bool=True,
separator: str=',',
ljust: int=0) -> str:
cols = cols or ['timestamp', 'is_archived', 'url']
header_str = ''
if header:
header_str = separator.join(col.ljust(ljust) for col in cols)
row_strs = (
link.to_csv(cols=cols, ljust=ljust, separator=separator)
for link in links
)
return '\n'.join((header_str, *row_strs))
@enforce_types
def to_csv(obj: Any, cols: List[str], separator: str=',', ljust: int=0) -> str:
from .json import to_json
return separator.join(
to_json(getattr(obj, col), indent=None).ljust(ljust)
for col in cols
)
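The new links_to_csv() is what log_list_finished() and `archivebox list --csv=...` now call; used directly it looks roughly like this (the Link constructor fields are assumptions about index/schema.py):

from archivebox.index.csv import links_to_csv
from archivebox.index.schema import Link

links = [Link(url='https://example.com', timestamp='1556600000.0', title='Example', tags=None, sources=[])]
print(links_to_csv(links, cols=['timestamp', 'is_archived', 'url'], header=True, separator=' | ', ljust=16))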


@ -2,20 +2,18 @@ __package__ = 'archivebox.index'
import os import os
from string import Template
from datetime import datetime from datetime import datetime
from typing import List, Optional, Iterator from typing import List, Optional, Iterator, Mapping
from .schema import Link from .schema import Link
from ..system import atomic_write, copy_and_overwrite
from ..util import ( from ..util import (
enforce_types, enforce_types,
ts_to_date, ts_to_date,
urlencode, urlencode,
htmlencode, htmlencode,
urldecode, urldecode,
wget_output_path,
render_template,
atomic_write,
copy_and_overwrite,
) )
from ..config import ( from ..config import (
OUTPUT_DIR, OUTPUT_DIR,
@ -67,7 +65,7 @@ def write_html_main_index(links: List[Link], out_dir: str=OUTPUT_DIR, finished:
def main_index_template(links: List[Link], finished: bool=True) -> str: def main_index_template(links: List[Link], finished: bool=True) -> str:
"""render the template for the entire main index""" """render the template for the entire main index"""
return render_template(MAIN_INDEX_TEMPLATE, { return render_legacy_template(MAIN_INDEX_TEMPLATE, {
'version': VERSION, 'version': VERSION,
'git_sha': GIT_SHA, 'git_sha': GIT_SHA,
'num_links': str(len(links)), 'num_links': str(len(links)),
@ -86,7 +84,9 @@ def main_index_template(links: List[Link], finished: bool=True) -> str:
def main_index_row_template(link: Link) -> str: def main_index_row_template(link: Link) -> str:
"""render the template for an individual link row of the main index""" """render the template for an individual link row of the main index"""
return render_template(MAIN_INDEX_ROW_TEMPLATE, { from ..extractors.wget import wget_output_path
return render_legacy_template(MAIN_INDEX_ROW_TEMPLATE, {
**link._asdict(extended=True), **link._asdict(extended=True),
# before pages are finished archiving, show loading msg instead of title # before pages are finished archiving, show loading msg instead of title
@ -122,9 +122,11 @@ def write_html_link_details(link: Link, out_dir: Optional[str]=None) -> None:
@enforce_types @enforce_types
def link_details_template(link: Link) -> str: def link_details_template(link: Link) -> str:
from ..extractors.wget import wget_output_path
link_info = link._asdict(extended=True) link_info = link._asdict(extended=True)
return render_template(LINK_DETAILS_TEMPLATE, { return render_legacy_template(LINK_DETAILS_TEMPLATE, {
**link_info, **link_info,
**link_info['canonical'], **link_info['canonical'],
'title': ( 'title': (
@ -142,3 +144,13 @@ def link_details_template(link: Link) -> str:
'status_color': 'success' if link.is_archived else 'danger', 'status_color': 'success' if link.is_archived else 'danger',
'oldest_archive_date': ts_to_date(link.oldest_archive_date), 'oldest_archive_date': ts_to_date(link.oldest_archive_date),
}) })
@enforce_types
def render_legacy_template(template_path: str, context: Mapping[str, str]) -> str:
"""render a given html template string with the given template content"""
# will be replaced by django templates in the future
with open(template_path, 'r', encoding='utf-8') as template:
template_str = template.read()
return Template(template_str).substitute(**context)
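render_legacy_template() is plain string.Template substitution, so any template file using $placeholders works; the path and keys below are made up for illustration:

from archivebox.index.html import render_legacy_template

# suppose example_template.html contains: <h1>$title</h1><a href="$url">$url</a>
html = render_legacy_template('example_template.html', {
    'title': 'Example Page',
    'url': 'https://example.com',
})
print(html)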


@ -2,13 +2,14 @@ __package__ = 'archivebox.index'
import os import os
import sys import sys
import json import json as pyjson
from datetime import datetime from datetime import datetime
from typing import List, Optional, Iterator from typing import List, Optional, Iterator, Any
from .schema import Link, ArchiveResult from .schema import Link, ArchiveResult
from ..util import enforce_types, atomic_write from ..system import atomic_write
from ..util import enforce_types
from ..config import ( from ..config import (
VERSION, VERSION,
OUTPUT_DIR, OUTPUT_DIR,
@ -46,7 +47,7 @@ def parse_json_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
index_path = os.path.join(out_dir, JSON_INDEX_FILENAME) index_path = os.path.join(out_dir, JSON_INDEX_FILENAME)
if os.path.exists(index_path): if os.path.exists(index_path):
with open(index_path, 'r', encoding='utf-8') as f: with open(index_path, 'r', encoding='utf-8') as f:
links = json.load(f)['links'] links = pyjson.load(f)['links']
for link_json in links: for link_json in links:
yield Link.from_json(link_json) yield Link.from_json(link_json)
@ -95,12 +96,13 @@ def parse_json_link_details(out_dir: str) -> Optional[Link]:
if os.path.exists(existing_index): if os.path.exists(existing_index):
with open(existing_index, 'r', encoding='utf-8') as f: with open(existing_index, 'r', encoding='utf-8') as f:
try: try:
link_json = json.load(f) link_json = pyjson.load(f)
return Link.from_json(link_json) return Link.from_json(link_json)
except json.JSONDecodeError: except pyjson.JSONDecodeError:
pass pass
return None return None
@enforce_types @enforce_types
def parse_json_links_details(out_dir: str) -> Iterator[Link]: def parse_json_links_details(out_dir: str) -> Iterator[Link]:
"""read through all the archive data folders and return the parsed links""" """read through all the archive data folders and return the parsed links"""
@ -111,3 +113,41 @@ def parse_json_links_details(out_dir: str) -> Iterator[Link]:
link = parse_json_link_details(entry.path) link = parse_json_link_details(entry.path)
if link: if link:
yield link yield link
### Helpers
class ExtendedEncoder(pyjson.JSONEncoder):
"""
Extended json serializer that supports serializing several model
fields and objects
"""
def default(self, obj):
cls_name = obj.__class__.__name__
if hasattr(obj, '_asdict'):
return obj._asdict()
elif isinstance(obj, bytes):
return obj.decode()
elif isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, Exception):
return '{}: {}'.format(obj.__class__.__name__, obj)
elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
return tuple(obj)
return pyjson.JSONEncoder.default(self, obj)
@enforce_types
def to_json(obj: Any, indent: Optional[int]=4, sort_keys: bool=True, cls=ExtendedEncoder) -> str:
return pyjson.dumps(obj, indent=indent, sort_keys=sort_keys, cls=ExtendedEncoder)
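A quick illustration of what the ExtendedEncoder buys over plain json.dumps:

from datetime import datetime
from archivebox.index.json import to_json

record = {'fetched_at': datetime(2019, 4, 30), 'raw': b'<html>', 'error': ValueError('bad url')}
print(to_json(record, indent=None))
# datetimes become isoformat strings, bytes are decoded, exceptions become "ValueError: bad url"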


@ -61,19 +61,20 @@ class ArchiveResult:
info['end_ts'] = parse_date(info['end_ts']) info['end_ts'] = parse_date(info['end_ts'])
return cls(**info) return cls(**info)
def to_json(self, indent=4, sort_keys=True): def to_dict(self, *keys) -> dict:
from ..util import to_json if keys:
return {k: v for k, v in asdict(self).items() if k in keys}
return asdict(self)
def to_json(self, indent=4, sort_keys=True) -> str:
from .json import to_json
return to_json(self, indent=indent, sort_keys=sort_keys) return to_json(self, indent=indent, sort_keys=sort_keys)
def to_csv(self, cols=None, ljust: int=0, separator: str=','): def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
from ..util import to_json from .csv import to_csv
cols = cols or self.field_names() return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust)
return separator.join(
to_json(getattr(self, col), indent=None).ljust(ljust)
for col in cols
)
@classmethod @classmethod
def field_names(cls): def field_names(cls):
@ -201,18 +202,15 @@ class Link:
info['history'] = cast_history info['history'] = cast_history
return cls(**info) return cls(**info)
def to_json(self, indent=4, sort_keys=True): def to_json(self, indent=4, sort_keys=True) -> str:
from ..util import to_json from .json import to_json
return to_json(self, indent=indent, sort_keys=sort_keys) return to_json(self, indent=indent, sort_keys=sort_keys)
def to_csv(self, csv_cols: List[str], ljust: int=0, separator: str=','): def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
from ..util import to_json from .csv import to_csv
return separator.join( return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust)
to_json(getattr(self, col), indent=None).ljust(ljust)
for col in csv_cols
)
@classmethod @classmethod
def field_names(cls): def field_names(cls):
@ -354,7 +352,7 @@ class Link:
def canonical_outputs(self) -> Dict[str, Optional[str]]: def canonical_outputs(self) -> Dict[str, Optional[str]]:
"""predict the expected output paths that should be present after archiving""" """predict the expected output paths that should be present after archiving"""
from ..util import wget_output_path from ..extractors.wget import wget_output_path
canonical = { canonical = {
'index_path': 'index.html', 'index_path': 'index.html',
'favicon_path': 'favicon.ico', 'favicon_path': 'favicon.ico',
@ -382,3 +380,5 @@ class Link:
'dom_path': static_path, 'dom_path': static_path,
}) })
return canonical return canonical


@ -1,11 +1,10 @@
__package__ = 'archivebox' __package__ = 'archivebox'
import re
import os import os
import sys import sys
import shutil import shutil
from typing import Dict, List, Optional, Set, Tuple, Iterable, IO from typing import Dict, List, Optional, Iterable, IO
from crontab import CronTab, CronSlices from crontab import CronTab, CronSlices
@ -17,18 +16,13 @@ from .cli import (
main_cmds, main_cmds,
archive_cmds, archive_cmds,
) )
from .index.schema import Link from .parsers import (
from .util import (
enforce_types,
TimedProgress,
get_dir_size,
human_readable_size,
save_stdin_to_sources, save_stdin_to_sources,
save_file_to_sources, save_file_to_sources,
links_to_csv,
to_json,
folders_to_str,
) )
from .index.schema import Link
from .util import enforce_types, docstring
from .system import get_dir_size, dedupe_cron_jobs, CRON_COMMENT
from .index import ( from .index import (
links_after_timestamp, links_after_timestamp,
load_main_index, load_main_index,
@ -51,7 +45,11 @@ from .index.json import (
parse_json_main_index, parse_json_main_index,
parse_json_links_details, parse_json_links_details,
) )
from .index.sql import parse_sql_main_index, get_admins, apply_migrations from .index.sql import (
parse_sql_main_index,
get_admins,
apply_migrations,
)
from .index.html import parse_html_main_index from .index.html import parse_html_main_index
from .extractors import archive_link from .extractors import archive_link
from .config import ( from .config import (
@ -91,6 +89,7 @@ from .config import (
get_real_name, get_real_name,
) )
from .cli.logging import ( from .cli.logging import (
TimedProgress,
log_archiving_started, log_archiving_started,
log_archiving_paused, log_archiving_paused,
log_archiving_finished, log_archiving_finished,
@ -98,6 +97,11 @@ from .cli.logging import (
log_removal_finished, log_removal_finished,
log_list_started, log_list_started,
log_list_finished, log_list_finished,
printable_config,
printable_folders,
printable_filesize,
printable_folder_status,
printable_dependency_version,
) )
@ -387,7 +391,7 @@ def info(out_dir: str=OUTPUT_DIR) -> None:
print('{green}[*] Scanning archive collection main index...{reset}'.format(**ANSI)) print('{green}[*] Scanning archive collection main index...{reset}'.format(**ANSI))
print(f' {out_dir}/*') print(f' {out_dir}/*')
num_bytes, num_dirs, num_files = get_dir_size(out_dir, recursive=False, pattern='index.') num_bytes, num_dirs, num_files = get_dir_size(out_dir, recursive=False, pattern='index.')
size = human_readable_size(num_bytes) size = printable_filesize(num_bytes)
print(f' Size: {size} across {num_files} files') print(f' Size: {size} across {num_files} files')
print() print()
@ -419,7 +423,7 @@ def info(out_dir: str=OUTPUT_DIR) -> None:
print(f' {ARCHIVE_DIR}/*') print(f' {ARCHIVE_DIR}/*')
num_bytes, num_dirs, num_files = get_dir_size(ARCHIVE_DIR) num_bytes, num_dirs, num_files = get_dir_size(ARCHIVE_DIR)
size = human_readable_size(num_bytes) size = printable_filesize(num_bytes)
print(f' Size: {size} across {num_files} files in {num_dirs} directories') print(f' Size: {size} across {num_files} files in {num_dirs} directories')
print() print()
@ -712,13 +716,8 @@ def list_all(filter_patterns_str: Optional[str]=None,
out_dir=out_dir, out_dir=out_dir,
) )
if csv: print(printable_folders(folders, json=json, csv=csv))
print(links_to_csv(folders.values(), csv_cols=csv.split(','), header=True)) return folders
elif json:
print(to_json(folders.values(), indent=4, sort_keys=True))
else:
print(folders_to_str(folders))
raise SystemExit(not folders)
@enforce_types @enforce_types
@ -749,7 +748,7 @@ def list_folders(links: List[Link],
status: str, status: str,
out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]: out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
check_data_folder() check_data_folder(out_dir=out_dir)
if status == 'indexed': if status == 'indexed':
return get_indexed_folders(links, out_dir=out_dir) return get_indexed_folders(links, out_dir=out_dir)
@ -796,7 +795,7 @@ def config(config_options_str: Optional[str]=None,
) )
raise SystemExit(2) raise SystemExit(2)
elif config_options_str: elif config_options_str:
config_options = stdin_raw_text.split('\n') config_options = config_options_str.split('\n')
config_options = config_options or [] config_options = config_options or []
@ -865,7 +864,6 @@ def config(config_options_str: Optional[str]=None,
stderr(' Please manually remove the relevant lines from your config file:') stderr(' Please manually remove the relevant lines from your config file:')
stderr(f' {CONFIG_FILE}') stderr(f' {CONFIG_FILE}')
raise SystemExit(2) raise SystemExit(2)
else: else:
stderr('[X] You must pass either --get or --set, or no arguments to get the whole config.', color='red') stderr('[X] You must pass either --get or --set, or no arguments to get the whole config.', color='red')
stderr(' archivebox config') stderr(' archivebox config')
@ -874,8 +872,6 @@ def config(config_options_str: Optional[str]=None,
raise SystemExit(2) raise SystemExit(2)
CRON_COMMENT = 'archivebox_schedule'
@enforce_types @enforce_types
def schedule(add: bool=False, def schedule(add: bool=False,
show: bool=False, show: bool=False,
@ -893,7 +889,7 @@ def schedule(add: bool=False,
os.makedirs(os.path.join(out_dir, LOGS_DIR_NAME), exist_ok=True) os.makedirs(os.path.join(out_dir, LOGS_DIR_NAME), exist_ok=True)
cron = CronTab(user=True) cron = CronTab(user=True)
cron = dedupe_jobs(cron) cron = dedupe_cron_jobs(cron)
existing_jobs = list(cron.find_comment(CRON_COMMENT)) existing_jobs = list(cron.find_comment(CRON_COMMENT))
if foreground or run_all: if foreground or run_all:
@ -962,7 +958,7 @@ def schedule(add: bool=False,
stderr(' archivebox init --every="0/5 * * * *" https://example.com/some/rss/feed.xml') stderr(' archivebox init --every="0/5 * * * *" https://example.com/some/rss/feed.xml')
raise SystemExit(1) raise SystemExit(1)
cron = dedupe_jobs(cron) cron = dedupe_cron_jobs(cron)
cron.write() cron.write()
total_runs = sum(j.frequency_per_year() for j in cron) total_runs = sum(j.frequency_per_year() for j in cron)
@ -1025,95 +1021,13 @@ def manage(args: Optional[List[str]]=None, out_dir: str=OUTPUT_DIR) -> None:
execute_from_command_line([f'{ARCHIVEBOX_BINARY} manage', *(args or ['help'])]) execute_from_command_line([f'{ARCHIVEBOX_BINARY} manage', *(args or ['help'])])
@enforce_types
def shell(out_dir: str=OUTPUT_DIR) -> None: def shell(out_dir: str=OUTPUT_DIR) -> None:
"""Enter an interactive ArchiveBox Django shell"""
check_data_folder(out_dir=out_dir) check_data_folder(out_dir=out_dir)
setup_django(OUTPUT_DIR) setup_django(OUTPUT_DIR)
from django.core.management import call_command from django.core.management import call_command
call_command("shell_plus") call_command("shell_plus")
# Helpers
def printable_config(config: ConfigDict, prefix: str='') -> str:
return f'\n{prefix}'.join(
f'{key}={val}'
for key, val in config.items()
if not (isinstance(val, dict) or callable(val))
)
def dedupe_jobs(cron: CronTab) -> CronTab:
deduped: Set[Tuple[str, str]] = set()
for job in list(cron):
unique_tuple = (str(job.slices), job.command)
if unique_tuple not in deduped:
deduped.add(unique_tuple)
cron.remove(job)
for schedule, command in deduped:
job = cron.new(command=command, comment=CRON_COMMENT)
job.setall(schedule)
job.enable()
return cron
def print_folder_status(name, folder):
if folder['enabled']:
if folder['is_valid']:
color, symbol, note = 'green', '', 'valid'
else:
color, symbol, note, num_files = 'red', 'X', 'invalid', '?'
else:
color, symbol, note, num_files = 'lightyellow', '-', 'disabled', '-'
if folder['path']:
if os.path.exists(folder['path']):
num_files = (
f'{len(os.listdir(folder["path"]))} files'
if os.path.isdir(folder['path']) else
human_readable_size(os.path.getsize(folder['path']))
)
else:
num_files = 'missing'
if ' ' in folder['path']:
folder['path'] = f'"{folder["path"]}"'
print(
ANSI[color],
symbol,
ANSI['reset'],
name.ljust(22),
(folder["path"] or '').ljust(76),
num_files.ljust(14),
ANSI[color],
note,
ANSI['reset'],
)
def print_dependency_version(name, dependency):
if dependency['enabled']:
if dependency['is_valid']:
color, symbol, note = 'green', '', 'valid'
version = 'v' + re.search(r'[\d\.]+', dependency['version'])[0]
else:
color, symbol, note, version = 'red', 'X', 'invalid', '?'
else:
color, symbol, note, version = 'lightyellow', '-', 'disabled', '-'
if ' ' in dependency["path"]:
dependency["path"] = f'"{dependency["path"]}"'
print(
ANSI[color],
symbol,
ANSI['reset'],
name.ljust(22),
(dependency["path"] or '').ljust(76),
version.ljust(14),
ANSI[color],
note,
ANSI['reset'],
)


@ -7,16 +7,29 @@ For examples of supported import formats see tests/.
__package__ = 'archivebox.parsers' __package__ = 'archivebox.parsers'
import re
import os
from typing import Tuple, List from typing import Tuple, List
from datetime import datetime
from ..config import TIMEOUT from ..index.schema import Link
from ..util import ( from ..system import atomic_write
check_url_parsing_invariants, from ..config import (
TimedProgress, ANSI,
Link, OUTPUT_DIR,
enforce_types, SOURCES_DIR_NAME,
TIMEOUT,
check_data_folder,
) )
from ..util import (
basename,
domain,
download_url,
enforce_types,
URL_REGEX,
)
from ..cli.logging import pretty_path, TimedProgress
from .pocket_html import parse_pocket_html_export from .pocket_html import parse_pocket_html_export
from .pinboard_rss import parse_pinboard_rss_export from .pinboard_rss import parse_pinboard_rss_export
from .shaarli_rss import parse_shaarli_rss_export from .shaarli_rss import parse_shaarli_rss_export
@ -66,3 +79,95 @@ def parse_links(source_file: str) -> Tuple[List[Link], str]:
timer.end() timer.end()
return [], 'Failed to parse' return [], 'Failed to parse'
@enforce_types
def save_stdin_to_sources(raw_text: str, out_dir: str=OUTPUT_DIR) -> str:
check_data_folder(out_dir=out_dir)
sources_dir = os.path.join(out_dir, SOURCES_DIR_NAME)
if not os.path.exists(sources_dir):
os.makedirs(sources_dir)
ts = str(datetime.now().timestamp()).split('.', 1)[0]
source_path = os.path.join(sources_dir, '{}-{}.txt'.format('stdin', ts))
atomic_write(raw_text, source_path)
return source_path
@enforce_types
def save_file_to_sources(path: str, timeout: int=TIMEOUT, out_dir: str=OUTPUT_DIR) -> str:
"""download a given url's content into output/sources/domain-<timestamp>.txt"""
check_data_folder(out_dir=out_dir)
sources_dir = os.path.join(out_dir, SOURCES_DIR_NAME)
if not os.path.exists(sources_dir):
os.makedirs(sources_dir)
ts = str(datetime.now().timestamp()).split('.', 1)[0]
source_path = os.path.join(sources_dir, '{}-{}.txt'.format(basename(path), ts))
if any(path.startswith(s) for s in ('http://', 'https://', 'ftp://')):
source_path = os.path.join(sources_dir, '{}-{}.txt'.format(domain(path), ts))
print('{}[*] [{}] Downloading {}{}'.format(
ANSI['green'],
datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
path,
ANSI['reset'],
))
timer = TimedProgress(timeout, prefix=' ')
try:
raw_source_text = download_url(path, timeout=timeout)
timer.end()
except Exception as e:
timer.end()
print('{}[!] Failed to download {}{}\n'.format(
ANSI['red'],
path,
ANSI['reset'],
))
print(' ', e)
raise SystemExit(1)
else:
with open(path, 'r') as f:
raw_source_text = f.read()
atomic_write(raw_source_text, source_path)
print(' > {}'.format(pretty_path(source_path)))
return source_path
def check_url_parsing_invariants() -> None:
"""Check that plain text regex URL parsing works as expected"""
# this is last-line-of-defense to make sure the URL_REGEX isn't
# misbehaving, as the consequences could be disastrous and lead to many
# incorrect/badly parsed links being added to the archive
test_urls = '''
https://example1.com/what/is/happening.html?what=1#how-about-this=1
https://example2.com/what/is/happening/?what=1#how-about-this=1
HTtpS://example3.com/what/is/happening/?what=1#how-about-this=1f
https://example4.com/what/is/happening.html
https://example5.com/
https://example6.com
<test>http://example7.com</test>
[https://example8.com/what/is/this.php?what=1]
[and http://example9.com?what=1&other=3#and-thing=2]
<what>https://example10.com#and-thing=2 "</about>
abc<this["https://example11.com/what/is#and-thing=2?whoami=23&where=1"]that>def
sdflkf[what](https://example12.com/who/what.php?whoami=1#whatami=2)?am=hi
example13.bada
and example14.badb
<or>htt://example15.badc</that>
'''
# print('\n'.join(re.findall(URL_REGEX, test_urls)))
assert len(re.findall(URL_REGEX, test_urls)) == 12
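The same URL_REGEX that check_url_parsing_invariants() exercises can be run over any text blob, e.g.:

import re
from archivebox.util import URL_REGEX

text = 'see https://example.com/page?id=1 and also http://other.example.org/feed.xml'
print(re.findall(URL_REGEX, text))
# ['https://example.com/page?id=1', 'http://other.example.org/feed.xml']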

archivebox/system.py (new file, 150 lines)

@ -0,0 +1,150 @@
__package__ = 'archivebox'
import os
import shutil
import json as pyjson
from typing import Optional, Union, Set, Tuple
from crontab import CronTab
from subprocess import (
Popen,
PIPE,
DEVNULL,
CompletedProcess,
TimeoutExpired,
CalledProcessError,
)
from .util import enforce_types, ExtendedEncoder
from .config import OUTPUT_PERMISSIONS
def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs):
"""Patched of subprocess.run to fix blocking io making timeout=innefective"""
if input is not None:
if 'stdin' in kwargs:
raise ValueError('stdin and input arguments may not both be used.')
kwargs['stdin'] = PIPE
if capture_output:
if ('stdout' in kwargs) or ('stderr' in kwargs):
raise ValueError('stdout and stderr arguments may not be used '
'with capture_output.')
kwargs['stdout'] = PIPE
kwargs['stderr'] = PIPE
with Popen(*popenargs, **kwargs) as process:
try:
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired:
process.kill()
try:
stdout, stderr = process.communicate(input, timeout=2)
except:
pass
raise TimeoutExpired(popenargs[0][0], timeout)
except BaseException:
process.kill()
# We don't call process.wait() as .__exit__ does that for us.
raise
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args,
output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)
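The patched run() keeps the subprocess.run call style; a minimal sketch:

from archivebox.system import run, PIPE

result = run(['echo', 'hello'], stdout=PIPE, stderr=PIPE, timeout=10)
print(result.returncode)               # 0
print(result.stdout.decode().strip())  # 'hello'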
def atomic_write(contents: Union[dict, str, bytes], path: str) -> None:
"""Safe atomic write to filesystem by writing to temp file + atomic rename"""
try:
tmp_file = '{}.tmp'.format(path)
if isinstance(contents, bytes):
args = {'mode': 'wb+'}
else:
args = {'mode': 'w+', 'encoding': 'utf-8'}
with open(tmp_file, **args) as f:
if isinstance(contents, dict):
pyjson.dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
else:
f.write(contents)
os.fsync(f.fileno())
os.rename(tmp_file, path)
chmod_file(path)
finally:
if os.path.exists(tmp_file):
os.remove(tmp_file)
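atomic_write() accepts dicts (json-encoded via ExtendedEncoder), strings, or bytes; a sketch, assuming the target directory already exists:

from archivebox.system import atomic_write

atomic_write({'status': 'succeeded', 'num_links': 3}, 'output/example.json')
atomic_write('plain text contents\n', 'output/example.txt')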
@enforce_types
def chmod_file(path: str, cwd: str='.', permissions: str=OUTPUT_PERMISSIONS, timeout: int=30) -> None:
"""chmod -R <permissions> <cwd>/<path>"""
if not os.path.exists(os.path.join(cwd, path)):
raise Exception('Failed to chmod: {} does not exist (did the previous step fail?)'.format(path))
chmod_result = run(['chmod', '-R', permissions, path], cwd=cwd, stdout=DEVNULL, stderr=PIPE, timeout=timeout)
if chmod_result.returncode == 1:
print(' ', chmod_result.stderr.decode())
raise Exception('Failed to chmod {}/{}'.format(cwd, path))
@enforce_types
def copy_and_overwrite(from_path: str, to_path: str):
"""copy a given file or directory to a given path, overwriting the destination"""
if os.path.isdir(from_path):
shutil.rmtree(to_path, ignore_errors=True)
shutil.copytree(from_path, to_path)
else:
with open(from_path, 'rb') as src:
atomic_write(src.read(), to_path)
@enforce_types
def get_dir_size(path: str, recursive: bool=True, pattern: Optional[str]=None) -> Tuple[int, int, int]:
"""get the total disk size of a given directory, optionally summing up
recursively and limiting to a given filter list
"""
num_bytes, num_dirs, num_files = 0, 0, 0
for entry in os.scandir(path):
if (pattern is not None) and (pattern not in entry.path):
continue
if entry.is_dir(follow_symlinks=False):
if not recursive:
continue
num_dirs += 1
bytes_inside, dirs_inside, files_inside = get_dir_size(entry.path)
num_bytes += bytes_inside
num_dirs += dirs_inside
num_files += files_inside
else:
num_bytes += entry.stat(follow_symlinks=False).st_size
num_files += 1
return num_bytes, num_dirs, num_files
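get_dir_size() pairs with printable_filesize() the way `archivebox info` uses them (the path here is just an example):

from archivebox.system import get_dir_size
from archivebox.cli.logging import printable_filesize

num_bytes, num_dirs, num_files = get_dir_size('output', recursive=True)
print(f'{printable_filesize(num_bytes)} across {num_files} files in {num_dirs} directories')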
CRON_COMMENT = 'archivebox_schedule'
@enforce_types
def dedupe_cron_jobs(cron: CronTab) -> CronTab:
deduped: Set[Tuple[str, str]] = set()
for job in list(cron):
unique_tuple = (str(job.slices), job.command)
if unique_tuple not in deduped:
deduped.add(unique_tuple)
cron.remove(job)
for schedule, command in deduped:
job = cron.new(command=command, comment=CRON_COMMENT)
job.setall(schedule)
job.enable()
return cron
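dedupe_cron_jobs() is called by `archivebox schedule` before and after editing the crontab; the pattern is roughly:

from crontab import CronTab
from archivebox.system import dedupe_cron_jobs

cron = CronTab(user=True)       # current user's crontab
cron = dedupe_cron_jobs(cron)   # collapse duplicate archivebox jobs
cron.write()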


@ -1,15 +1,8 @@
import os
import re import re
import sys
import ssl import ssl
import json
import time
import shutil
import argparse
from string import Template
from json import JSONEncoder from typing import List, Optional, Any
from typing import List, Dict, Optional, Any, Union, IO, Mapping, Tuple
from inspect import signature from inspect import signature
from functools import wraps from functools import wraps
from hashlib import sha256 from hashlib import sha256
@ -17,34 +10,17 @@ from urllib.request import Request, urlopen
from urllib.parse import urlparse, quote, unquote from urllib.parse import urlparse, quote, unquote
from html import escape, unescape from html import escape, unescape
from datetime import datetime from datetime import datetime
from multiprocessing import Process
from subprocess import (
Popen,
PIPE,
DEVNULL,
CompletedProcess,
TimeoutExpired,
CalledProcessError,
)
from base32_crockford import encode as base32_encode # type: ignore from base32_crockford import encode as base32_encode # type: ignore
import json as pyjson
from .index.schema import Link
from .config import ( from .config import (
ANSI,
TERM_WIDTH,
OUTPUT_DIR,
SOURCES_DIR_NAME,
OUTPUT_PERMISSIONS,
TIMEOUT, TIMEOUT,
SHOW_PROGRESS, STATICFILE_EXTENSIONS,
SAVE_TITLE,
CHECK_SSL_VALIDITY, CHECK_SSL_VALIDITY,
WGET_USER_AGENT, WGET_USER_AGENT,
CHROME_OPTIONS, CHROME_OPTIONS,
check_data_folder,
) )
from .cli.logging import pretty_path
### Parsing Helpers ### Parsing Helpers
@ -66,6 +42,7 @@ base_url = lambda url: without_scheme(url) # uniq base url used to dedupe links
without_www = lambda url: url.replace('://www.', '://', 1) without_www = lambda url: url.replace('://www.', '://', 1)
without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?') without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20] hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]
is_static_file = lambda url: extension(url).lower() in STATICFILE_EXTENSIONS # TODO: the proper way is with MIME type detection, not using extension
urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace') urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: s and unquote(s) urldecode = lambda s: s and unquote(s)
@ -85,36 +62,7 @@ URL_REGEX = re.compile(
r'[^\]\[\(\)<>\""\'\s]+', # stop parsing at these symbols r'[^\]\[\(\)<>\""\'\s]+', # stop parsing at these symbols
re.IGNORECASE, re.IGNORECASE,
) )
HTML_TITLE_REGEX = re.compile(
r'<title.*?>' # start matching text after <title> tag
r'(.[^<>]+)', # get everything up to these symbols
re.IGNORECASE | re.MULTILINE | re.DOTALL | re.UNICODE,
)
STATICFILE_EXTENSIONS = {
# 99.999% of the time, URLs ending in these extentions are static files
# that can be downloaded as-is, not html pages that need to be rendered
'gif', 'jpeg', 'jpg', 'png', 'tif', 'tiff', 'wbmp', 'ico', 'jng', 'bmp',
'svg', 'svgz', 'webp', 'ps', 'eps', 'ai',
'mp3', 'mp4', 'm4a', 'mpeg', 'mpg', 'mkv', 'mov', 'webm', 'm4v',
'flv', 'wmv', 'avi', 'ogg', 'ts', 'm3u8',
'pdf', 'txt', 'rtf', 'rtfd', 'doc', 'docx', 'ppt', 'pptx', 'xls', 'xlsx',
'atom', 'rss', 'css', 'js', 'json',
'dmg', 'iso', 'img',
'rar', 'war', 'hqx', 'zip', 'gz', 'bz2', '7z',
# Less common extensions to consider adding later
# jar, swf, bin, com, exe, dll, deb
# ear, hqx, eot, wmlc, kml, kmz, cco, jardiff, jnlp, run, msi, msp, msm,
# pl pm, prc pdb, rar, rpm, sea, sit, tcl tk, der, pem, crt, xpi, xspf,
# ra, mng, asx, asf, 3gpp, 3gp, mid, midi, kar, jad, wml, htc, mml
# Thse are always treated as pages, not as static files, never add them:
# html, htm, shtml, xhtml, xml, aspx, php, cgi
}
### Checks & Tests
def enforce_types(func): def enforce_types(func):
""" """
@ -158,189 +106,14 @@ def enforce_types(func):
return typechecked_function return typechecked_function
def check_url_parsing_invariants() -> None: def docstring(text: Optional[str]):
"""Check that plain text regex URL parsing works as expected""" """attach the given docstring to the decorated function"""
def decorator(func):
if text:
func.__doc__ = text
return func
return decorator
# this is last-line-of-defense to make sure the URL_REGEX isn't
# misbehaving, as the consequences could be disastrous and lead to many
# incorrect/badly parsed links being added to the archive
test_urls = '''
https://example1.com/what/is/happening.html?what=1#how-about-this=1
https://example2.com/what/is/happening/?what=1#how-about-this=1
HTtpS://example3.com/what/is/happening/?what=1#how-about-this=1f
https://example4.com/what/is/happening.html
https://example5.com/
https://example6.com
<test>http://example7.com</test>
[https://example8.com/what/is/this.php?what=1]
[and http://example9.com?what=1&other=3#and-thing=2]
<what>https://example10.com#and-thing=2 "</about>
abc<this["https://example11.com/what/is#and-thing=2?whoami=23&where=1"]that>def
sdflkf[what](https://example12.com/who/what.php?whoami=1#whatami=2)?am=hi
example13.bada
and example14.badb
<or>htt://example15.badc</that>
'''
# print('\n'.join(re.findall(URL_REGEX, test_urls)))
assert len(re.findall(URL_REGEX, test_urls)) == 12
### Random Helpers
@enforce_types
def save_stdin_to_sources(raw_text: str, out_dir: str=OUTPUT_DIR) -> str:
check_data_folder(out_dir=out_dir)
sources_dir = os.path.join(out_dir, SOURCES_DIR_NAME)
if not os.path.exists(sources_dir):
os.makedirs(sources_dir)
ts = str(datetime.now().timestamp()).split('.', 1)[0]
source_path = os.path.join(sources_dir, '{}-{}.txt'.format('stdin', ts))
atomic_write(raw_text, source_path)
return source_path
@enforce_types
def save_file_to_sources(path: str, timeout: int=TIMEOUT, out_dir: str=OUTPUT_DIR) -> str:
"""download a given url's content into output/sources/domain-<timestamp>.txt"""
check_data_folder(out_dir=out_dir)
sources_dir = os.path.join(out_dir, SOURCES_DIR_NAME)
if not os.path.exists(sources_dir):
os.makedirs(sources_dir)
ts = str(datetime.now().timestamp()).split('.', 1)[0]
source_path = os.path.join(sources_dir, '{}-{}.txt'.format(basename(path), ts))
if any(path.startswith(s) for s in ('http://', 'https://', 'ftp://')):
source_path = os.path.join(sources_dir, '{}-{}.txt'.format(domain(path), ts))
print('{}[*] [{}] Downloading {}{}'.format(
ANSI['green'],
datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
path,
ANSI['reset'],
))
timer = TimedProgress(timeout, prefix=' ')
try:
raw_source_text = download_url(path, timeout=timeout)
timer.end()
except Exception as e:
timer.end()
print('{}[!] Failed to download {}{}\n'.format(
ANSI['red'],
path,
ANSI['reset'],
))
print(' ', e)
raise SystemExit(1)
else:
with open(path, 'r') as f:
raw_source_text = f.read()
atomic_write(raw_source_text, source_path)
print(' > {}'.format(pretty_path(source_path)))
return source_path
@enforce_types
def fetch_page_title(url: str, timeout: int=10, progress: bool=SHOW_PROGRESS) -> Optional[str]:
"""Attempt to guess a page's title by downloading the html"""
if not SAVE_TITLE:
return None
try:
html = download_url(url, timeout=timeout)
match = re.search(HTML_TITLE_REGEX, html)
return htmldecode(match.group(1).strip()) if match else None
except Exception as err: # noqa
# print('[!] Failed to fetch title because of {}: {}'.format(
# err.__class__.__name__,
# err,
# ))
return None
@enforce_types
def wget_output_path(link: Link) -> Optional[str]:
"""calculate the path to the wgetted .html file, since wget may
adjust some paths to be different than the base_url path.
See docs on wget --adjust-extension (-E)
"""
if is_static_file(link.url):
return without_scheme(without_fragment(link.url))
# Wget downloads can save in a number of different ways depending on the url:
# https://example.com
# > output/archive/<timestamp>/example.com/index.html
# https://example.com?v=zzVa_tX1OiI
# > output/archive/<timestamp>/example.com/index.html?v=zzVa_tX1OiI.html
# https://www.example.com/?v=zzVa_tX1OiI
# > output/archive/<timestamp>/example.com/index.html?v=zzVa_tX1OiI.html
# https://example.com/abc
# > output/archive/<timestamp>/example.com/abc.html
# https://example.com/abc/
# > output/archive/<timestamp>/example.com/abc/index.html
# https://example.com/abc?v=zzVa_tX1OiI.html
# > output/archive/<timestamp>/example.com/abc?v=zzVa_tX1OiI.html
# https://example.com/abc/?v=zzVa_tX1OiI.html
# > output/archive/<timestamp>/example.com/abc/index.html?v=zzVa_tX1OiI.html
# https://example.com/abc/test.html
# > output/archive/<timestamp>/example.com/abc/test.html
# https://example.com/abc/test?v=zzVa_tX1OiI
# > output/archive/<timestamp>/example.com/abc/test?v=zzVa_tX1OiI.html
# https://example.com/abc/test/?v=zzVa_tX1OiI
# > output/archive/<timestamp>/example.com/abc/test/index.html?v=zzVa_tX1OiI.html
# There's also a lot of complexity around how the urlencoding and renaming
# is done for pages with query and hash fragments, or extensions like shtml / htm / php / etc.
# Since the wget algorithm for -E (appending .html) is incredibly complex
# and there's no way to get the computed output path from wget itself,
# instead of trying to reverse-engineer how it calculates the path,
# we just look in the output folder and read back the filename wget actually used
full_path = without_fragment(without_query(path(link.url))).strip('/')
search_dir = os.path.join(
link.link_dir,
domain(link.url),
urldecode(full_path),
)
for _ in range(4):
if os.path.exists(search_dir):
if os.path.isdir(search_dir):
html_files = [
f for f in os.listdir(search_dir)
if re.search(".+\\.[Hh][Tt][Mm][Ll]?$", f, re.I | re.M)
]
if html_files:
path_from_link_dir = search_dir.split(link.link_dir)[-1].strip('/')
return os.path.join(path_from_link_dir, html_files[0])
# Move up one directory level
search_dir = search_dir.rsplit('/', 1)[0]
if search_dir == link.link_dir:
break
return None
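# A minimal usage sketch (not part of the original module; `link` is assumed to be
# an existing Link): the returned path is relative to link.link_dir (or None if
# no wget output was found), so it gets joined back onto the link dir for reading.
def _example_locate_wget_output(link: Link) -> Optional[str]:
    relative_path = wget_output_path(link)
    if relative_path is None:
        return None
    return os.path.join(link.link_dir, relative_path)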
### String Manipulation & Logging Helpers
@enforce_types
def str_between(string: str, start: str, end: str=None) -> str:
@ -415,122 +188,6 @@ def parse_date(date: Any) -> Optional[datetime]:
raise ValueError('Tried to parse invalid date! {}'.format(date))
@enforce_types
def is_static_file(url: str) -> bool:
"""Certain URLs just point to a single static file, and
don't need to be re-archived in many formats
"""
# TODO: the proper way is with MIME type detection, not using extension
return extension(url) in STATICFILE_EXTENSIONS
### Python / System Helpers
def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs):
"""Patched of subprocess.run to fix blocking io making timeout=innefective"""
if input is not None:
if 'stdin' in kwargs:
raise ValueError('stdin and input arguments may not both be used.')
kwargs['stdin'] = PIPE
if capture_output:
if ('stdout' in kwargs) or ('stderr' in kwargs):
raise ValueError('stdout and stderr arguments may not be used '
'with capture_output.')
kwargs['stdout'] = PIPE
kwargs['stderr'] = PIPE
with Popen(*popenargs, **kwargs) as process:
try:
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired:
process.kill()
try:
stdout, stderr = process.communicate(input, timeout=2)
except Exception:
# the process was already killed above, ignore errors while draining its output
pass
raise TimeoutExpired(popenargs[0][0], timeout)
except BaseException:
process.kill()
# We don't call process.wait() as .__exit__ does that for us.
raise
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args,
output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)
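# A minimal usage sketch (not part of the original module; the command is arbitrary):
# the patched run() is called exactly like subprocess.run(), but the timeout is
# enforced even when the child process keeps its pipes busy.
def _example_run_with_timeout() -> Optional[str]:
    try:
        result = run(['echo', 'hello'], stdout=PIPE, stderr=PIPE, timeout=5)
    except TimeoutExpired:
        return None
    return result.stdout.decode()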
class TimedProgress:
"""Show a progress bar and measure elapsed time until .end() is called"""
def __init__(self, seconds, prefix=''):
if SHOW_PROGRESS:
self.p = Process(target=progress_bar, args=(seconds, prefix))
self.p.start()
self.stats = {'start_ts': datetime.now(), 'end_ts': None}
def end(self):
"""immediately end progress, clear the progressbar line, and save end_ts"""
end_ts = datetime.now()
self.stats['end_ts'] = end_ts
if SHOW_PROGRESS:
# protect from double termination
#if p is None or not hasattr(p, 'kill'):
# return
if self.p is not None:
self.p.terminate()
self.p = None
sys.stdout.write('\r{}{}\r'.format((' ' * TERM_WIDTH()), ANSI['reset'])) # clear whole terminal line
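# A minimal usage sketch (not part of the original module; the URL is hypothetical):
# the usual TimedProgress pattern wraps a slow call and always calls .end() in a
# finally block so the progress bar line is cleared even if the call raises.
def _example_timed_download(url: str='https://example.com') -> Optional[str]:
    timer = TimedProgress(TIMEOUT, prefix='      ')
    try:
        return download_url(url, timeout=TIMEOUT)
    except Exception:
        return None
    finally:
        timer.end()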
@enforce_types
def progress_bar(seconds: int, prefix: str='') -> None:
"""show timer in the form of progress bar, with percentage and seconds remaining"""
chunk = '' if sys.stdout.encoding == 'UTF-8' else '#'
chunks = TERM_WIDTH() - len(prefix) - 20 # number of progress chunks to show (aka max bar width)
try:
for s in range(seconds * chunks):
chunks = TERM_WIDTH() - len(prefix) - 20
progress = s / chunks / seconds * 100
bar_width = round(progress/(100/chunks))
# ████████████████████ 0.9% (1/60sec)
sys.stdout.write('\r{0}{1}{2}{3} {4}% ({5}/{6}sec)'.format(
prefix,
ANSI['green'],
(chunk * bar_width).ljust(chunks),
ANSI['reset'],
round(progress, 1),
round(s/chunks),
seconds,
))
sys.stdout.flush()
time.sleep(1 / chunks)
# ██████████████████████████████████ 100.0% (60/60sec)
sys.stdout.write('\r{0}{1}{2}{3} {4}% ({5}/{6}sec)\n'.format(
prefix,
ANSI['red'],
chunk * chunks,
ANSI['reset'],
100.0,
seconds,
seconds,
))
sys.stdout.flush()
except KeyboardInterrupt:
print()
@enforce_types
def download_url(url: str, timeout: int=TIMEOUT) -> str:
"""Download the contents of a remote url and return the text"""
@ -547,58 +204,6 @@ def download_url(url: str, timeout: int=TIMEOUT) -> str:
return resp.read().decode(encoding)
@enforce_types
def chmod_file(path: str, cwd: str='.', permissions: str=OUTPUT_PERMISSIONS, timeout: int=30) -> None:
"""chmod -R <permissions> <cwd>/<path>"""
if not os.path.exists(os.path.join(cwd, path)):
raise Exception('Failed to chmod: {} does not exist (did the previous step fail?)'.format(path))
chmod_result = run(['chmod', '-R', permissions, path], cwd=cwd, stdout=DEVNULL, stderr=PIPE, timeout=timeout)
if chmod_result.returncode == 1:
print(' ', chmod_result.stderr.decode())
raise Exception('Failed to chmod {}/{}'.format(cwd, path))
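# A minimal usage sketch (not part of the original module; the folder name is
# hypothetical): chmod_file() is typically run on an archive subfolder after an
# extractor finishes writing into it.
def _example_fix_permissions(out_dir: str=OUTPUT_DIR) -> None:
    chmod_file('archive', cwd=out_dir, permissions=OUTPUT_PERMISSIONS)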
@enforce_types
def copy_and_overwrite(from_path: str, to_path: str):
"""copy a file or directory from from_path to to_path, replacing anything that already exists there"""
if os.path.isdir(from_path):
shutil.rmtree(to_path, ignore_errors=True)
shutil.copytree(from_path, to_path)
else:
with open(from_path, 'rb') as src:
atomic_write(src.read(), to_path)
@enforce_types
def get_dir_size(path: str, recursive: bool=True, pattern: Optional[str]=None) -> Tuple[int, int, int]:
"""return the total bytes, number of directories, and number of files inside a folder (optionally recursive and filtered by pattern)"""
num_bytes, num_dirs, num_files = 0, 0, 0
for entry in os.scandir(path):
if (pattern is not None) and (pattern not in entry.path):
continue
if entry.is_dir(follow_symlinks=False):
if not recursive:
continue
num_dirs += 1
bytes_inside, dirs_inside, files_inside = get_dir_size(entry.path)
num_bytes += bytes_inside
num_dirs += dirs_inside
num_files += files_inside
else:
num_bytes += entry.stat(follow_symlinks=False).st_size
num_files += 1
return num_bytes, num_dirs, num_files
@enforce_types
def human_readable_size(num_bytes: Union[int, float]) -> str:
for count in ['Bytes','KB','MB','GB']:
if num_bytes > -1024.0 and num_bytes < 1024.0:
return '%3.1f %s' % (num_bytes, count)
num_bytes /= 1024.0
return '%3.1f %s' % (num_bytes, 'TB')
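# A minimal usage sketch (not part of the original module): get_dir_size() and
# human_readable_size() are combined to report how large a data folder is.
def _example_report_dir_size(path: str=OUTPUT_DIR) -> str:
    num_bytes, num_dirs, num_files = get_dir_size(path)
    return '{} in {} files across {} directories'.format(
        human_readable_size(num_bytes),
        num_files,
        num_dirs,
    )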
@enforce_types
def chrome_args(**options) -> List[str]:
"""helper to build up a chrome shell command with arguments"""
@ -632,7 +237,7 @@ def chrome_args(**options) -> List[str]:
return cmd_args
class ExtendedEncoder(JSONEncoder): class ExtendedEncoder(pyjson.JSONEncoder):
"""
Extended json serializer that supports serializing several model
fields and objects
@ -656,114 +261,5 @@ class ExtendedEncoder(JSONEncoder):
elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
return tuple(obj)
return JSONEncoder.default(self, obj) return pyjson.JSONEncoder.default(self, obj)
def to_json(obj: Any, file: IO=None, indent: Optional[int]=4, sort_keys: bool=True, cls=ExtendedEncoder) -> str:
if file:
path = os.path.realpath(file.name)
contents = json.dumps(obj, indent=indent, sort_keys=sort_keys, cls=cls)
atomic_write(contents, path)
return contents
else:
return json.dumps(obj, indent=indent, sort_keys=sort_keys, cls=cls)
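# A minimal usage sketch (not part of the original module; the dict is illustrative):
# because to_json() defaults to ExtendedEncoder, non-JSON-native values such as
# dict_keys views (handled by the encoder above) serialize without manual conversion.
def _example_to_json() -> str:
    stats = {'archive_methods': {'wget': 1, 'pdf': 2}.keys()}
    return to_json(stats, indent=4)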
def links_to_csv(links: List[Link], csv_cols: Optional[List[str]]=None,
header: bool=True, ljust: int=0, separator: str=',') -> str:
csv_cols = csv_cols or ['timestamp', 'is_archived', 'url']
header_str = ''
if header:
header_str = separator.join(col.ljust(ljust) for col in csv_cols)
row_strs = (
link.to_csv(csv_cols=csv_cols, ljust=ljust, separator=separator)
for link in links
)
return '\n'.join((header_str, *row_strs))
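# A minimal usage sketch (not part of the original module; `links` is assumed to be
# an existing list of Link objects): this is the shape of output used for
# CSV-style listings of the index.
def _example_csv_listing(links: List[Link]) -> str:
    return links_to_csv(links, csv_cols=['timestamp', 'is_archived', 'url'], header=True)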
def folders_to_str(folders: Dict[str, Optional[Link]]) -> str:
return '\n'.join(f'{folder} {link}' for folder, link in folders.items())
@enforce_types
def render_template(template_path: str, context: Mapping[str, str]) -> str:
"""render a given html template string with the given template content"""
# will be replaced by django templates in the future
with open(template_path, 'r', encoding='utf-8') as template:
template_str = template.read()
return Template(template_str).substitute(**context)
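# A minimal usage sketch (not part of the original module; the template path and
# placeholder names are hypothetical): render_template() is plain string.Template
# substitution, so every $placeholder in the file must have a key in the context.
def _example_render_row() -> str:
    return render_template('templates/link_row.html', {
        'title': 'Example page',
        'url': 'https://example.com',
    })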
def atomic_write(contents: Union[dict, str, bytes], path: str) -> None:
"""Safe atomic write to filesystem by writing to temp file + atomic rename"""
try:
tmp_file = '{}.tmp'.format(path)
if isinstance(contents, bytes):
args = {'mode': 'wb+'}
else:
args = {'mode': 'w+', 'encoding': 'utf-8'}
with open(tmp_file, **args) as f:
if isinstance(contents, dict):
to_json(contents, file=f)
else:
f.write(contents)
os.fsync(f.fileno())
os.rename(tmp_file, path)
chmod_file(path)
finally:
if os.path.exists(tmp_file):
os.remove(tmp_file)
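# A minimal usage sketch (not part of the original module; the filenames are
# hypothetical): atomic_write() accepts str, bytes, or a dict (dicts are routed
# through to_json above), so a half-written index file never appears on disk.
def _example_atomic_writes(out_dir: str=OUTPUT_DIR) -> None:
    atomic_write('some plain text', os.path.join(out_dir, 'notes.txt'))
    atomic_write({'version': 1}, os.path.join(out_dir, 'index.json'))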
def reject_stdin(caller: str, stdin: Optional[IO]=sys.stdin) -> None:
"""Tell the user they passed stdin to a command that doesn't accept it"""
if stdin and not stdin.isatty():
stdin_raw_text = stdin.read().strip()
if stdin_raw_text:
print(
'{red}[X] The "{}" command does not accept stdin.{reset}\n'.format(
caller,
**ANSI,
)
)
print(' Run archivebox "{} --help" to see usage and examples.'.format(
caller,
))
print()
raise SystemExit(1)
def accept_stdin(stdin: Optional[IO]=sys.stdin) -> Optional[str]:
if stdin and not stdin.isatty():
return stdin.read()
return None
def set_docstring(text: str):
def decorator(func):
@wraps(func)
def wrapper_with_docstring(*args, **kwargs):
return func(*args, **kwargs)
wrapper_with_docstring.__doc__ = text
return wrapper_with_docstring
return decorator
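# A minimal usage sketch (not part of the original module; the help text is
# illustrative): set_docstring() attaches a shared help string to a wrapper
# function at definition time, which the CLI then shows as the command's docs.
@set_docstring('Add a new URL to the archive')
def _example_add(*args, **kwargs):
    pass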
class SmartFormatter(argparse.HelpFormatter):
def _split_lines(self, text, width):
if '\n' in text:
return text.splitlines()
return argparse.HelpFormatter._split_lines(self, text, width)
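# A minimal usage sketch (not part of the original module; the argument and help
# text are illustrative): SmartFormatter is passed as argparse's formatter_class
# so that multi-line help strings keep their explicit line breaks.
def _example_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(formatter_class=SmartFormatter)
    parser.add_argument(
        '--depth',
        help='Recursion depth:\n  0 = just the given URL\n  1 = the URL plus every page it links to',
    )
    return parser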
class ArchiveError(Exception):
def __init__(self, message, hints=None):
super().__init__(message)
self.hints = hints
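# A minimal usage sketch (not part of the original module; the message and hints
# are illustrative): ArchiveError carries optional hints that extractors can
# surface to the user alongside the failure message.
def _example_require_wget(wget_binary: Optional[str]=None) -> None:
    if not wget_binary:
        raise ArchiveError(
            'Missing dependency: wget',
            hints=['Install wget and make sure it is available on your $PATH.'],
        )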