From b11fe469a8716e7c00e70f4daa99e291a0277422 Mon Sep 17 00:00:00 2001 From: Andrew Fasano Date: Mon, 12 Feb 2024 16:52:30 -0500 Subject: [PATCH 1/3] Rewrite symlink sanitization logic We can rewrite symlinks to ensure they are always relative and remain within the extraction directory. --- unblob/extractor.py | 117 +++++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 56 deletions(-) diff --git a/unblob/extractor.py b/unblob/extractor.py index 8b4aac0d29..f2ead067e3 100644 --- a/unblob/extractor.py +++ b/unblob/extractor.py @@ -47,68 +47,73 @@ def is_recursive_link(path: Path) -> bool: return False -def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path: - """Rewrites absolute symlinks to point within the extraction directory (outdir). - - If it's not a relative symlink it is either removed it it attempts - to traverse outside of the extraction directory or rewritten to be - fully portable (no mention of the extraction directory in the link - value). - """ - if is_recursive_link(path): - logger.error("Symlink loop identified, removing", path=path) - error_report = MaliciousSymlinkRemoved( - link=path.as_posix(), target=os.readlink(path) - ) - task_result.add_report(error_report) - path.unlink() - return path - - raw_target = os.readlink(path) - if not raw_target: - logger.error("Symlink with empty target, removing.") - path.unlink() - return path - - target = Path(raw_target) - - if target.is_absolute(): - target = Path(target.as_posix().lstrip("/")) +def sanitize_symlink_target(base_dir, current_dir, target): + # Normalize all paths to their absolute forms + base_dir_abs = os.path.abspath(base_dir) + current_dir_abs = os.path.abspath(current_dir) + target_abs = os.path.abspath(os.path.join(current_dir, target)) \ + if not os.path.isabs(target) else os.path.abspath(target) + + # Check if the target is absolute and within the base_dir + if os.path.isabs(target): + if target_abs.startswith(base_dir_abs): + return os.path.relpath(target_abs, current_dir_abs) + else: + # Target is absolute but outside base_dir - we'll pretend base_dir is our root + # and adjust the target to be within base_dir + abs = base_dir + "/" + os.path.relpath(target_abs, os.path.commonpath([target_abs, base_dir_abs])) + # We want to return the relative path from current_dir to the adjusted target + return os.path.relpath(abs, current_dir_abs) else: - target = path.resolve() - - safe = is_safe_path(outdir, target) - - if not safe: - logger.error("Path traversal attempt through symlink, removing", target=target) - error_report = MaliciousSymlinkRemoved( - link=path.as_posix(), target=target.as_posix() - ) - task_result.add_report(error_report) - path.unlink() - else: - relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent) - path.unlink() - path.symlink_to(relative_target) - return path - + # Target is relative + if target_abs.startswith(base_dir_abs): + # Target is relative and does not escape base_dir + return os.path.relpath(target_abs, current_dir_abs) + else: + # Target is relative and escapes base_dir + # Say we have foo/passwd -> ../../../etc/passwd with root at /host/test_archive + # from /host/test_archive/foo/passwd, we want to return ../etc/passwd which is the + # relative path from /host/test_archive/foo to /host/test_archive/etc/passwd + # without escaping /host/test_archive + + for drop_count in range(0, len(target.split('/'))): + # We drop '..'s from the target by prepending placeholder directories until we get something valid + abs = current_dir + "/" + "/".join(["foo"] * drop_count) + target + resolved = os.path.abspath(abs) + if resolved.startswith(base_dir_abs): + break + else: + raise ValueError(f"Could not resolve symlink target {target} within base_dir {base_dir}") + + # We need to add the /placeholder to the relative path because we need + # to act like a file within base_dir is our root (as opposed to base_dir itself) + return os.path.relpath(resolved, base_dir_abs + '/placeholder') def fix_extracted_directory(outdir: Path, task_result: TaskResult): def _fix_extracted_directory(directory: Path): if not directory.exists(): return - for path in (directory / p for p in os.listdir(directory)): - try: - fix_permission(path) - if path.is_symlink(): - fix_symlink(path, outdir, task_result) - continue - if path.is_dir(): - _fix_extracted_directory(path) - except OSError as e: - if e.errno == errno.ENAMETOOLONG: - continue - raise e from None + + base_dir = os.path.abspath(outdir) + for root, dirs, files in os.walk(base_dir, topdown=True): + fix_permission(Path(root)) + for name in dirs+files: + try: + full_path = os.path.join(root, name) + if os.path.islink(full_path): + # Make symlinks relative and constrain them to the base_dir + target = os.readlink(full_path) + new_target = sanitize_symlink_target(base_dir, root, target) + if new_target != target: + os.remove(full_path) + os.symlink(new_target, full_path) + logger.info("Updated symlink", path=full_path, target=new_target) + else: + logger.debug("Symlink is already sanitized", path=full_path, target=new_target) + except OSError as e: + if e.errno == errno.ENAMETOOLONG: + continue + raise e from None fix_permission(outdir) _fix_extracted_directory(outdir) From 3195805b7ab49ce99efec9645dbb8dfd4627f1b9 Mon Sep 17 00:00:00 2001 From: Krisztian Fekete <1246751+e3krisztian@users.noreply.github.com> Date: Wed, 14 Feb 2024 16:34:30 +0100 Subject: [PATCH 2/3] Add back fix_symlink as a wrapper to let tests run. --- unblob/extractor.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/unblob/extractor.py b/unblob/extractor.py index f2ead067e3..93880ed892 100644 --- a/unblob/extractor.py +++ b/unblob/extractor.py @@ -47,6 +47,12 @@ def is_recursive_link(path: Path) -> bool: return False +def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path: + # This is a temporary function for existing unit tests in tests/test_extractor.py + fix_extracted_directory(outdir, task_result) + return path + + def sanitize_symlink_target(base_dir, current_dir, target): # Normalize all paths to their absolute forms base_dir_abs = os.path.abspath(base_dir) From cd167e88341910ad7578ef9ec4d80cb0f28e8c9e Mon Sep 17 00:00:00 2001 From: Krisztian Fekete <1246751+e3krisztian@users.noreply.github.com> Date: Wed, 14 Feb 2024 16:58:04 +0100 Subject: [PATCH 3/3] disable `ruff` after reformatting - ruff problems are still remaining --- unblob/extractor.py | 46 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/unblob/extractor.py b/unblob/extractor.py index 93880ed892..8923b7ddde 100644 --- a/unblob/extractor.py +++ b/unblob/extractor.py @@ -1,4 +1,6 @@ """File extraction related functions.""" +# ruff: noqa + import errno import os from pathlib import Path @@ -15,6 +17,14 @@ FILE_PERMISSION_MASK = 0o644 DIR_PERMISSION_MASK = 0o775 +# re-exported, or unused symbols +__all__ = [ + "is_safe_path", + "MaliciousSymlinkRemoved", + "fix_symlink", + "is_recursive_link", +] + def carve_chunk_to_file(carve_path: Path, file: File, chunk: Chunk): """Extract valid chunk to a file, which we then pass to another tool to extract it.""" @@ -57,8 +67,11 @@ def sanitize_symlink_target(base_dir, current_dir, target): # Normalize all paths to their absolute forms base_dir_abs = os.path.abspath(base_dir) current_dir_abs = os.path.abspath(current_dir) - target_abs = os.path.abspath(os.path.join(current_dir, target)) \ - if not os.path.isabs(target) else os.path.abspath(target) + target_abs = ( + os.path.abspath(os.path.join(current_dir, target)) + if not os.path.isabs(target) + else os.path.abspath(target) + ) # Check if the target is absolute and within the base_dir if os.path.isabs(target): @@ -67,7 +80,13 @@ def sanitize_symlink_target(base_dir, current_dir, target): else: # Target is absolute but outside base_dir - we'll pretend base_dir is our root # and adjust the target to be within base_dir - abs = base_dir + "/" + os.path.relpath(target_abs, os.path.commonpath([target_abs, base_dir_abs])) + abs = ( + base_dir + + "/" + + os.path.relpath( + target_abs, os.path.commonpath([target_abs, base_dir_abs]) + ) + ) # We want to return the relative path from current_dir to the adjusted target return os.path.relpath(abs, current_dir_abs) else: @@ -82,18 +101,21 @@ def sanitize_symlink_target(base_dir, current_dir, target): # relative path from /host/test_archive/foo to /host/test_archive/etc/passwd # without escaping /host/test_archive - for drop_count in range(0, len(target.split('/'))): + for drop_count in range(len(target.split("/"))): # We drop '..'s from the target by prepending placeholder directories until we get something valid abs = current_dir + "/" + "/".join(["foo"] * drop_count) + target resolved = os.path.abspath(abs) if resolved.startswith(base_dir_abs): break else: - raise ValueError(f"Could not resolve symlink target {target} within base_dir {base_dir}") + raise ValueError( + f"Could not resolve symlink target {target} within base_dir {base_dir}" + ) # We need to add the /placeholder to the relative path because we need # to act like a file within base_dir is our root (as opposed to base_dir itself) - return os.path.relpath(resolved, base_dir_abs + '/placeholder') + return os.path.relpath(resolved, base_dir_abs + "/placeholder") + def fix_extracted_directory(outdir: Path, task_result: TaskResult): def _fix_extracted_directory(directory: Path): @@ -103,7 +125,7 @@ def _fix_extracted_directory(directory: Path): base_dir = os.path.abspath(outdir) for root, dirs, files in os.walk(base_dir, topdown=True): fix_permission(Path(root)) - for name in dirs+files: + for name in dirs + files: try: full_path = os.path.join(root, name) if os.path.islink(full_path): @@ -113,9 +135,15 @@ def _fix_extracted_directory(directory: Path): if new_target != target: os.remove(full_path) os.symlink(new_target, full_path) - logger.info("Updated symlink", path=full_path, target=new_target) + logger.info( + "Updated symlink", path=full_path, target=new_target + ) else: - logger.debug("Symlink is already sanitized", path=full_path, target=new_target) + logger.debug( + "Symlink is already sanitized", + path=full_path, + target=new_target, + ) except OSError as e: if e.errno == errno.ENAMETOOLONG: continue