working argparse-based CLI with most commands implemented

Nick Sweeting 2019-04-03 00:27:37 -04:00
parent 68b4c01c6b
commit 51ae634ec9
20 changed files with 807 additions and 424 deletions


@@ -3,7 +3,8 @@ import json
from datetime import datetime
from string import Template
from typing import List, Tuple, Iterator, Optional, Mapping
from typing import List, Tuple, Iterator, Optional, Mapping, Iterable
from collections import OrderedDict
from .schema import Link, ArchiveResult
from .config import (
@@ -13,14 +14,15 @@ from .config import (
GIT_SHA,
FOOTER_INFO,
TIMEOUT,
URL_BLACKLIST_PTN,
)
from .util import (
scheme,
fuzzy_url,
ts_to_date,
merge_links,
urlencode,
htmlencode,
urldecode,
derived_link_info,
wget_output_path,
enforce_types,
TimedProgress,
@@ -28,7 +30,6 @@ from .util import (
atomic_write,
)
from .parse import parse_links
from .links import validate_links
from .logs import (
log_indexing_process_started,
log_indexing_started,
@@ -41,6 +42,147 @@ TITLE_LOADING_MSG = 'Not yet archived...'
### Link filtering and checking

@enforce_types
def derived_link_info(link: Link) -> dict:
"""extend link info with the archive urls and other derived data"""
info = link._asdict(extended=True)
info.update(link.canonical_outputs())
    return info


@enforce_types
def merge_links(a: Link, b: Link) -> Link:
"""deterministially merge two links, favoring longer field values over shorter,
and "cleaner" values over worse ones.
"""
assert a.base_url == b.base_url, 'Cannot merge two links with different URLs'
url = a.url if len(a.url) > len(b.url) else b.url
possible_titles = [
title
for title in (a.title, b.title)
if title and title.strip() and '://' not in title
]
title = None
if len(possible_titles) == 2:
title = max(possible_titles, key=lambda t: len(t))
elif len(possible_titles) == 1:
title = possible_titles[0]
timestamp = (
a.timestamp
if float(a.timestamp or 0) < float(b.timestamp or 0) else
b.timestamp
)
    tags_set = (
        {tag.strip() for tag in (a.tags or '').split(',') if tag.strip()}
        | {tag.strip() for tag in (b.tags or '').split(',') if tag.strip()}
    )
    tags = ','.join(tags_set) or None
sources = list(set(a.sources + b.sources))
    all_methods = set(list(a.history.keys()) + list(b.history.keys()))
history = {
method: (a.history.get(method) or []) + (b.history.get(method) or [])
for method in all_methods
}
return Link(
url=url,
timestamp=timestamp,
title=title,
tags=tags,
sources=sources,
history=history,
)
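
# A minimal usage sketch (hypothetical field values, assuming the Link
# schema from .schema exposes these attributes):
#
#   a = Link(url='https://example.com/page', timestamp='1554000000',
#            title='Example Page', tags='news', sources=['feed.rss'], history={})
#   b = Link(url='https://example.com/page', timestamp='1554000099',
#            title=None, tags='tech', sources=['bookmarks.html'], history={})
#   merged = merge_links(a, b)
#   assert merged.timestamp == '1554000000'              # earlier timestamp wins
#   assert merged.title == 'Example Page'                # only the valid title survives
#   assert set(merged.tags.split(',')) == {'news', 'tech'}
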
def validate_links(links: Iterable[Link]) -> Iterable[Link]:
links = archivable_links(links) # remove chrome://, about:, mailto: etc.
    links = sorted_links(links)      # deterministically sort the links based on timestamp, url
links = uniquefied_links(links) # merge/dedupe duplicate timestamps & urls
if not links:
print('[X] No links found :(')
raise SystemExit(1)
return links
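
# Typical use (a sketch; `parsed_links` is a hypothetical Iterable[Link],
# e.g. the output of parse_links imported above):
#
#   clean_links = validate_links(parsed_links)
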
def archivable_links(links: Iterable[Link]) -> Iterable[Link]:
"""remove chrome://, about:// or other schemed links that cant be archived"""
for link in links:
scheme_is_valid = scheme(link.url) in ('http', 'https', 'ftp')
not_blacklisted = (not URL_BLACKLIST_PTN.match(link.url)) if URL_BLACKLIST_PTN else True
if scheme_is_valid and not_blacklisted:
yield link
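
# For example (hypothetical Link values, assuming scheme() returns the URL
# scheme as a string), non-web links are silently dropped:
#
#   bad  = Link(url='chrome://history', timestamp='1', title=None,
#               tags=None, sources=[], history={})
#   good = Link(url='https://example.com', timestamp='2', title=None,
#               tags=None, sources=[], history={})
#   assert [link.url for link in archivable_links([bad, good])] == ['https://example.com']
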
def uniquefied_links(sorted_links: Iterable[Link]) -> Iterable[Link]:
"""
ensures that all non-duplicate links have monotonically increasing timestamps
"""
unique_urls: OrderedDict[str, Link] = OrderedDict()
for link in sorted_links:
fuzzy = fuzzy_url(link.url)
if fuzzy in unique_urls:
# merge with any other links that share the same url
link = merge_links(unique_urls[fuzzy], link)
        unique_urls[fuzzy] = link

    unique_timestamps: OrderedDict[str, Link] = OrderedDict()
for link in unique_urls.values():
new_link = link.overwrite(
timestamp=lowest_uniq_timestamp(unique_timestamps, link.timestamp),
)
unique_timestamps[new_link.timestamp] = new_link
return unique_timestamps.values()
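
# Sketch of the dedupe behavior (hypothetical values, assuming fuzzy_url
# treats the http:// and https:// variants of a URL as equivalent):
#
#   plain = Link(url='http://example.com/', timestamp='1554000000',
#                title=None, tags=None, sources=[], history={})
#   https = Link(url='https://example.com/', timestamp='1554000000',
#                title=None, tags=None, sources=[], history={})
#   other = Link(url='https://other.com/', timestamp='1554000000',
#                title=None, tags=None, sources=[], history={})
#   unique = list(uniquefied_links([plain, https, other]))
#   assert len(unique) == 2      # the two example.com variants were merged
#   assert {link.timestamp for link in unique} == {'1554000000', '1554000000.0'}
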
def sorted_links(links: Iterable[Link]) -> Iterable[Link]:
sort_func = lambda link: (link.timestamp.split('.', 1)[0], link.url)
    return sorted(links, key=sort_func, reverse=True)


def links_after_timestamp(links: Iterable[Link], resume: Optional[float]=None) -> Iterable[Link]:
if not resume:
yield from links
return
for link in links:
try:
if float(link.timestamp) <= resume:
yield link
except (ValueError, TypeError):
print('Resume value and all timestamp values must be valid numbers.')
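
# e.g. (a sketch) when resuming an interrupted run, only links at or before
# the resume point are yielded:
#
#   remaining = links_after_timestamp(all_links, resume=1554000050.0)
#   # yields every link whose float(timestamp) <= 1554000050.0
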
def lowest_uniq_timestamp(used_timestamps: OrderedDict, timestamp: str) -> str:
"""resolve duplicate timestamps by appending a decimal 1234, 1234 -> 1234.1, 1234.2"""
timestamp = timestamp.split('.')[0]
nonce = 0
# first try 152323423 before 152323423.0
if timestamp not in used_timestamps:
return timestamp
new_timestamp = '{}.{}'.format(timestamp, nonce)
while new_timestamp in used_timestamps:
nonce += 1
new_timestamp = '{}.{}'.format(timestamp, nonce)
return new_timestamp
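
# e.g. if '1234' and '1234.0' are already taken, the nonce walks upward
# until it finds a free slot:
#
#   used = OrderedDict([('1234', None), ('1234.0', None)])
#   assert lowest_uniq_timestamp(used, '1234') == '1234.1'
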
### Homepage index for all the links