test: Fix tests post-rebase

Authored by Cristian on 2020-09-15 14:05:48 -05:00; committed by Cristian Vargas
parent 422664079a
commit b18bbf8874
15 changed files with 114 additions and 121 deletions
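
Every hunk below applies the same mechanical change: extractor out_dir arguments move from plain str paths to pathlib.Path, with str() re-applied wherever a value crosses a subprocess or serialization boundary. A minimal sketch of the pattern (illustrative only, not part of the diff; the link object with a link_dir string attribute is assumed):

    from pathlib import Path
    from subprocess import run

    def save_example(link, out_dir=None, timeout=60):
        # fall back to the link's directory, coercing the str to a Path once
        out_dir = out_dir or Path(link.link_dir)

        # Path arithmetic replaces os.path.join() + os.path.exists()
        output = out_dir / 'example.txt'
        if output.exists():
            return None

        # subprocess calls and result fields still receive plain strings
        result = run(['true'], cwd=str(out_dir), timeout=timeout)
        return {'pwd': str(out_dir), 'returncode': result.returncode}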

View file

@@ -1,6 +1,7 @@
 __package__ = 'archivebox.extractors'
 
 import os
+from pathlib import Path
 
 from typing import Optional, List, Iterable, Union
 from datetime import datetime
@@ -57,7 +58,7 @@ def ignore_methods(to_ignore: List[str]):
     return list(methods)
 
 @enforce_types
-def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[str]=None, skip_index: bool=False) -> Link:
+def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[Path]=None, skip_index: bool=False) -> Link:
     """download the DOM, PDF, and a screenshot into a folder named after the link's timestamp"""
 
     ARCHIVE_METHODS = get_default_archive_methods()
@@ -68,7 +69,7 @@ def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[s
             if method[0] in methods
         ]
 
-    out_dir = out_dir or link.link_dir
+    out_dir = out_dir or Path(link.link_dir)
     try:
         is_new = not os.path.exists(out_dir)
         if is_new:
@@ -130,7 +131,7 @@ def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[s
     return link
 
 @enforce_types
-def archive_links(all_links: Union[Iterable[Link], QuerySet], overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[str]=None) -> List[Link]:
+def archive_links(all_links: Union[Iterable[Link], QuerySet], overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[Path]=None) -> List[Link]:
 
     if type(all_links) is QuerySet:
         num_links: int = all_links.count()
@@ -149,7 +150,7 @@ def archive_links(all_links: Union[Iterable[Link], QuerySet], overwrite: bool=Fa
         for link in all_links:
             idx += 1
             to_archive = get_link(link)
-            archive_link(to_archive, overwrite=overwrite, methods=methods, out_dir=link.link_dir)
+            archive_link(to_archive, overwrite=overwrite, methods=methods, out_dir=Path(link.link_dir))
     except KeyboardInterrupt:
         log_archiving_paused(num_links, idx, link.timestamp)
         raise SystemExit(0)

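The untouched is_new = not os.path.exists(out_dir) line above keeps working after out_dir becomes a Path: os.path functions accept any os.PathLike object via the os.fspath() protocol (PEP 519, Python >= 3.6), and str() is a no-op on values that are already strings, so the mixed str/Path fallbacks in this diff stay interoperable. For example:

    import os
    from pathlib import Path

    out_dir = Path('/tmp/archive/1600000000')  # hypothetical snapshot dir
    assert os.path.exists(out_dir) == out_dir.exists()  # PEP 519 interop
    assert str('/tmp/x') == str(Path('/tmp/x'))         # str() normalizes both
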
View file

@@ -1,7 +1,7 @@
 __package__ = 'archivebox.extractors'
 
-import os
+from pathlib import Path
 
 from typing import Optional, List, Dict, Tuple
 from collections import defaultdict
@@ -24,22 +24,22 @@ from ..logging_util import TimedProgress
 
 @enforce_types
-def should_save_archive_dot_org(link: Link, out_dir: Optional[str]=None) -> bool:
-    out_dir = out_dir or link.link_dir
+def should_save_archive_dot_org(link: Link, out_dir: Optional[Path]=None) -> bool:
+    out_dir = out_dir or Path(link.link_dir)
     if is_static_file(link.url):
         return False
 
-    if os.path.exists(os.path.join(out_dir, 'archive.org.txt')):
+    if (out_dir / "archive.org.txt").exists():
         # if open(path, 'r').read().strip() != 'None':
         return False
 
     return SAVE_ARCHIVE_DOT_ORG
 
 @enforce_types
-def save_archive_dot_org(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
+def save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
     """submit site to archive.org for archiving via their service, save returned archive url"""
 
-    out_dir = out_dir or link.link_dir
+    out_dir = out_dir or Path(link.link_dir)
     output: ArchiveOutput = 'archive.org.txt'
     archive_org_url = None
     submit_url = 'https://web.archive.org/save/{}'.format(link.url)
@@ -57,7 +57,7 @@ def save_archive_dot_org(link: Link, out_dir: Optional[str]=None, timeout: int=T
     status = 'succeeded'
     timer = TimedProgress(timeout, prefix=' ')
     try:
-        result = run(cmd, cwd=out_dir, timeout=timeout)
+        result = run(cmd, cwd=str(out_dir), timeout=timeout)
         content_location, errors = parse_archive_dot_org_response(result.stdout)
         if content_location:
             archive_org_url = 'https://web.archive.org{}'.format(content_location[0])
@@ -80,14 +80,14 @@ def save_archive_dot_org(link: Link, out_dir: Optional[str]=None, timeout: int=T
         # the URL in person, it will attempt to re-archive it, and it'll show the
         # nicer error message explaining why the url was rejected if it fails.
         archive_org_url = archive_org_url or submit_url
-        with open(os.path.join(out_dir, str(output)), 'w', encoding='utf-8') as f:
+        with open(str(out_dir / output), 'w', encoding='utf-8') as f:
             f.write(archive_org_url)
-        chmod_file('archive.org.txt', cwd=out_dir)
+        chmod_file('archive.org.txt', cwd=str(out_dir))
         output = archive_org_url
 
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=CURL_VERSION,
         output=output,
         status=status,

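The cwd=str(out_dir) wrapping seen here (and throughout the files below) is defensive rather than required on modern interpreters: subprocess.run() accepts a path-like cwd since Python 3.6 on POSIX and 3.7 on Windows. Both spellings behave the same there:

    from pathlib import Path
    from subprocess import run

    out_dir = Path('/tmp')
    run(['pwd'], cwd=str(out_dir))  # works everywhere
    run(['pwd'], cwd=out_dir)       # equivalent on Python >= 3.6 (POSIX) / 3.7 (Windows)
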
View file

@@ -1,7 +1,6 @@
 __package__ = 'archivebox.extractors'
 
-import os
 from pathlib import Path
 from typing import Optional
 
 from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
@@ -21,23 +20,23 @@ from ..logging_util import TimedProgress
 
 @enforce_types
-def should_save_dom(link: Link, out_dir: Optional[str]=None) -> bool:
-    out_dir = out_dir or link.link_dir
+def should_save_dom(link: Link, out_dir: Optional[Path]=None) -> bool:
+    out_dir = out_dir or Path(link.link_dir)
     if is_static_file(link.url):
         return False
 
-    if os.path.exists(os.path.join(out_dir, 'output.html')):
+    if (out_dir / 'output.html').exists():
         return False
 
     return SAVE_DOM
 
 @enforce_types
-def save_dom(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
+def save_dom(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
     """print HTML of site to file using chrome --dump-html"""
 
-    out_dir = out_dir or link.link_dir
+    out_dir = out_dir or Path(link.link_dir)
     output: ArchiveOutput = 'output.html'
-    output_path = os.path.join(out_dir, str(output))
+    output_path = out_dir / output
     cmd = [
         *chrome_args(TIMEOUT=timeout),
         '--dump-dom',
@@ -46,14 +45,14 @@ def save_dom(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> A
     status = 'succeeded'
     timer = TimedProgress(timeout, prefix=' ')
     try:
-        result = run(cmd, cwd=out_dir, timeout=timeout)
+        result = run(cmd, cwd=str(out_dir), timeout=timeout)
         atomic_write(output_path, result.stdout)
 
         if result.returncode:
             hints = result.stderr.decode()
             raise ArchiveError('Failed to save DOM', hints)
 
-        chmod_file(output, cwd=out_dir)
+        chmod_file(output, cwd=str(out_dir))
     except Exception as err:
         status = 'failed'
         output = err
@@ -62,7 +61,7 @@ def save_dom(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> A
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=CHROME_VERSION,
         output=output,
         status=status,

View file

@@ -1,6 +1,7 @@
 __package__ = 'archivebox.extractors'
 
 import os
+from pathlib import Path
 
 from typing import Optional
 
@@ -27,7 +28,7 @@ def should_save_favicon(link: Link, out_dir: Optional[str]=None) -> bool:
     return SAVE_FAVICON
 
 @enforce_types
-def save_favicon(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
+def save_favicon(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
     """download site favicon from google's favicon api"""
 
     out_dir = out_dir or link.link_dir
@@ -46,8 +47,8 @@ def save_favicon(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT)
     status = 'pending'
     timer = TimedProgress(timeout, prefix=' ')
     try:
-        run(cmd, cwd=out_dir, timeout=timeout)
-        chmod_file(output, cwd=out_dir)
+        run(cmd, cwd=str(out_dir), timeout=timeout)
+        chmod_file(output, cwd=str(out_dir))
         status = 'succeeded'
     except Exception as err:
         status = 'failed'
@@ -57,7 +58,7 @@ def save_favicon(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT)
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=CURL_VERSION,
         output=output,
         status=status,

View file

@@ -1,7 +1,7 @@
 __package__ = 'archivebox.extractors'
 
-import os
+from pathlib import Path
 
 from typing import Optional
 
 from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
@@ -27,12 +27,12 @@ from ..logging_util import TimedProgress
 
 @enforce_types
-def should_save_git(link: Link, out_dir: Optional[str]=None) -> bool:
+def should_save_git(link: Link, out_dir: Optional[Path]=None) -> bool:
     out_dir = out_dir or link.link_dir
     if is_static_file(link.url):
         return False
 
-    if os.path.exists(os.path.join(out_dir, 'git')):
+    if (out_dir / "git").exists():
         return False
 
     is_clonable_url = (
@@ -46,13 +46,13 @@ def should_save_git(link: Link, out_dir: Optional[str]=None) -> bool:
 
 @enforce_types
-def save_git(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
+def save_git(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
     """download full site using git"""
 
-    out_dir = out_dir or link.link_dir
+    out_dir = out_dir or Path(link.link_dir)
     output: ArchiveOutput = 'git'
-    output_path = os.path.join(out_dir, str(output))
-    os.makedirs(output_path, exist_ok=True)
+    output_path = out_dir / output
+    output_path.mkdir(exist_ok=True)
     cmd = [
         GIT_BINARY,
         'clone',
@@ -63,7 +63,7 @@ def save_git(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> A
     status = 'succeeded'
     timer = TimedProgress(timeout, prefix=' ')
     try:
-        result = run(cmd, cwd=output_path, timeout=timeout + 1)
+        result = run(cmd, cwd=str(output_path), timeout=timeout + 1)
         if result.returncode == 128:
             # ignore failed re-download when the folder already exists
             pass
@@ -71,7 +71,7 @@ def save_git(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> A
             hints = 'Got git response code: {}.'.format(result.returncode)
             raise ArchiveError('Failed to save git clone', hints)
 
-        chmod_file(output, cwd=out_dir)
+        chmod_file(output, cwd=str(out_dir))
     except Exception as err:
         status = 'failed'
@@ -81,7 +81,7 @@ def save_git(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> A
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=GIT_VERSION,
         output=output,
         status=status,

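One behavioral difference hiding in the hunk above (and the matching one in the media extractor below): os.makedirs(path, exist_ok=True) creates missing parent directories, while Path.mkdir(exist_ok=True) raises FileNotFoundError when the parent does not exist unless parents=True is also passed. The single-level mkdir is only safe if out_dir already exists, which archive_link appears to guarantee before the extractors run. The exact equivalent would be:

    from pathlib import Path

    output_path = Path('/tmp/example-archive/1600000000/git')  # hypothetical
    # mirrors os.makedirs(output_path, exist_ok=True), parents included:
    output_path.mkdir(parents=True, exist_ok=True)
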
View file

@@ -1,7 +1,6 @@
 __package__ = 'archivebox.extractors'
 
-import os
 from pathlib import Path
 from typing import Optional
 
 from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
@@ -22,25 +21,25 @@ from ..logging_util import TimedProgress
 
 @enforce_types
-def should_save_media(link: Link, out_dir: Optional[str]=None) -> bool:
+def should_save_media(link: Link, out_dir: Optional[Path]=None) -> bool:
     out_dir = out_dir or link.link_dir
     if is_static_file(link.url):
         return False
 
-    if os.path.exists(os.path.join(out_dir, 'media')):
+    if (out_dir / "media").exists():
         return False
 
     return SAVE_MEDIA
 
 @enforce_types
-def save_media(link: Link, out_dir: Optional[str]=None, timeout: int=MEDIA_TIMEOUT) -> ArchiveResult:
+def save_media(link: Link, out_dir: Optional[Path]=None, timeout: int=MEDIA_TIMEOUT) -> ArchiveResult:
     """Download playlists or individual video, audio, and subtitles using youtube-dl"""
 
-    out_dir = out_dir or link.link_dir
+    out_dir = out_dir or Path(link.link_dir)
     output: ArchiveOutput = 'media'
-    output_path = os.path.join(out_dir, str(output))
-    os.makedirs(output_path, exist_ok=True)
+    output_path = out_dir / output
+    output_path.mkdir(exist_ok=True)
     cmd = [
         YOUTUBEDL_BINARY,
         '--write-description',
@@ -66,8 +65,8 @@ def save_media(link: Link, out_dir: Optional[str]=None, timeout: int=MEDIA_TIMEO
     status = 'succeeded'
     timer = TimedProgress(timeout, prefix=' ')
     try:
-        result = run(cmd, cwd=output_path, timeout=timeout + 1)
-        chmod_file(output, cwd=out_dir)
+        result = run(cmd, cwd=str(output_path), timeout=timeout + 1)
+        chmod_file(output, cwd=str(out_dir))
         if result.returncode:
             if (b'ERROR: Unsupported URL' in result.stderr
                 or b'HTTP Error 404' in result.stderr
@@ -90,7 +89,7 @@ def save_media(link: Link, out_dir: Optional[str]=None, timeout: int=MEDIA_TIMEO
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=YOUTUBEDL_VERSION,
         output=output,
         status=status,

View file

@@ -1,7 +1,6 @@
 __package__ = 'archivebox.extractors'
 
-import os
 from pathlib import Path
 from typing import Optional
 
 from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
@@ -20,22 +19,22 @@ from ..logging_util import TimedProgress
 
 @enforce_types
-def should_save_pdf(link: Link, out_dir: Optional[str]=None) -> bool:
-    out_dir = out_dir or link.link_dir
+def should_save_pdf(link: Link, out_dir: Optional[Path]=None) -> bool:
+    out_dir = out_dir or Path(link.link_dir)
     if is_static_file(link.url):
         return False
 
-    if os.path.exists(os.path.join(out_dir, 'output.pdf')):
+    if (out_dir / "output.pdf").exists():
         return False
 
     return SAVE_PDF
 
 @enforce_types
-def save_pdf(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
+def save_pdf(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
     """print PDF of site to file using chrome --headless"""
 
-    out_dir = out_dir or link.link_dir
+    out_dir = out_dir or Path(link.link_dir)
     output: ArchiveOutput = 'output.pdf'
     cmd = [
         *chrome_args(TIMEOUT=timeout),
@@ -45,13 +44,13 @@ def save_pdf(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> A
     status = 'succeeded'
     timer = TimedProgress(timeout, prefix=' ')
     try:
-        result = run(cmd, cwd=out_dir, timeout=timeout)
+        result = run(cmd, cwd=str(out_dir), timeout=timeout)
         if result.returncode:
             hints = (result.stderr or result.stdout).decode()
             raise ArchiveError('Failed to save PDF', hints)
 
-        chmod_file('output.pdf', cwd=out_dir)
+        chmod_file('output.pdf', cwd=str(out_dir))
     except Exception as err:
         status = 'failed'
         output = err
@@ -61,7 +60,7 @@ def save_pdf(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> A
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=CHROME_VERSION,
         output=output,
         status=status,

View file

@@ -1,7 +1,6 @@
 __package__ = 'archivebox.extractors'
 
-import os
 from pathlib import Path
 from typing import Optional
 
 from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
@@ -21,21 +20,21 @@ from ..logging_util import TimedProgress
 
 @enforce_types
-def should_save_screenshot(link: Link, out_dir: Optional[str]=None) -> bool:
-    out_dir = out_dir or link.link_dir
+def should_save_screenshot(link: Link, out_dir: Optional[Path]=None) -> bool:
+    out_dir = out_dir or Path(link.link_dir)
     if is_static_file(link.url):
         return False
 
-    if os.path.exists(os.path.join(out_dir, 'screenshot.png')):
+    if (out_dir / "screenshot.png").exists():
         return False
 
     return SAVE_SCREENSHOT
 
 @enforce_types
-def save_screenshot(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
+def save_screenshot(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
     """take screenshot of site using chrome --headless"""
 
-    out_dir = out_dir or link.link_dir
+    out_dir = out_dir or Path(link.link_dir)
     output: ArchiveOutput = 'screenshot.png'
     cmd = [
         *chrome_args(TIMEOUT=timeout),
@@ -45,13 +44,13 @@ def save_screenshot(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOU
     status = 'succeeded'
     timer = TimedProgress(timeout, prefix=' ')
     try:
-        result = run(cmd, cwd=out_dir, timeout=timeout)
+        result = run(cmd, cwd=str(out_dir), timeout=timeout)
         if result.returncode:
             hints = (result.stderr or result.stdout).decode()
             raise ArchiveError('Failed to save screenshot', hints)
 
-        chmod_file(output, cwd=out_dir)
+        chmod_file(output, cwd=str(out_dir))
     except Exception as err:
         status = 'failed'
         output = err
@@ -60,7 +59,7 @@ def save_screenshot(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOU
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=CHROME_VERSION,
         output=output,
         status=status,

View file

@@ -23,21 +23,21 @@ from ..logging_util import TimedProgress
 
 @enforce_types
-def should_save_singlefile(link: Link, out_dir: Optional[str]=None) -> bool:
-    out_dir = out_dir or link.link_dir
+def should_save_singlefile(link: Link, out_dir: Optional[Path]=None) -> bool:
+    out_dir = out_dir or Path(link.link_dir)
     if is_static_file(link.url):
         return False
 
-    output = Path(out_dir or link.link_dir) / 'singlefile.html'
+    output = out_dir / 'singlefile.html'
     return SAVE_SINGLEFILE and SINGLEFILE_VERSION and (not output.exists())
 
 @enforce_types
-def save_singlefile(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
+def save_singlefile(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
     """download full site using single-file"""
 
-    out_dir = out_dir or link.link_dir
-    output = str(Path(out_dir).absolute() / "singlefile.html")
+    out_dir = out_dir or Path(link.link_dir)
+    output = str(out_dir.absolute() / "singlefile.html")
 
     browser_args = chrome_args(TIMEOUT=0)
@@ -54,7 +54,7 @@ def save_singlefile(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOU
     status = 'succeeded'
     timer = TimedProgress(timeout, prefix=' ')
     try:
-        result = run(cmd, cwd=out_dir, timeout=timeout)
+        result = run(cmd, cwd=str(out_dir), timeout=timeout)
 
         # parse out number of files downloaded from last line of stderr:
         # "Downloaded: 76 files, 4.0M in 1.6s (2.52 MB/s)"
@@ -82,7 +82,7 @@ def save_singlefile(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOU
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=SINGLEFILE_VERSION,
         output=output,
         status=status,

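Note the choice of out_dir.absolute() in the hunk above: Path.absolute() only prepends the working directory, without normalizing '..' segments or resolving symlinks, whereas Path.resolve() does both. A quick illustration:

    from pathlib import Path

    p = Path('sub/../singlefile.html')
    print(p.absolute())  # <cwd>/sub/../singlefile.html ('..' is kept)
    print(p.resolve())   # <cwd>/singlefile.html (normalized, symlinks resolved)
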
View file

@@ -1,6 +1,7 @@
 __package__ = 'archivebox.extractors'
 
 import re
+from pathlib import Path
 from typing import Optional
 
 from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
@@ -41,7 +42,7 @@ def should_save_title(link: Link, out_dir: Optional[str]=None) -> bool:
     return SAVE_TITLE
 
 @enforce_types
-def save_title(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
+def save_title(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
     """try to guess the page's title from its content"""
 
     setup_django(out_dir=out_dir)
@@ -77,7 +78,7 @@ def save_title(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) ->
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=CURL_VERSION,
         output=output,
         status=status,

View file

@@ -1,7 +1,7 @@
 __package__ = 'archivebox.extractors'
 
-import os
 import re
+from pathlib import Path
 
 from typing import Optional
 from datetime import datetime
@@ -35,24 +35,24 @@ from ..logging_util import TimedProgress
 
 @enforce_types
-def should_save_wget(link: Link, out_dir: Optional[str]=None) -> bool:
+def should_save_wget(link: Link, out_dir: Optional[Path]=None) -> bool:
     output_path = wget_output_path(link)
-    out_dir = out_dir or link.link_dir
-    if output_path and os.path.exists(os.path.join(out_dir, output_path)):
+    out_dir = out_dir or Path(link.link_dir)
+    if output_path and (out_dir / output_path).exists():
         return False
 
     return SAVE_WGET
 
 @enforce_types
-def save_wget(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
+def save_wget(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
     """download full site using wget"""
 
     out_dir = out_dir or link.link_dir
     if SAVE_WARC:
-        warc_dir = os.path.join(out_dir, 'warc')
-        os.makedirs(warc_dir, exist_ok=True)
-        warc_path = os.path.join('warc', str(int(datetime.now().timestamp())))
+        warc_dir = out_dir / "warc"
+        warc_dir.mkdir(exist_ok=True)
+        warc_path = warc_dir / str(int(datetime.now().timestamp()))
 
     # WGET CLI Docs: https://www.gnu.org/software/wget/manual/wget.html
     output: ArchiveOutput = None
@@ -69,7 +69,7 @@ def save_wget(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) ->
         '-e', 'robots=off',
         '--timeout={}'.format(timeout),
         *(['--restrict-file-names={}'.format(RESTRICT_FILE_NAMES)] if RESTRICT_FILE_NAMES else []),
-        *(['--warc-file={}'.format(warc_path)] if SAVE_WARC else []),
+        *(['--warc-file={}'.format(str(warc_path))] if SAVE_WARC else []),
         *(['--page-requisites'] if SAVE_WGET_REQUISITES else []),
         *(['--user-agent={}'.format(WGET_USER_AGENT)] if WGET_USER_AGENT else []),
         *(['--load-cookies', COOKIES_FILE] if COOKIES_FILE else []),
@@ -82,7 +82,7 @@ def save_wget(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) ->
     status = 'succeeded'
     timer = TimedProgress(timeout, prefix=' ')
     try:
-        result = run(cmd, cwd=out_dir, timeout=timeout)
+        result = run(cmd, cwd=str(out_dir), timeout=timeout)
         output = wget_output_path(link)
 
         # parse out number of files downloaded from last line of stderr:
@@ -111,7 +111,7 @@ def save_wget(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) ->
             if b'ERROR 500: Internal Server Error' in result.stderr:
                 raise ArchiveError('500 Internal Server Error', hints)
             raise ArchiveError('Wget failed or got an error from the server', hints)
-        chmod_file(output, cwd=out_dir)
+        chmod_file(output, cwd=str(out_dir))
     except Exception as err:
         status = 'failed'
         output = err
@@ -120,7 +120,7 @@ def save_wget(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) ->
     return ArchiveResult(
         cmd=cmd,
-        pwd=out_dir,
+        pwd=str(out_dir),
         cmd_version=WGET_VERSION,
         output=output,
         status=status,
@@ -170,26 +170,21 @@ def wget_output_path(link: Link) -> Optional[str]:
     # in order to avoid having to reverse-engineer how they calculate it,
     # we just look in the output folder read the filename wget used from the filesystem
     full_path = without_fragment(without_query(path(link.url))).strip('/')
-    search_dir = os.path.join(
-        link.link_dir,
-        domain(link.url).replace(":", "+"),
-        urldecode(full_path),
-    )
+    search_dir = Path(link.link_dir) / domain(link.url).replace(":", "+") / urldecode(full_path)
 
     for _ in range(4):
-        if os.path.exists(search_dir):
-            if os.path.isdir(search_dir):
+        if search_dir.exists():
+            if search_dir.is_dir():
                 html_files = [
-                    f for f in os.listdir(search_dir)
-                    if re.search(".+\\.[Ss]?[Hh][Tt][Mm][Ll]?$", f, re.I | re.M)
+                    f for f in search_dir.iterdir()
+                    if re.search(".+\\.[Ss]?[Hh][Tt][Mm][Ll]?$", str(f), re.I | re.M)
                 ]
                 if html_files:
-                    path_from_link_dir = search_dir.split(link.link_dir)[-1].strip('/')
-                    return os.path.join(path_from_link_dir, html_files[0])
+                    return str(Path(search_dir.name) / html_files[0])
 
         # Move up one directory level
-        search_dir = search_dir.rsplit('/', 1)[0]
-        if search_dir == link.link_dir:
+        search_dir = search_dir.parent
+        if str(search_dir) == link.link_dir:
             break
 
     return None
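
The rewritten walk above maps each str operation onto a Path method: rsplit('/', 1)[0] becomes search_dir.parent, and the stop test compares str(search_dir) against the stored link.link_dir string. Note that Path.iterdir() yields full child paths, not bare names like os.listdir(). A standalone variant of the same upward search (a hypothetical helper, using relative_to() instead of the commit's string splitting):

    import re
    from pathlib import Path

    def find_html_upwards(start: Path, stop: Path, max_levels: int = 4):
        """Walk up from start toward stop, returning the first HTML file
        found relative to stop, or None."""
        search_dir = start
        for _ in range(max_levels):
            if search_dir.is_dir():  # is_dir() is False for missing paths
                html_files = [
                    p for p in search_dir.iterdir()
                    # iterdir() yields full paths, so match on the name only
                    if re.search(r'\.s?html?$', p.name, re.I)
                ]
                if html_files:
                    return html_files[0].relative_to(stop)
            search_dir = search_dir.parent
            if search_dir == stop:
                break
        return None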