move everything out of legacy folder

This commit is contained in:
Nick Sweeting 2019-04-27 17:26:24 -04:00
parent 553f312125
commit 1b8abc0961
74 changed files with 3162 additions and 2629 deletions

View file

@ -2,13 +2,17 @@ __package__ = 'archivebox.cli'
import os
from typing import Dict
from typing import Dict, List, Optional, IO
from importlib import import_module
CLI_DIR = os.path.dirname(os.path.abspath(__file__))
# these common commands will appear sorted before any others for ease-of-use
display_first = ('help', 'version', 'init', 'info', 'config', 'list', 'update', 'add', 'remove')
meta_cmds = ('help', 'version')
main_cmds = ('init', 'info', 'config')
archive_cmds = ('add', 'remove', 'update', 'list')
display_first = (*meta_cmds, *main_cmds, *archive_cmds)
# every imported command module must have these properties in order to be valid
required_attrs = ('__package__', '__command__', 'main')
@ -42,11 +46,14 @@ def list_subcommands() -> Dict[str, str]:
return dict(sorted(COMMANDS, key=display_order))
def run_subcommand(subcommand: str, args=None) -> None:
def run_subcommand(subcommand: str,
subcommand_args: List[str]=None,
stdin: Optional[IO]=None,
pwd: Optional[str]=None) -> None:
"""run a given ArchiveBox subcommand with the given list of args"""
module = import_module('.archivebox_{}'.format(subcommand), __package__)
module.main(args) # type: ignore
module.main(args=subcommand_args, stdin=stdin, pwd=pwd) # type: ignore
SUBCOMMANDS = list_subcommands()

View file

@ -5,19 +5,17 @@ __package__ = 'archivebox.cli'
__command__ = 'archivebox'
__description__ = 'ArchiveBox: The self-hosted internet archive.'
import os
import sys
import argparse
from typing import Optional, List, IO
from . import list_subcommands, run_subcommand
from ..legacy.config import OUTPUT_DIR
from ..config import OUTPUT_DIR
def parse_args(args=None):
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
subcommands = list_subcommands()
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
@ -43,54 +41,24 @@ def parse_args(args=None):
default=None,
)
parser.add_argument(
"args",
"subcommand_args",
help="Arguments for the subcommand",
nargs=argparse.REMAINDER,
)
command = parser.parse_args(args)
command = parser.parse_args(args or ())
if command.help:
if command.help or command.subcommand is None:
command.subcommand = 'help'
if command.version:
command.subcommand = 'version'
# print('--------------------------------------------')
# print('Command: ', sys.argv[0])
# print('Subcommand: ', command.subcommand)
# print('Args to pass:', args[1:])
# print('--------------------------------------------')
run_subcommand(
subcommand=command.subcommand,
subcommand_args=command.subcommand_args,
stdin=stdin,
pwd=pwd or OUTPUT_DIR,
)
return command.subcommand, command.args
def print_import_tutorial():
print('Welcome to ArchiveBox!')
print()
print('To import an existing archive (from a previous version of ArchiveBox):')
print(' 1. cd into your data dir OUTPUT_DIR (usually ArchiveBox/output) and run:')
print(' 2. archivebox init')
print()
print('To start a new archive:')
print(' 1. Create an emptry directory, then cd into it and run:')
print(' 2. archivebox init')
print()
print('For more information, see the migration docs here:')
print(' https://github.com/pirate/ArchiveBox/wiki/Migration')
def main(args=None):
subcommand, subcommand_args = parse_args(args)
existing_index = os.path.exists(os.path.join(OUTPUT_DIR, 'index.json'))
if subcommand is None:
if existing_index:
run_subcommand('help', subcommand_args)
else:
print_import_tutorial()
raise SystemExit(0)
run_subcommand(subcommand, subcommand_args)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -7,90 +7,75 @@ __description__ = 'Add a new URL or list of URLs to your archive'
import sys
import argparse
from typing import List, Optional
from typing import List, Optional, IO
from ..legacy.config import stderr, check_dependencies, check_data_folder
from ..legacy.util import (
handle_stdin_import,
handle_file_import,
)
from ..legacy.main import update_archive_data
from ..main import add
from ..util import SmartFormatter, accept_stdin
from ..config import OUTPUT_DIR, ONLY_NEW
def main(args: List[str]=None, stdin: Optional[str]=None) -> None:
check_data_folder()
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
add_help=True,
formatter_class=SmartFormatter,
)
# parser.add_argument(
# '--depth', #'-d',
# type=int,
# help='Recursively archive all linked pages up to this many hops away',
# default=0,
# )
parser.add_argument(
'--only-new', #'-n',
'--update-all', #'-n',
action='store_true',
help="Don't attempt to retry previously skipped/failed links when updating",
default=not ONLY_NEW,
help="Also retry previously skipped/failed links when adding new links",
)
parser.add_argument(
'--index-only', #'-o',
action='store_true',
help="Add the links to the main index without archiving them",
)
# parser.add_argument(
# '--mirror', #'-m',
# action='store_true',
# help='Archive an entire site (finding all linked pages below it on the same domain)',
# )
# parser.add_argument(
# '--crawler', #'-r',
# choices=('depth_first', 'breadth_first'),
# help='Controls which crawler to use in order to find outlinks in a given page',
# default=None,
# )
parser.add_argument(
'url',
'import_path',
nargs='?',
type=str,
default=None,
help='URL of page to archive (or path to local file)'
help=(
'URL or path to local file containing a list of links to import. e.g.:\n'
' https://getpocket.com/users/USERNAME/feed/all\n'
' https://example.com/some/rss/feed.xml\n'
' ~/Downloads/firefox_bookmarks_export.html\n'
' ~/Desktop/sites_list.csv\n'
)
)
command = parser.parse_args(args)
check_dependencies()
### Handle ingesting urls piped in through stdin
# (e.g. if user does cat example_urls.txt | archivebox add)
import_path = None
if stdin or not sys.stdin.isatty():
stdin_raw_text = stdin or sys.stdin.read()
if stdin_raw_text and command.url:
stderr(
'[X] You should pass either a path as an argument, '
'or pass a list of links via stdin, but not both.\n'
)
raise SystemExit(1)
import_path = handle_stdin_import(stdin_raw_text)
### Handle ingesting url from a remote file/feed
# (e.g. if an RSS feed URL is used as the import path)
elif command.url:
import_path = handle_file_import(command.url)
update_archive_data(
import_path=import_path,
resume=None,
only_new=command.only_new,
command = parser.parse_args(args or ())
import_str = accept_stdin(stdin)
add(
import_str=import_str,
import_path=command.import_path,
update_all=command.update_all,
index_only=command.index_only,
out_dir=pwd or OUTPUT_DIR,
)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)
# TODO: Implement these
#
# parser.add_argument(
# '--depth', #'-d',
# type=int,
# help='Recursively archive all linked pages up to this many hops away',
# default=0,
# )
# parser.add_argument(
# '--mirror', #'-m',
# action='store_true',
# help='Archive an entire site (finding all linked pages below it on the same domain)',
# )
# parser.add_argument(
# '--crawler', #'-r',
# choices=('depth_first', 'breadth_first'),
# help='Controls which crawler to use in order to find outlinks in a given page',
# default=None,
# )

View file

@ -7,28 +7,14 @@ __description__ = 'Get and set your ArchiveBox project configuration values'
import sys
import argparse
from typing import Optional, List
from typing import Optional, List, IO
from ..legacy.util import SmartFormatter
from ..legacy.config import (
check_data_folder,
OUTPUT_DIR,
load_all_config,
write_config_file,
CONFIG,
CONFIG_FILE,
USER_CONFIG,
ConfigDict,
stderr,
get_real_name,
)
from ..main import config
from ..util import SmartFormatter, accept_stdin
from ..config import OUTPUT_DIR
def main(args: List[str]=None, stdin: Optional[str]=None) -> None:
check_data_folder()
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
@ -57,102 +43,18 @@ def main(args: List[str]=None, stdin: Optional[str]=None) -> None:
type=str,
help='KEY or KEY=VALUE formatted config values to get or set',
)
command = parser.parse_args(args)
command = parser.parse_args(args or ())
config_options_str = accept_stdin(stdin)
if stdin or not sys.stdin.isatty():
stdin_raw_text = stdin or sys.stdin.read()
if stdin_raw_text and command.config_options:
stderr(
'[X] You should either pass config values as an arguments '
'or via stdin, but not both.\n',
color='red',
)
raise SystemExit(1)
config_options = stdin_raw_text.split('\n')
else:
config_options = command.config_options
no_args = not (command.get or command.set or command.reset or command.config_options)
matching_config: ConfigDict = {}
if command.get or no_args:
if config_options:
config_options = [get_real_name(key) for key in config_options]
matching_config = {key: CONFIG[key] for key in config_options if key in CONFIG}
failed_config = [key for key in config_options if key not in CONFIG]
if failed_config:
stderr()
stderr('[X] These options failed to get', color='red')
stderr(' {}'.format('\n '.join(config_options)))
raise SystemExit(1)
else:
matching_config = CONFIG
print(printable_config(matching_config))
raise SystemExit(not matching_config)
elif command.set:
new_config = {}
failed_options = []
for line in config_options:
if line.startswith('#') or not line.strip():
continue
if '=' not in line:
stderr('[X] Config KEY=VALUE must have an = sign in it', color='red')
stderr(f' {line}')
raise SystemExit(2)
raw_key, val = line.split('=')
raw_key = raw_key.upper().strip()
key = get_real_name(raw_key)
if key != raw_key:
stderr(f'[i] Note: The config option {raw_key} has been renamed to {key}, please use the new name going forwards.', color='lightyellow')
if key in CONFIG:
new_config[key] = val.strip()
else:
failed_options.append(line)
if new_config:
before = CONFIG
matching_config = write_config_file(new_config, out_dir=OUTPUT_DIR)
after = load_all_config()
print(printable_config(matching_config))
side_effect_changes: ConfigDict = {}
for key, val in after.items():
if key in USER_CONFIG and (before[key] != after[key]) and (key not in matching_config):
side_effect_changes[key] = after[key]
if side_effect_changes:
stderr()
stderr('[i] Note: This change also affected these other options that depended on it:', color='lightyellow')
print(' {}'.format(printable_config(side_effect_changes, prefix=' ')))
if failed_options:
stderr()
stderr('[X] These options failed to set:', color='red')
stderr(' {}'.format('\n '.join(failed_options)))
raise SystemExit(bool(failed_options))
elif command.reset:
stderr('[X] This command is not implemented yet.', color='red')
stderr(' Please manually remove the relevant lines from your config file:')
stderr(f' {CONFIG_FILE}')
raise SystemExit(2)
else:
stderr('[X] You must pass either --get or --set, or no arguments to get the whole config.', color='red')
stderr(' archivebox config')
stderr(' archivebox config --get SOME_KEY')
stderr(' archivebox config --set SOME_KEY=SOME_VALUE')
raise SystemExit(2)
def printable_config(config: ConfigDict, prefix: str='') -> str:
return f'\n{prefix}'.join(
f'{key}={val}'
for key, val in config.items()
if not (isinstance(val, dict) or callable(val))
config(
config_options_str=config_options_str,
config_options=command.config_options,
get=command.get,
set=command.set,
reset=command.reset,
out_dir=pwd or OUTPUT_DIR,
)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -7,52 +7,24 @@ __description__ = 'Print the ArchiveBox help message and usage'
import sys
import argparse
from ..legacy.util import reject_stdin
from ..legacy.config import ANSI
from . import list_subcommands
from typing import Optional, List, IO
from ..main import help
from ..util import reject_stdin
from ..config import OUTPUT_DIR
def main(args=None):
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
add_help=True,
)
parser.parse_args(args)
reject_stdin(__command__)
parser.parse_args(args or ())
reject_stdin(__command__, stdin)
COMMANDS_HELP_TEXT = '\n '.join(
f'{cmd.ljust(20)} {summary}'
for cmd, summary in list_subcommands().items()
)
print('''{green}ArchiveBox: The self-hosted internet archive.{reset}
{lightblue}Usage:{reset}
archivebox [command] [--help] [--version] [...args]
{lightblue}Comamnds:{reset}
{}
{lightblue}Example Use:{reset}
mkdir my-archive; cd my-archive/
archivebox init
archivebox info
archivebox add https://example.com/some/page
archivebox add --depth=1 ~/Downloads/bookmarks_export.html
archivebox list --sort=timestamp --csv=timestamp,url,is_archived
archivebox schedule --every=week https://example.com/some/feed.rss
archivebox update --resume=15109948213.123
{lightblue}Documentation:{reset}
https://github.com/pirate/ArchiveBox/wiki
'''.format(COMMANDS_HELP_TEXT, **ANSI))
help(out_dir=pwd or OUTPUT_DIR)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -7,25 +7,24 @@ __description__ = 'Print out some info and statistics about the archive collecti
import sys
import argparse
from ..legacy.config import check_data_folder
from ..legacy.util import reject_stdin
from ..legacy.main import info
from typing import Optional, List, IO
from ..main import info
from ..config import OUTPUT_DIR
from ..util import reject_stdin
def main(args=None):
check_data_folder()
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
add_help=True,
)
parser.parse_args(args)
reject_stdin(__command__)
parser.parse_args(args or ())
reject_stdin(__command__, stdin)
info(out_dir=pwd or OUTPUT_DIR)
info()
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -7,23 +7,24 @@ __description__ = 'Initialize a new ArchiveBox collection in the current directo
import sys
import argparse
from ..legacy.util import reject_stdin
from ..legacy.main import init
from typing import Optional, List, IO
from ..main import init
from ..util import reject_stdin
from ..config import OUTPUT_DIR
def main(args=None):
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
add_help=True,
)
parser.parse_args(args)
reject_stdin(__command__)
parser.parse_args(args or ())
reject_stdin(__command__, stdin)
init()
init(out_dir=pwd or OUTPUT_DIR)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -2,15 +2,17 @@
__package__ = 'archivebox.cli'
__command__ = 'archivebox list'
__description__ = 'List all the URLs currently in the archive.'
__description__ = 'List, filter, and export information about archive entries'
import sys
import argparse
from ..legacy.util import SmartFormatter, reject_stdin, to_json, to_csv
from ..legacy.config import check_data_folder, OUTPUT_DIR
from ..legacy.main import (
list_archive_data,
from typing import Optional, List, IO
from ..main import list_all
from ..util import SmartFormatter, accept_stdin
from ..config import OUTPUT_DIR
from ..index import (
get_indexed_folders,
get_archived_folders,
get_unarchived_folders,
@ -23,11 +25,7 @@ from ..legacy.main import (
get_unrecognized_folders,
)
def main(args=None):
check_data_folder()
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
@ -93,57 +91,27 @@ def main(args=None):
help='Type of pattern matching to use when filtering URLs',
)
parser.add_argument(
'patterns',
'filter_patterns',
nargs='*',
type=str,
default=None,
help='List only URLs matching these filter patterns.'
)
command = parser.parse_args(args)
reject_stdin(__command__)
command = parser.parse_args(args or ())
filter_patterns_str = accept_stdin(stdin)
links = list_archive_data(
filter_patterns=command.patterns,
list_all(
filter_patterns_str=filter_patterns_str,
filter_patterns=command.filter_patterns,
filter_type=command.filter_type,
before=command.before,
status=command.status,
after=command.after,
before=command.before,
sort=command.sort,
csv=command.csv,
json=command.json,
out_dir=pwd or OUTPUT_DIR,
)
if command.sort:
links = sorted(links, key=lambda link: getattr(link, command.sort))
links = list(links)
if command.status == 'indexed':
folders = get_indexed_folders(links, out_dir=OUTPUT_DIR)
elif command.status == 'archived':
folders = get_archived_folders(links, out_dir=OUTPUT_DIR)
elif command.status == 'unarchived':
folders = get_unarchived_folders(links, out_dir=OUTPUT_DIR)
elif command.status == 'present':
folders = get_present_folders(links, out_dir=OUTPUT_DIR)
elif command.status == 'valid':
folders = get_valid_folders(links, out_dir=OUTPUT_DIR)
elif command.status == 'invalid':
folders = get_invalid_folders(links, out_dir=OUTPUT_DIR)
elif command.status == 'duplicate':
folders = get_duplicate_folders(links, out_dir=OUTPUT_DIR)
elif command.status == 'orphaned':
folders = get_orphaned_folders(links, out_dir=OUTPUT_DIR)
elif command.status == 'corrupted':
folders = get_corrupted_folders(links, out_dir=OUTPUT_DIR)
elif command.status == 'unrecognized':
folders = get_unrecognized_folders(links, out_dir=OUTPUT_DIR)
if command.csv:
print(to_csv(folders.values(), csv_cols=command.csv.split(','), header=True))
elif command.json:
print(to_json(folders.values(), indent=4, sort_keys=True))
else:
print('\n'.join(f'{folder} {link}' for folder, link in folders.items()))
raise SystemExit(not folders)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -6,24 +6,18 @@ __description__ = 'Run an ArchiveBox Django management command'
import sys
from ..legacy.config import OUTPUT_DIR, setup_django, check_data_folder
from typing import Optional, List, IO
from ..main import manage
from ..config import OUTPUT_DIR
def main(args=None):
check_data_folder()
setup_django(OUTPUT_DIR)
from django.core.management import execute_from_command_line
args = sys.argv if args is None else ['archivebox', *args]
args[0] = f'{sys.argv[0]} manage'
if args[1:] == []:
args.append('help')
execute_from_command_line(args)
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
manage(
args=args,
out_dir=pwd or OUTPUT_DIR,
)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -7,17 +7,14 @@ __description__ = 'Remove the specified URLs from the archive.'
import sys
import argparse
from typing import Optional, List, IO
from ..legacy.config import check_data_folder
from ..legacy.util import reject_stdin
from ..legacy.main import remove_archive_links
from ..main import remove
from ..util import accept_stdin
from ..config import OUTPUT_DIR
def main(args=None):
check_data_folder()
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
@ -56,33 +53,25 @@ def main(args=None):
help='Type of pattern matching to use when filtering URLs',
)
parser.add_argument(
'pattern',
'filter_patterns',
nargs='*',
type=str,
default=None,
help='URLs matching this filter pattern will be removed from the index.'
)
command = parser.parse_args(args)
command = parser.parse_args(args or ())
filter_str = accept_stdin(stdin)
if not sys.stdin.isatty():
stdin_raw_text = sys.stdin.read()
if stdin_raw_text and command.url:
print(
'[X] You should pass either a pattern as an argument, '
'or pass a list of patterns via stdin, but not both.\n'
)
raise SystemExit(1)
patterns = [pattern.strip() for pattern in stdin_raw_text.split('\n')]
else:
patterns = command.pattern
remove_archive_links(
filter_patterns=patterns, filter_type=command.filter_type,
before=command.before, after=command.after,
yes=command.yes, delete=command.delete,
remove(
filter_str=filter_str,
filter_patterns=command.filter_patterns,
filter_type=command.filter_type,
before=command.before,
after=command.after,
yes=command.yes,
delete=command.delete,
out_dir=pwd or OUTPUT_DIR,
)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -4,34 +4,17 @@ __package__ = 'archivebox.cli'
__command__ = 'archivebox schedule'
__description__ = 'Set ArchiveBox to regularly import URLs at specific times using cron'
import os
import sys
import argparse
from datetime import datetime
from crontab import CronTab, CronSlices
from typing import Optional, List, IO
from ..main import schedule
from ..util import reject_stdin
from ..config import OUTPUT_DIR
from ..legacy.util import reject_stdin
from ..legacy.config import (
OUTPUT_DIR,
LOGS_DIR,
ARCHIVEBOX_BINARY,
USER,
ANSI,
stderr,
check_data_folder,
)
CRON_COMMENT = 'archivebox_schedule'
def main(args=None):
check_data_folder()
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
@ -57,7 +40,7 @@ def main(args=None):
group.add_argument(
'--clear', # '-c'
action='store_true',
help=("Stop all ArchiveBox scheduled runs, clear it completely from cron"),
help=("Stop all ArchiveBox scheduled runs (remove cron jobs)"),
)
group.add_argument(
'--show', # '-s'
@ -67,13 +50,14 @@ def main(args=None):
group.add_argument(
'--foreground', '-f',
action='store_true',
help=("Launch ArchiveBox as a long-running foreground task "
help=("Launch ArchiveBox scheduler as a long-running foreground task "
"instead of using cron."),
)
group.add_argument(
'--run-all', # '-a',
action='store_true',
help='Run all the scheduled jobs once immediately, independent of their configured schedules',
help=("Run all the scheduled jobs once immediately, independent of "
"their configured schedules, can be used together with --foreground"),
)
parser.add_argument(
'import_path',
@ -83,115 +67,21 @@ def main(args=None):
help=("Check this path and import any new links on every run "
"(can be either local file or remote URL)"),
)
command = parser.parse_args(args)
reject_stdin(__command__)
command = parser.parse_args(args or ())
reject_stdin(__command__, stdin)
os.makedirs(LOGS_DIR, exist_ok=True)
cron = CronTab(user=True)
cron = dedupe_jobs(cron)
existing_jobs = list(cron.find_comment(CRON_COMMENT))
if command.foreground or command.run_all:
if command.import_path or (not existing_jobs):
stderr('{red}[X] You must schedule some jobs first before running in foreground mode.{reset}'.format(**ANSI))
stderr(' archivebox schedule --every=hour https://example.com/some/rss/feed.xml')
raise SystemExit(1)
print('{green}[*] Running {} ArchiveBox jobs in foreground task scheduler...{reset}'.format(len(existing_jobs), **ANSI))
if command.run_all:
try:
for job in existing_jobs:
sys.stdout.write(f' > {job.command}')
sys.stdout.flush()
job.run()
sys.stdout.write(f'\r{job.command}\n')
except KeyboardInterrupt:
print('\n{green}[√] Stopped.{reset}'.format(**ANSI))
raise SystemExit(1)
if command.foreground:
try:
for result in cron.run_scheduler():
print(result)
except KeyboardInterrupt:
print('\n{green}[√] Stopped.{reset}'.format(**ANSI))
raise SystemExit(1)
elif command.show:
if existing_jobs:
print('\n'.join(str(cmd) for cmd in existing_jobs))
else:
stderr('{red}[X] There are no ArchiveBox cron jobs scheduled for your user ({}).{reset}'.format(USER, **ANSI))
stderr(' To schedule a new job, run:')
stderr(' archivebox schedule --every=[timeperiod] https://example.com/some/rss/feed.xml')
raise SystemExit(0)
elif command.clear:
print(cron.remove_all(comment=CRON_COMMENT))
cron.write()
raise SystemExit(0)
elif command.every:
quoted = lambda s: f'"{s}"' if s and ' ' in s else s
cmd = [
'cd',
quoted(OUTPUT_DIR),
'&&',
quoted(ARCHIVEBOX_BINARY),
*(('add', f'"{command.import_path}"',) if command.import_path else ('update',)),
'2>&1',
'>',
quoted(os.path.join(LOGS_DIR, 'archivebox.log')),
]
new_job = cron.new(command=' '.join(cmd), comment=CRON_COMMENT)
if command.every in ('minute', 'hour', 'day', 'week', 'month', 'year'):
set_every = getattr(new_job.every(), command.every)
set_every()
elif CronSlices.is_valid(command.every):
new_job.setall(command.every)
else:
stderr('{red}[X] Got invalid timeperiod for cron task.{reset}'.format(**ANSI))
stderr(' It must be one of minute/hour/day/week/month')
stderr(' or a quoted cron-format schedule like:')
stderr(' archivebox init --every=day https://example.com/some/rss/feed.xml')
stderr(' archivebox init --every="0/5 * * * *" https://example.com/some/rss/feed.xml')
raise SystemExit(1)
cron = dedupe_jobs(cron)
cron.write()
total_runs = sum(j.frequency_per_year() for j in cron)
existing_jobs = list(cron.find_comment(CRON_COMMENT))
print()
print('{green}[√] Scheduled new ArchiveBox cron job for user: {} ({} jobs are active).{reset}'.format(USER, len(existing_jobs), **ANSI))
print('\n'.join(f' > {cmd}' if str(cmd) == str(new_job) else f' {cmd}' for cmd in existing_jobs))
if total_runs > 60 and not command.quiet:
stderr()
stderr('{lightyellow}[!] With the current cron config, ArchiveBox is estimated to run >{} times per year.{reset}'.format(total_runs, **ANSI))
stderr(f' Congrats on being an enthusiastic internet archiver! 👌')
stderr()
stderr(' Make sure you have enough storage space available to hold all the data.')
stderr(' Using a compressed/deduped filesystem like ZFS is recommended if you plan on archiving a lot.')
raise SystemExit(0)
def dedupe_jobs(cron: CronTab) -> CronTab:
deduped = set()
for job in list(cron):
unique_tuple = (str(job.slices), job.command)
if unique_tuple not in deduped:
deduped.add(unique_tuple)
cron.remove(job)
for schedule, command in deduped:
job = cron.new(command=command, comment=CRON_COMMENT)
job.setall(schedule)
job.enable()
return cron
schedule(
add=command.add,
show=command.show,
clear=command.clear,
foreground=command.foreground,
run_all=command.run_all,
quiet=command.quiet,
every=command.every,
import_path=command.import_path,
out_dir=pwd or OUTPUT_DIR,
)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -7,15 +7,14 @@ __description__ = 'Run the ArchiveBox HTTP server'
import sys
import argparse
from ..legacy.config import setup_django, IS_TTY, OUTPUT_DIR, ANSI, check_data_folder
from ..legacy.util import reject_stdin
from typing import Optional, List, IO
from ..main import server
from ..util import reject_stdin
from ..config import OUTPUT_DIR
def main(args=None):
check_data_folder()
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
@ -33,26 +32,15 @@ def main(args=None):
action='store_true',
help='Enable auto-reloading when code or templates change',
)
command = parser.parse_args(args)
reject_stdin(__command__)
command = parser.parse_args(args or ())
reject_stdin(__command__, stdin)
setup_django(OUTPUT_DIR)
from django.core.management import call_command
from django.contrib.auth.models import User
if IS_TTY and not User.objects.filter(is_superuser=True).exists():
print('{lightyellow}[!] No admin users exist yet, you will not be able to edit links in the UI.{reset}'.format(**ANSI))
print()
print(' To create an admin user, run:')
print(' archivebox manage createsuperuser')
print()
print('{green}[+] Starting ArchiveBox webserver...{reset}'.format(**ANSI))
if not command.reload:
command.runserver_args.append('--noreload')
call_command("runserver", *command.runserver_args)
server(
runserver_args=command.runserver_args,
reload=command.reload,
out_dir=pwd or OUTPUT_DIR,
)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -7,27 +7,26 @@ __description__ = 'Enter an interactive ArchiveBox Django shell'
import sys
import argparse
from ..legacy.config import setup_django, OUTPUT_DIR, check_data_folder
from ..legacy.util import reject_stdin
from typing import Optional, List, IO
from ..main import shell
from ..config import OUTPUT_DIR
from ..util import reject_stdin
def main(args=None):
check_data_folder()
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
add_help=True,
)
parser.parse_args(args)
reject_stdin(__command__)
parser.parse_args(args or ())
reject_stdin(__command__, stdin)
shell(
out_dir=pwd or OUTPUT_DIR,
)
setup_django(OUTPUT_DIR)
from django.core.management import call_command
call_command("shell_plus")
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -2,27 +2,36 @@
__package__ = 'archivebox.cli'
__command__ = 'archivebox update'
__description__ = 'Import any new links from subscriptions and retry any previously failed/skipped links.'
__description__ = 'Import any new links from subscriptions and retry any previously failed/skipped links'
import sys
import argparse
from typing import List
from typing import List, Optional, IO
from ..legacy.config import check_data_folder
from ..legacy.util import reject_stdin
from ..legacy.main import update_archive_data
from ..main import update
from ..util import SmartFormatter, accept_stdin
from ..config import OUTPUT_DIR
from ..index import (
get_indexed_folders,
get_archived_folders,
get_unarchived_folders,
get_present_folders,
get_valid_folders,
get_invalid_folders,
get_duplicate_folders,
get_orphaned_folders,
get_corrupted_folders,
get_unrecognized_folders,
)
def main(args: List[str]=None):
check_data_folder()
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
add_help=True,
formatter_class=SmartFormatter,
)
parser.add_argument(
'--only-new', #'-n',
@ -40,16 +49,75 @@ def main(args: List[str]=None):
help='Resume the update process from a given timestamp',
default=None,
)
parser.add_argument(
'--overwrite', #'-x',
action='store_true',
help='Ignore existing archived content and overwrite with new versions (DANGEROUS)',
)
parser.add_argument(
'--before', #'-b',
type=float,
help="Update only links bookmarked before the given timestamp.",
default=None,
)
parser.add_argument(
'--after', #'-a',
type=float,
help="Update only links bookmarked after the given timestamp.",
default=None,
)
parser.add_argument(
'--status',
type=str,
choices=('indexed', 'archived', 'unarchived', 'present', 'valid', 'invalid', 'duplicate', 'orphaned', 'corrupted', 'unrecognized'),
default='indexed',
help=(
'Update only links or data directories that have the given status\n'
f' indexed {get_indexed_folders.__doc__} (the default)\n'
f' archived {get_archived_folders.__doc__}\n'
f' unarchived {get_unarchived_folders.__doc__}\n'
'\n'
f' present {get_present_folders.__doc__}\n'
f' valid {get_valid_folders.__doc__}\n'
f' invalid {get_invalid_folders.__doc__}\n'
'\n'
f' duplicate {get_duplicate_folders.__doc__}\n'
f' orphaned {get_orphaned_folders.__doc__}\n'
f' corrupted {get_corrupted_folders.__doc__}\n'
f' unrecognized {get_unrecognized_folders.__doc__}\n'
)
)
parser.add_argument(
'--filter-type',
type=str,
choices=('exact', 'substring', 'domain', 'regex'),
default='exact',
help='Type of pattern matching to use when filtering URLs',
)
# Positional URL patterns; forwarded to update() as filter_patterns
parser.add_argument(
    'filter_patterns',
    nargs='*',
    type=str,
    default=None,
    help='Update only URLs matching these filter patterns.'
)
command = parser.parse_args(args)
reject_stdin(__command__)
filter_patterns_str = accept_stdin(stdin)
update_archive_data(
import_path=None,
update(
resume=command.resume,
only_new=command.only_new,
index_only=command.index_only,
overwrite=command.overwrite,
filter_patterns_str=filter_patterns_str,
filter_patterns=command.filter_patterns,
filter_type=command.filter_type,
status=command.status,
after=command.after,
before=command.before,
out_dir=pwd or OUTPUT_DIR,
)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -4,26 +4,17 @@ __package__ = 'archivebox.cli'
__command__ = 'archivebox version'
__description__ = 'Print the ArchiveBox version and dependency information'
import os
import re
import sys
import argparse
from ..legacy.util import reject_stdin, human_readable_size
from ..legacy.config import (
ANSI,
VERSION,
CODE_LOCATIONS,
CONFIG_LOCATIONS,
DATA_LOCATIONS,
DEPENDENCIES,
check_dependencies,
)
from typing import Optional, List, IO
from ..main import version
from ..util import reject_stdin
from ..config import OUTPUT_DIR
def main(args=None):
args = sys.argv[1:] if args is None else args
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=__description__,
@ -34,92 +25,14 @@ def main(args=None):
action='store_true',
help='Only print ArchiveBox version number and nothing else.',
)
command = parser.parse_args(args)
reject_stdin(__command__)
command = parser.parse_args(args or ())
reject_stdin(__command__, stdin)
if command.quiet:
print(VERSION)
else:
print('ArchiveBox v{}'.format(VERSION))
print()
print('{white}[i] Dependency versions:{reset}'.format(**ANSI))
for name, dependency in DEPENDENCIES.items():
print_dependency_version(name, dependency)
print()
print('{white}[i] Code locations:{reset}'.format(**ANSI))
for name, folder in CODE_LOCATIONS.items():
print_folder_status(name, folder)
print()
print('{white}[i] Config locations:{reset}'.format(**ANSI))
for name, folder in CONFIG_LOCATIONS.items():
print_folder_status(name, folder)
print()
print('{white}[i] Data locations:{reset}'.format(**ANSI))
for name, folder in DATA_LOCATIONS.items():
print_folder_status(name, folder)
print()
check_dependencies()
def print_folder_status(name, folder):
if folder['enabled']:
if folder['is_valid']:
color, symbol, note = 'green', '', 'valid'
else:
color, symbol, note, num_files = 'red', 'X', 'invalid', '?'
else:
color, symbol, note, num_files = 'lightyellow', '-', 'disabled', '-'
if folder['path']:
if os.path.exists(folder['path']):
num_files = (
f'{len(os.listdir(folder["path"]))} files'
if os.path.isdir(folder['path']) else
human_readable_size(os.path.getsize(folder['path']))
)
else:
num_files = 'missing'
print(
ANSI[color],
symbol,
ANSI['reset'],
name.ljust(24),
(folder["path"] or '').ljust(70),
num_files.ljust(14),
ANSI[color],
note,
ANSI['reset'],
)
def print_dependency_version(name, dependency):
if dependency['enabled']:
if dependency['is_valid']:
color, symbol, note = 'green', '', 'valid'
version = 'v' + re.search(r'[\d\.]+', dependency['version'])[0]
else:
color, symbol, note, version = 'red', 'X', 'invalid', '?'
else:
color, symbol, note, version = 'lightyellow', '-', 'disabled', '-'
print(
ANSI[color],
symbol,
ANSI['reset'],
name.ljust(24),
(dependency["path"] or '').ljust(70),
version.ljust(14),
ANSI[color],
note,
ANSI['reset'],
version(
quiet=command.quiet,
out_dir=pwd or OUTPUT_DIR,
)
if __name__ == '__main__':
main()
main(args=sys.argv[1:], stdin=sys.stdin)

268
archivebox/cli/logging.py Normal file
View file

@ -0,0 +1,268 @@
__package__ = 'archivebox.cli'
import os
import sys
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, List
from ..index.schema import Link, ArchiveResult
from ..config import ANSI, OUTPUT_DIR, IS_TTY
@dataclass
class RuntimeStats:
    """mutable stats counter for logging archiving timing info to CLI output"""

    # outcome tallies, all starting at zero
    skipped: int = 0
    succeeded: int = 0
    failed: int = 0

    # start/end timestamps of the parsing stage (None until recorded)
    parse_start_ts: Optional[datetime] = None
    parse_end_ts: Optional[datetime] = None

    # start/end timestamps of the indexing stage (None until recorded)
    index_start_ts: Optional[datetime] = None
    index_end_ts: Optional[datetime] = None

    # start/end timestamps of the archiving stage (None until recorded)
    archiving_start_ts: Optional[datetime] = None
    archiving_end_ts: Optional[datetime] = None
# globals are bad, mmkay
_LAST_RUN_STATS = RuntimeStats()
def pretty_path(path: str) -> str:
    """Shorten a path for display by rewriting '<current working dir>/' as './'

    e.g. <cwd>/output/abc becomes ./output/abc; every occurrence of the
    prefix inside the string is rewritten, and other strings pass through
    unchanged.
    """
    cwd_prefix = os.path.abspath('.') + '/'
    return path.replace(cwd_prefix, './')
### Parsing Stage
def log_parsing_started(source_file: str):
    """Record the parsing start time and announce which source file is being parsed"""
    started_at = datetime.now()
    _LAST_RUN_STATS.parse_start_ts = started_at

    timestamp = started_at.strftime('%Y-%m-%d %H:%M:%S')
    # only the final path component of the source file is shown
    filename = source_file.rsplit('/', 1)[-1]
    message = '\n{green}[*] [{}] Parsing new links from output/sources/{}...{reset}'
    print(message.format(timestamp, filename, **ANSI))
def log_parsing_finished(num_parsed: int, num_new_links: int, parser_name: str):
    """Record the parsing end time and print how many links were parsed and how many are new"""
    _LAST_RUN_STATS.parse_end_ts = datetime.now()
    print(f' > Parsed {num_parsed} links as {parser_name} ({num_new_links} new links added)')
### Indexing Stage
def log_indexing_process_started(num_links: int):
    """Record the indexing start time and announce how many links are being written"""
    started_at = datetime.now()
    _LAST_RUN_STATS.index_start_ts = started_at

    print()
    timestamp = started_at.strftime('%Y-%m-%d %H:%M:%S')
    message = '{green}[*] [{}] Writing {} links to main index...{reset}'
    print(message.format(timestamp, num_links, **ANSI))
def log_indexing_process_finished():
    """Record the time at which the indexing stage ended (prints nothing)"""
    _LAST_RUN_STATS.index_end_ts = datetime.now()
def log_indexing_started(out_path: str):
    """Show the path of the index file being written as an in-progress line

    Only written when output is a TTY. No newline is emitted, so the line
    can be overwritten later by output that starts with a carriage return.
    """
    if not IS_TTY:
        return

    sys.stdout.write(f' > {out_path}')
    # a TTY is line-buffered: without a newline the text would stay in the
    # buffer, so flush explicitly to make the progress line actually appear
    sys.stdout.flush()
def log_indexing_finished(out_path: str):
    """Print the finished index path, preceded by a carriage return so it starts at column 0"""
    print('\r{}'.format(out_path))
### Archiving Stage
def log_archiving_started(num_links: int, resume: Optional[float]=None):
    """Record the archiving start time and announce the run

    A different message is printed when `resume` is truthy, which also
    includes the resume value itself.
    """
    started_at = datetime.now()
    _LAST_RUN_STATS.archiving_start_ts = started_at
    timestamp = started_at.strftime('%Y-%m-%d %H:%M:%S')

    print()
    if not resume:
        message = '{green}[▶] [{}] Updating content for {} matching pages in archive...{reset}'
        print(message.format(timestamp, num_links, **ANSI))
        return

    message = '{green}[▶] [{}] Resuming archive updating for {} pages starting from {}...{reset}'
    print(message.format(timestamp, num_links, resume, **ANSI))
def log_archiving_paused(num_links: int, idx: int, timestamp: str):
    """Record the archiving end time and print where the run stopped and how to resume it

    `idx` is displayed incremented by one; `timestamp` is echoed in the
    suggested `archivebox update --resume=` command.
    """
    paused_at = datetime.now()
    _LAST_RUN_STATS.archiving_end_ts = paused_at

    headline = '\n{lightyellow}[X] [{now}] Downloading paused on link {timestamp} ({idx}/{total}){reset}'
    print()
    print(headline.format(
        now=paused_at.strftime('%Y-%m-%d %H:%M:%S'),
        idx=idx + 1,
        timestamp=timestamp,
        total=num_links,
        **ANSI,
    ))
    print(' To view your archive, open:')
    print(f' {OUTPUT_DIR}/index.html')
    print(' Continue archiving where you left off by running:')
    print(f' archivebox update --resume={timestamp}')
def log_archiving_finished(num_links: int):
    """Record the archiving end time and print a summary of the run

    The summary shows the elapsed time since the recorded archiving start
    (in minutes when over 60 seconds, otherwise in seconds), the
    skipped/updated/failed tallies from the run stats, and where to open
    the archive index.
    """
    end_ts = datetime.now()
    _LAST_RUN_STATS.archiving_end_ts = end_ts

    # internal invariant: the archiving start time must have been recorded first
    assert _LAST_RUN_STATS.archiving_start_ts is not None
    seconds = end_ts.timestamp() - _LAST_RUN_STATS.archiving_start_ts.timestamp()

    # the precision lives in the format spec; no extra format arguments needed
    if seconds > 60:
        duration = '{0:.2f} min'.format(seconds / 60)
    else:
        duration = '{0:.2f} sec'.format(seconds)

    print()
    print('{}[√] [{}] Update of {} pages complete ({}){}'.format(
        ANSI['green'],
        end_ts.strftime('%Y-%m-%d %H:%M:%S'),
        num_links,
        duration,
        ANSI['reset'],
    ))
    print(' - {} links skipped'.format(_LAST_RUN_STATS.skipped))
    print(' - {} links updated'.format(_LAST_RUN_STATS.succeeded))
    print(' - {} links had errors'.format(_LAST_RUN_STATS.failed))
    print(' To view your archive, open:')
    print(' {}/index.html'.format(OUTPUT_DIR))
def log_link_archiving_started(link: Link, link_dir: str, is_new: bool):
    """Print the three-line header for one link: title line, URL line, output folder line

    New links get a green '+' symbol and a '>' marker before the folder;
    existing links use the 'black' color entry and empty markers.
    """
    # Example of the output shape:
    # [*] [2019-03-22 13:46:45] "Log Structured Merge Trees - ben stopford"
    # http://www.benstopford.com/2015/02/14/log-structured-merge-trees/
    # > output/archive/1478739709
    if is_new:
        color_name, symbol, folder_marker = 'green', '+', '>'
    else:
        color_name, symbol, folder_marker = 'black', '', ''

    title_line = '\n[{symbol_color}{symbol}{reset}] [{symbol_color}{now}{reset}] "{title}"'
    print(title_line.format(
        symbol_color=ANSI[color_name],
        symbol=symbol,
        now=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        title=link.title or link.base_url,  # fall back to the base URL when untitled
        **ANSI,
    ))
    print(' {blue}{url}{reset}'.format(url=link.url, **ANSI))
    print(' {} {}'.format(folder_marker, pretty_path(link_dir)))
def log_link_archiving_finished(link: Link, link_dir: str, is_new: bool, stats: dict):
    """Classify one link's result and bump the matching run counter (prints nothing)

    The link counts as failed if `stats['failed']` is positive, as skipped
    if `stats['skipped']` equals the sum of all values in `stats`, and as
    succeeded otherwise.
    """
    total = sum(stats.values())

    if stats['failed'] > 0:
        outcome = 'failed'
    elif stats['skipped'] == total:
        outcome = 'skipped'
    else:
        outcome = 'succeeded'

    setattr(_LAST_RUN_STATS, outcome, getattr(_LAST_RUN_STATS, outcome) + 1)
def log_archive_method_started(method: str):
    """Print the name of the archive method that is about to run"""
    print(f' > {method}')
def log_archive_method_finished(result: ArchiveResult):
    """Print failure details for one archive method result; other statuses print nothing

    For a result whose status is 'failed', the printed block contains the
    error class name and message, up to five hint lines (when
    `result.output` has a `hints` attribute), and the command that was run.
    Arguments containing a space are wrapped in double quotes so the user
    can copy-paste the command to re-run it.
    """
    if result.status != 'failed':
        return

    # Prettify CMD string and make it safe to copy-paste by quoting arguments
    # (only needed on failure, so it is built after the status check)
    quoted_cmd = ' '.join(
        '"{}"'.format(arg) if ' ' in arg else arg
        for arg in result.cmd
    )

    # Prettify error output hints string and limit to five lines
    hints = getattr(result.output, 'hints', None) or ()
    if hints:
        # hints may be a list/tuple of lines or one newline-separated string
        hint_lines = hints if isinstance(hints, (list, tuple)) else hints.split('\n')
        hints = [
            ' {}{}{}'.format(ANSI['lightyellow'], line.strip(), ANSI['reset'])
            for line in hint_lines[:5] if line.strip()
        ]

    # Collect and prefix output lines with indentation
    output_lines = [
        '{lightred}Failed:{reset}'.format(**ANSI),
        ' {reset}{} {red}{}{reset}'.format(
            result.output.__class__.__name__.replace('ArchiveError', ''),
            result.output,
            **ANSI,
        ),
        *hints,
        '{}Run to see full output:{}'.format(ANSI['lightred'], ANSI['reset']),
        *([' cd {};'.format(result.pwd)] if result.pwd else []),
        ' {}'.format(quoted_cmd),
    ]
    print('\n'.join(
        ' {}'.format(line)
        for line in output_lines
        if line
    ))
    print()
def log_list_started(filter_patterns: Optional[List[str]], filter_type: str):
    """Announce a search of the index, showing the filter type and the space-joined patterns"""
    headline = '{green}[*] Finding links in the archive index matching these {} patterns:{reset}'
    print(headline.format(filter_type, **ANSI))

    # None is treated the same as "no patterns"
    joined_patterns = ' '.join(filter_patterns or ())
    print(' ' + joined_patterns)
def log_list_finished(links):
    """Print the given links as formatted by links_to_csv, framed by divider and blank lines"""
    from ..util import links_to_csv

    divider = '---------------------------------------------------------------------------------------------------'
    columns = ['timestamp', 'is_archived', 'num_outputs', 'url']

    print()
    print(divider)
    print(links_to_csv(links, csv_cols=columns, header=True, ljust=16, separator=' | '))
    print(divider)
    print()
def log_removal_started(links: List[Link], yes: bool, delete: bool):
    """Describe a pending removal and, unless `yes` is set, ask the user to confirm it

    Prints how many links matched and whether their data folders will also
    be deleted (`delete`). When `yes` is false the user is prompted; any
    answer other than 'y' (case-insensitive), Ctrl-C, or end-of-input
    aborts.

    Raises:
        SystemExit: with code 0 when the user does not confirm.
    """
    print('{lightyellow}[i] Found {} matching URLs to remove.{reset}'.format(len(links), **ANSI))
    if delete:
        # only links whose data folder actually exists on disk are counted
        file_counts = [link.num_outputs for link in links if os.path.exists(link.link_dir)]
        print(
            f' {len(links)} Links will be de-listed from the main index, and their archived content folders will be deleted from disk.\n'
            f' ({len(file_counts)} data folders with {sum(file_counts)} archived files will be deleted!)'
        )
    else:
        print(
            ' Matching links will be de-listed from the main index, but their archived content folders will remain in place on disk.\n'
            ' (Pass --delete if you also want to permanently delete the data folders)'
        )

    if not yes:
        print()
        print('{lightyellow}[?] Do you want to proceed with removing these {} links?{reset}'.format(len(links), **ANSI))
        try:
            answer = input(' y/[n]: ')
        except (KeyboardInterrupt, EOFError):
            raise SystemExit(0)
        # explicit comparison instead of `assert`: asserts are stripped under
        # `python -O`, which would let the removal proceed unconfirmed
        if answer.lower() != 'y':
            raise SystemExit(0)
def log_removal_finished(all_links: int, to_keep: int):
    """Print the outcome of a removal: either that nothing matched, or how many links were removed

    `all_links` is the count before removal and `to_keep` the count
    remaining; the difference is reported as the number removed.
    """
    print()
    if all_links == 0:
        print('{red}[X] No matching links found.{reset}'.format(**ANSI))
        return

    removed_count = all_links - to_keep
    summary = '{red}[√] Removed {} out of {} links from the archive index.{reset}'
    print(summary.format(removed_count, all_links, **ANSI))
    print(f' Index now contains {to_keep} links.')

226
archivebox/cli/tests.py Executable file
View file

@ -0,0 +1,226 @@
#!/usr/bin/env python3
__package__ = 'archivebox.cli'
import os
import sys
import shutil
import unittest
from contextlib import contextmanager
TEST_CONFIG = {
'USE_COLOR': 'False',
'SHOW_PROGRESS': 'False',
'OUTPUT_DIR': 'data.tests',
'SAVE_ARCHIVE_DOT_ORG': 'False',
'SAVE_TITLE': 'False',
'USE_CURL': 'False',
'USE_WGET': 'False',
'USE_GIT': 'False',
'USE_CHROME': 'False',
'USE_YOUTUBEDL': 'False',
}
OUTPUT_DIR = 'data.tests'
os.environ.update(TEST_CONFIG)
from ..main import init
from ..index import load_main_index
from ..config import (
SQL_INDEX_FILENAME,
JSON_INDEX_FILENAME,
HTML_INDEX_FILENAME,
)
from . import (
archivebox_init,
archivebox_add,
archivebox_remove,
)
HIDE_CLI_OUTPUT = True
test_urls = '''
https://example1.com/what/is/happening.html?what=1#how-about-this=1
https://example2.com/what/is/happening/?what=1#how-about-this=1
HTtpS://example3.com/what/is/happening/?what=1#how-about-this=1f
https://example4.com/what/is/happening.html
https://example5.com/
https://example6.com
<test>http://example7.com</test>
[https://example8.com/what/is/this.php?what=1]
[and http://example9.com?what=1&other=3#and-thing=2]
<what>https://example10.com#and-thing=2 "</about>
abc<this["https://subb.example11.com/what/is#and-thing=2?whoami=23&where=1"]that>def
sdflkf[what](https://subb.example12.com/who/what.php?whoami=1#whatami=2)?am=hi
example13.bada
and example14.badb
<or>htt://example15.badc</that>
'''
stdout = sys.stdout
stderr = sys.stderr
@contextmanager
def output_hidden(show_failing=True):
    """Redirect sys.stdout/sys.stderr to temporary files while the with-body runs

    Does nothing when HIDE_CLI_OUTPUT is false. Otherwise output goes to
    stdout.txt / stderr.txt in the current directory; both files are
    deleted afterwards. If the body raises, the streams are restored, the
    captured output is printed (when `show_failing` is true), and the
    exception is re-raised.
    """
    if not HIDE_CLI_OUTPUT:
        yield
        return

    def restore_streams():
        # close the capture files and put back the streams saved at import time
        sys.stdout.close()
        sys.stderr.close()
        sys.stdout = stdout
        sys.stderr = stderr

    sys.stdout = open('stdout.txt', 'w+')
    sys.stderr = open('stderr.txt', 'w+')
    try:
        yield
    except BaseException:
        restore_streams()
        if show_failing:
            for capture_file in ('stdout.txt', 'stderr.txt'):
                with open(capture_file, 'r') as f:
                    print(f.read())
        raise
    else:
        restore_streams()
    finally:
        os.remove('stdout.txt')
        os.remove('stderr.txt')
class TestInit(unittest.TestCase):
    """Tests for `archivebox init` run against the OUTPUT_DIR test folder"""

    def setUp(self):
        os.makedirs(OUTPUT_DIR, exist_ok=True)

    def tearDown(self):
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)

    def test_basic_init(self):
        """init in an empty folder creates the SQL, JSON and HTML indexes with no links"""
        with output_hidden():
            archivebox_init.main([])

        assert os.path.exists(os.path.join(OUTPUT_DIR, SQL_INDEX_FILENAME))
        assert os.path.exists(os.path.join(OUTPUT_DIR, JSON_INDEX_FILENAME))
        assert os.path.exists(os.path.join(OUTPUT_DIR, HTML_INDEX_FILENAME))
        assert len(load_main_index(out_dir=OUTPUT_DIR)) == 0

    def test_conflicting_init(self):
        """init exits without creating any index when the folder holds an unrelated file"""
        with open(os.path.join(OUTPUT_DIR, 'test_conflict.txt'), 'w+') as f:
            f.write('test')

        with self.assertRaises(SystemExit, msg='Init should have exited with an exception'):
            with output_hidden(show_failing=False):
                archivebox_init.main([])

        assert not os.path.exists(os.path.join(OUTPUT_DIR, SQL_INDEX_FILENAME))
        assert not os.path.exists(os.path.join(OUTPUT_DIR, JSON_INDEX_FILENAME))
        assert not os.path.exists(os.path.join(OUTPUT_DIR, HTML_INDEX_FILENAME))

        # assertRaises keeps the "did not raise" failure outside any except
        # clause, so it can no longer be swallowed by a bare `except:`
        with self.assertRaises(
            (Exception, SystemExit),
            msg='load_main_index should raise an exception when no index is present',
        ):
            load_main_index(out_dir=OUTPUT_DIR)

    def test_no_dirty_state(self):
        """init can run again after the output folder has been deleted"""
        with output_hidden():
            init()
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        with output_hidden():
            init()
class TestAdd(unittest.TestCase):
    """Tests for `archivebox add` with a URL argument, a file argument, and text passed as stdin"""

    def setUp(self):
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        with output_hidden():
            init()

    def tearDown(self):
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)

    def test_add_arg_url(self):
        """adding a feed URL as an argument is expected to index 30 links"""
        feed_url = 'https://getpocket.com/users/nikisweeting/feed/all'
        with output_hidden():
            archivebox_add.main([feed_url])

        indexed = load_main_index(out_dir=OUTPUT_DIR)
        assert len(indexed) == 30

    def test_add_arg_file(self):
        """adding a file containing test_urls is expected to index 12 links"""
        urls_path = os.path.join(OUTPUT_DIR, 'test.txt')
        with open(urls_path, 'w+') as f:
            f.write(test_urls)

        with output_hidden():
            archivebox_add.main([urls_path])

        indexed = load_main_index(out_dir=OUTPUT_DIR)
        assert len(indexed) == 12
        os.remove(urls_path)

    def test_add_stdin_url(self):
        """passing test_urls as stdin is expected to index 12 links"""
        with output_hidden():
            archivebox_add.main([], stdin=test_urls)

        indexed = load_main_index(out_dir=OUTPUT_DIR)
        assert len(indexed) == 12
class TestRemove(unittest.TestCase):
    """Tests for `archivebox remove` against an index pre-filled from test_urls"""

    def setUp(self):
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        with output_hidden():
            init()
            archivebox_add.main([], stdin=test_urls)

    # NOTE(review): cleanup is disabled here, so OUTPUT_DIR is left on disk
    # after these tests -- confirm whether that is deliberate
    # def tearDown(self):
    #     shutil.rmtree(OUTPUT_DIR, ignore_errors=True)

    def test_remove_exact(self):
        """removing one exact URL is expected to leave 11 links"""
        with output_hidden():
            archivebox_remove.main(['--yes', '--delete', 'https://example5.com/'])

        all_links = load_main_index(out_dir=OUTPUT_DIR)
        assert len(all_links) == 11

    def test_remove_regex(self):
        """removing by regex is expected to leave 4 links"""
        # raw string: same characters as before, without invalid-escape warnings
        pattern = r'http(s)?:\/\/(.+\.)?(example\d\.com)'
        with output_hidden():
            archivebox_remove.main(['--yes', '--delete', '--filter-type=regex', pattern])

        all_links = load_main_index(out_dir=OUTPUT_DIR)
        assert len(all_links) == 4

    def test_remove_domain(self):
        """removing two domains is expected to leave 10 links"""
        with output_hidden():
            archivebox_remove.main(['--yes', '--delete', '--filter-type=domain', 'example5.com', 'example6.com'])

        all_links = load_main_index(out_dir=OUTPUT_DIR)
        assert len(all_links) == 10

    def test_remove_none(self):
        """removing a URL that matches nothing must raise"""
        # assertRaises keeps the "did not raise" failure outside any except
        # clause, so it can no longer be swallowed by a bare `except:`
        with self.assertRaises((Exception, SystemExit), msg='Should raise if no URLs match'):
            with output_hidden(show_failing=False):
                archivebox_remove.main(['--yes', '--delete', 'https://doesntexist.com'])
if __name__ == '__main__':
if '--verbose' in sys.argv or '-v' in sys.argv:
HIDE_CLI_OUTPUT = False
unittest.main()