-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
644 additions
and
266 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
|
||
import sys | ||
import os | ||
import argparse | ||
|
||
from pywaybackup.helper import url_split, sanitize_filename | ||
|
||
from pywaybackup.__version__ import __version__ | ||
|
||
class Arguments:
    """
    Parse the command-line arguments for pywaybackup.

    Builds the argparse parser, parses ``sys.argv`` (falling back to
    ``--help`` when no arguments are given) and keeps the resulting
    namespace in ``self.args``.
    """

    def __init__(self):

        parser = argparse.ArgumentParser(description='Download from wayback machine (archive.org)')
        parser.add_argument('-a', '--about', action='version', version='%(prog)s ' + __version__ + ' by @bitdruid -> https://github.com/bitdruid')
        # FIX: help text was missing its closing parenthesis
        parser.add_argument('-d', '--debug', action='store_true', help='Debug mode (Always full traceback and creates an error.log)')

        # exactly one of -c/-f/-s is mandatory; -u supplies the target url
        required = parser.add_argument_group('required (one exclusive)')
        required.add_argument('-u', '--url', type=str, metavar="", help='url (with subdir/subdomain) to download')
        exclusive_required = required.add_mutually_exclusive_group(required=True)
        exclusive_required.add_argument('-c', '--current', action='store_true', help='download the latest version of each file snapshot')
        exclusive_required.add_argument('-f', '--full', action='store_true', help='download snapshots of all timestamps')
        exclusive_required.add_argument('-s', '--save', action='store_true', help='save a page to the wayback machine')

        optional = parser.add_argument_group('optional query parameters')
        optional.add_argument('-l', '--list', action='store_true', help='only print snapshots (opt range in y)')
        optional.add_argument('-e', '--explicit', action='store_true', help='search only for the explicit given url')
        optional.add_argument('-o', '--output', type=str, metavar="", help='output folder - defaults to current directory')
        optional.add_argument('-r', '--range', type=int, metavar="", help='range in years to search')
        optional.add_argument('--start', type=int, metavar="", help='start timestamp format: YYYYMMDDhhmmss')
        optional.add_argument('--end', type=int, metavar="", help='end timestamp format: YYYYMMDDhhmmss')

        # nargs='?' with const=True: the flag without a value stores True so
        # Configuration can later substitute the default (output-folder) path
        special = parser.add_argument_group('manipulate behavior')
        special.add_argument('--csv', type=str, nargs='?', const=True, metavar='path', help='save a csv file with the json output - defaults to output folder')
        special.add_argument('--skip', type=str, nargs='?', const=True, metavar='path', help='skips existing files in the output folder by checking the .csv file - defaults to output folder')
        special.add_argument('--no-redirect', action='store_true', help='do not follow redirects by archive.org')
        special.add_argument('--verbosity', type=str, default="info", metavar="", help='["progress", "json"] for different output or ["trace"] for very detailed output')
        special.add_argument('--log', type=str, nargs='?', const=True, metavar='path', help='save a log file - defaults to output folder')
        special.add_argument('--retry', type=int, default=0, metavar="", help='retry failed downloads (opt tries as int, else infinite)')
        special.add_argument('--workers', type=int, default=1, metavar="", help='number of workers (simultaneous downloads)')
        # special.add_argument('--convert-links', action='store_true', help='Convert all links in the files to local paths. Requires -c/--current')
        special.add_argument('--delay', type=int, default=0, metavar="", help='delay between each download in seconds')

        cdx = parser.add_argument_group('cdx (one exclusive)')
        exclusive_cdx = cdx.add_mutually_exclusive_group()
        # FIX: typo in help text "recurent" -> "recurrent"
        exclusive_cdx.add_argument('--cdxbackup', type=str, nargs='?', const=True, metavar='path', help='Save the cdx query-result to a file for recurrent use - defaults to output folder')
        exclusive_cdx.add_argument('--cdxinject', type=str, nargs='?', const=True, metavar='path', help='Inject a cdx backup-file to download according to the given url')

        auto = parser.add_argument_group('auto')
        auto.add_argument('--auto', action='store_true', help='includes automatic csv, skip and cdxbackup/cdxinject to resume a stopped download')

        args = parser.parse_args(args=None if sys.argv[1:] else ['--help'])  # if no arguments are given, print help

        # if args.convert_links and not args.current:
        #     parser.error("--convert-links can only be used with the -c/--current option")

        self.args = args

    def get_args(self):
        """Return the parsed argparse namespace."""
        return self.args
|
||
class Configuration:
    """
    Holds the runtime configuration derived from the parsed CLI arguments.

    After ``init()`` every argument is exposed as a class attribute,
    e.g. ``Configuration.output``, ``Configuration.url``.
    """

    @classmethod
    def init(cls):
        """Parse the CLI arguments and derive every configuration value."""

        cls.args = Arguments().get_args()
        for name, value in vars(cls.args).items():
            setattr(Configuration, name, value)

        # args are now attributes of Configuration // Configuration.output, ...
        cls.command = ' '.join(sys.argv[1:])
        cls.domain, cls.subdir, cls.filename = url_split(cls.url)

        # default output directory is ./waybackup_snapshots; always ensure it exists
        if cls.output is None:
            cls.output = os.path.join(os.getcwd(), "waybackup_snapshots")
        os.makedirs(cls.output, exist_ok=True)

        # --log given without a path: place the log file inside the output folder
        if cls.log is True:
            cls.log = os.path.join(cls.output, f"waybackup_{sanitize_filename(cls.url)}.log")

        # -f and -c are mutually exclusive, so at most one branch applies
        if cls.full:
            cls.mode = "full"
        if cls.current:
            cls.mode = "current"

        if cls.auto:
            # --auto implies csv, skip and cdx backup/inject in the output folder
            cls.skip = cls.output
            cls.csv = cls.output
            cls.cdxbackup = cls.output
            cls.cdxinject = os.path.join(cls.output, f"waybackup_{sanitize_filename(cls.url)}.cdx")
        else:
            # any of these flags given without a path defaults to the output folder
            for flag in ("skip", "csv", "cdxbackup", "cdxinject"):
                if getattr(cls, flag) is True:
                    setattr(cls, flag, cls.output)
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
import os | ||
import errno | ||
import magic | ||
from pywaybackup.helper import url_split | ||
|
||
from pywaybackup.Arguments import Configuration as config | ||
from pywaybackup.Verbosity import Verbosity as vb | ||
import re | ||
|
||
class Converter:
    """
    Rewrites links inside downloaded snapshot files so they point to the
    local copies instead of the original (or archive.org) urls.
    """

    @classmethod
    def define_root_steps(cls, filepath) -> str:
        """
        Define the steps (../) from ``filepath`` up to the root directory
        (the domain folder inside the output directory).
        """
        abs_path = os.path.abspath(filepath)
        webroot_path = os.path.abspath(f"{config.output}/{config.domain}/")  # webroot is the domain folder in the output
        # common path between the two
        common_path = os.path.commonpath([abs_path, webroot_path])
        # steps up to the common path
        rel_path_from_common = os.path.relpath(abs_path, common_path)
        steps_up = rel_path_from_common.count(os.path.sep)
        if steps_up <= 1:  # if the file is in the root of the domain
            return "./"
        return "../" * steps_up

    @classmethod
    def links(cls, filepath, status_message=None):
        """
        Convert all links in a HTML / CSS file to local paths.

        JS files are skipped (detected via libmagic); files that cannot be
        decoded as text are skipped as well. The file is rewritten in place.
        """

        def extract_urls(content) -> list:
            """
            Extract all unique link candidates from the file content.
            """
            html_types = ["src", "href", "poster", "data-src"]
            css_types = ["url"]
            links = []
            for html_type in html_types:
                # possible formattings of the value: "url", 'url', url
                matches = re.findall(f"{html_type}=[\"']?([^\"'>]+)", content)
                links += matches
            for css_type in css_types:
                # possible formattings of the value: url(url) url('url') url("url") // ends with )
                matches = re.findall(rf"{css_type}\((['\"]?)([^'\"\)]+)\1\)", content)
                links += [match[1] for match in matches]
            links = list(set(links))
            return links

        def local_url(original_url, domain, count) -> str:
            """
            Convert a given url to a local path; external urls are returned as-is.
            """
            original_url_domain = url_split(original_url)[0]

            # check if the url is external or internal (external is returned as is because no need to convert)
            external = False
            if original_url_domain != domain:
                if "://" in original_url:
                    external = True
                if original_url.startswith("//"):
                    external = True
            if external:
                status_message.trace(status="", type=f"{count}/{len(links)}", message="External url")
                return original_url

            # convert the url to a relative path to the local root (download dir) if it's a valid path, else return the original url
            original_url_file = os.path.join(config.output, config.domain, normalize_url(original_url))
            if validate_path(original_url_file):
                if original_url.startswith("/"):  # if only starts with /
                    original_url = f"{cls.define_root_steps(filepath)}{original_url.lstrip('/')}"
                if original_url.startswith(".//"):
                    original_url = f"{cls.define_root_steps(filepath)}{original_url.lstrip('./')}"
                if original_url_domain == domain:  # if url is like https://domain.com/path/to/file
                    original_url = f"{cls.define_root_steps(filepath)}{original_url.split(domain)[1].lstrip('/')}"
                if original_url.startswith("../"):  # if file is already ../ check if it's not too many steps up
                    original_url = f"{cls.define_root_steps(filepath)}{original_url.split('../')[-1].lstrip('/')}"
            else:
                status_message.trace(status="", type="", message=f"{count}/{len(links)}: URL is not a valid path")

            return original_url

        def normalize_url(url) -> str:
            """
            Normalize a given url by removing its protocol, domain and parent
            directory references.
            Example1:
                - input: https://domain.com/path/to/file
                - output: /path/to/file
            Example2:
                - input: ../path/to/file
                - output: /path/to/file
            """
            # FIX: str.split never raises IndexError — the old try/except was dead code
            url = "/" + url.split("../")[-1]
            if url.startswith("//"):
                url = "/" + url.split("//")[1]
            parsed_url = url_split(url)
            return f"{parsed_url[1]}/{parsed_url[2]}"

        def is_pathname_valid(pathname: str) -> bool:
            """
            Check if a given pathname is valid (not too long, OS-acceptable).
            """
            if not isinstance(pathname, str) or not pathname:
                return False

            try:
                os.lstat(pathname)
            except OSError as exc:
                if exc.errno == errno.ENOENT:
                    return True  # does not exist yet but the name itself is fine
                elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}:
                    return False
            return True

        def is_path_creatable(pathname: str) -> bool:
            """
            Check if a given path is creatable (parent directory is writable).
            """
            dirname = os.path.dirname(pathname) or os.getcwd()
            return os.access(dirname, os.W_OK)

        def is_path_exists_or_creatable(pathname: str) -> bool:
            """
            Check if a given path exists or is creatable.
            """
            return is_pathname_valid(pathname) or is_path_creatable(pathname)

        def validate_path(filepath: str) -> bool:
            """
            Validate if a given path can exist.
            """
            return is_path_exists_or_creatable(filepath)

        if os.path.isfile(filepath):
            if magic.from_file(filepath, mime=True).split("/")[1] == "javascript":
                status_message.trace(status="Error", type="", message="JS-file is not supported")
                return
            # FIX: try-body reduced to the only statement that can raise UnicodeDecodeError
            try:
                with open(filepath, "r") as file:
                    content = file.read()
            except UnicodeDecodeError:
                status_message.trace(status="Error", type="", message="Could not decode file to convert links")
                return
            domain = config.domain
            links = extract_urls(content)
            status_message.store(message=f"\n-----> Convert: [{len(links)}] links in file")
            count = 1
            for original_link in links:
                status_message.trace(status="ORIG", type=f"{count}/{len(links)}", message=original_link)
                new_link = local_url(original_link, domain, count)
                if new_link != original_link:
                    status_message.trace(status="CONV", type=f"{count}/{len(links)}", message=new_link)
                    content = content.replace(original_link, new_link)
                count += 1
            # FIX: was a bare open()/close() pair nested inside the read handle's
            # `with` block — use a context manager so the handle cannot leak
            with open(filepath, "w") as file:
                file.write(content)
Oops, something went wrong.