diff --git a/unblob/extractor.py b/unblob/extractor.py index 8b4aac0d29..f2ead067e3 100644 --- a/unblob/extractor.py +++ b/unblob/extractor.py @@ -47,68 +47,73 @@ def is_recursive_link(path: Path) -> bool: return False -def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path: - """Rewrites absolute symlinks to point within the extraction directory (outdir). - - If it's not a relative symlink it is either removed it it attempts - to traverse outside of the extraction directory or rewritten to be - fully portable (no mention of the extraction directory in the link - value). - """ - if is_recursive_link(path): - logger.error("Symlink loop identified, removing", path=path) - error_report = MaliciousSymlinkRemoved( - link=path.as_posix(), target=os.readlink(path) - ) - task_result.add_report(error_report) - path.unlink() - return path - - raw_target = os.readlink(path) - if not raw_target: - logger.error("Symlink with empty target, removing.") - path.unlink() - return path - - target = Path(raw_target) - - if target.is_absolute(): - target = Path(target.as_posix().lstrip("/")) +def sanitize_symlink_target(base_dir, current_dir, target): + # Normalize all paths to their absolute forms + base_dir_abs = os.path.abspath(base_dir) + current_dir_abs = os.path.abspath(current_dir) + target_abs = os.path.abspath(os.path.join(current_dir, target)) \ + if not os.path.isabs(target) else os.path.abspath(target) + + # Check if the target is absolute and within the base_dir + if os.path.isabs(target): + if target_abs.startswith(base_dir_abs): + return os.path.relpath(target_abs, current_dir_abs) + else: + # Target is absolute but outside base_dir - we'll pretend base_dir is our root + # and adjust the target to be within base_dir + abs = base_dir + "/" + os.path.relpath(target_abs, os.path.commonpath([target_abs, base_dir_abs])) + # We want to return the relative path from current_dir to the adjusted target + return os.path.relpath(abs, current_dir_abs) else: - target = path.resolve() - - safe = is_safe_path(outdir, target) - - if not safe: - logger.error("Path traversal attempt through symlink, removing", target=target) - error_report = MaliciousSymlinkRemoved( - link=path.as_posix(), target=target.as_posix() - ) - task_result.add_report(error_report) - path.unlink() - else: - relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent) - path.unlink() - path.symlink_to(relative_target) - return path - + # Target is relative + if target_abs.startswith(base_dir_abs): + # Target is relative and does not escape base_dir + return os.path.relpath(target_abs, current_dir_abs) + else: + # Target is relative and escapes base_dir + # Say we have foo/passwd -> ../../../etc/passwd with root at /host/test_archive + # from /host/test_archive/foo/passwd, we want to return ../etc/passwd which is the + # relative path from /host/test_archive/foo to /host/test_archive/etc/passwd + # without escaping /host/test_archive + + for drop_count in range(0, len(target.split('/'))): + # We drop '..'s from the target by prepending placeholder directories until we get something valid + abs = current_dir + "/" + "/".join(["foo"] * drop_count) + target + resolved = os.path.abspath(abs) + if resolved.startswith(base_dir_abs): + break + else: + raise ValueError(f"Could not resolve symlink target {target} within base_dir {base_dir}") + + # We need to add the /placeholder to the relative path because we need + # to act like a file within base_dir is our root (as opposed to base_dir itself) + return os.path.relpath(resolved, base_dir_abs + '/placeholder') def fix_extracted_directory(outdir: Path, task_result: TaskResult): def _fix_extracted_directory(directory: Path): if not directory.exists(): return - for path in (directory / p for p in os.listdir(directory)): - try: - fix_permission(path) - if path.is_symlink(): - fix_symlink(path, outdir, task_result) - continue - if path.is_dir(): - _fix_extracted_directory(path) - except OSError as e: - if e.errno == errno.ENAMETOOLONG: - continue - raise e from None + + base_dir = os.path.abspath(outdir) + for root, dirs, files in os.walk(base_dir, topdown=True): + fix_permission(Path(root)) + for name in dirs+files: + try: + full_path = os.path.join(root, name) + if os.path.islink(full_path): + # Make symlinks relative and constrain them to the base_dir + target = os.readlink(full_path) + new_target = sanitize_symlink_target(base_dir, root, target) + if new_target != target: + os.remove(full_path) + os.symlink(new_target, full_path) + logger.info("Updated symlink", path=full_path, target=new_target) + else: + logger.debug("Symlink is already sanitized", path=full_path, target=new_target) + except OSError as e: + if e.errno == errno.ENAMETOOLONG: + continue + raise e from None fix_permission(outdir) _fix_extracted_directory(outdir)