Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sanitize symlink target #768

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 91 additions & 52 deletions unblob/extractor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""File extraction related functions."""
# ruff: noqa

qkaiser marked this conversation as resolved.
Show resolved Hide resolved
import errno
import os
from pathlib import Path
Expand All @@ -15,6 +17,14 @@
FILE_PERMISSION_MASK = 0o644
DIR_PERMISSION_MASK = 0o775

# re-exported, or unused symbols
__all__ = [
"is_safe_path",
"MaliciousSymlinkRemoved",
"fix_symlink",
"is_recursive_link",
]

qkaiser marked this conversation as resolved.
Show resolved Hide resolved

def carve_chunk_to_file(carve_path: Path, file: File, chunk: Chunk):
"""Extract valid chunk to a file, which we then pass to another tool to extract it."""
Expand Down Expand Up @@ -48,67 +58,96 @@ def is_recursive_link(path: Path) -> bool:


def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path:
"""Rewrites absolute symlinks to point within the extraction directory (outdir).

If it's not a relative symlink it is either removed it it attempts
to traverse outside of the extraction directory or rewritten to be
fully portable (no mention of the extraction directory in the link
value).
"""
if is_recursive_link(path):
logger.error("Symlink loop identified, removing", path=path)
error_report = MaliciousSymlinkRemoved(
link=path.as_posix(), target=os.readlink(path)
)
task_result.add_report(error_report)
path.unlink()
return path

raw_target = os.readlink(path)
if not raw_target:
logger.error("Symlink with empty target, removing.")
path.unlink()
return path

target = Path(raw_target)

if target.is_absolute():
target = Path(target.as_posix().lstrip("/"))
else:
target = path.resolve()
# This is a temporary function for existing unit tests in tests/test_extractor.py
fix_extracted_directory(outdir, task_result)
return path

safe = is_safe_path(outdir, target)

if not safe:
logger.error("Path traversal attempt through symlink, removing", target=target)
error_report = MaliciousSymlinkRemoved(
link=path.as_posix(), target=target.as_posix()
)
task_result.add_report(error_report)
path.unlink()
def sanitize_symlink_target(base_dir, current_dir, target):
# Normalize all paths to their absolute forms
base_dir_abs = os.path.abspath(base_dir)
current_dir_abs = os.path.abspath(current_dir)
target_abs = (
os.path.abspath(os.path.join(current_dir, target))
if not os.path.isabs(target)
else os.path.abspath(target)
)

# Check if the target is absolute and within the base_dir
if os.path.isabs(target):
if target_abs.startswith(base_dir_abs):
return os.path.relpath(target_abs, current_dir_abs)
else:
# Target is absolute but outside base_dir - we'll pretend base_dir is our root
# and adjust the target to be within base_dir
abs = (
base_dir
+ "/"
+ os.path.relpath(
target_abs, os.path.commonpath([target_abs, base_dir_abs])
)
)
# We want to return the relative path from current_dir to the adjusted target
return os.path.relpath(abs, current_dir_abs)
else:
relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent)
path.unlink()
path.symlink_to(relative_target)
return path
# Target is relative
if target_abs.startswith(base_dir_abs):
# Target is relative and does not escape base_dir
return os.path.relpath(target_abs, current_dir_abs)
else:
# Target is relative and escapes base_dir
# Say we have foo/passwd -> ../../../etc/passwd with root at /host/test_archive
# from /host/test_archive/foo/passwd, we want to return ../etc/passwd which is the
# relative path from /host/test_archive/foo to /host/test_archive/etc/passwd
# without escaping /host/test_archive

for drop_count in range(len(target.split("/"))):
# We drop '..'s from the target by prepending placeholder directories until we get something valid
abs = current_dir + "/" + "/".join(["foo"] * drop_count) + target
resolved = os.path.abspath(abs)
if resolved.startswith(base_dir_abs):
break
else:
raise ValueError(
f"Could not resolve symlink target {target} within base_dir {base_dir}"
)

# We need to add the /placeholder to the relative path because we need
# to act like a file within base_dir is our root (as opposed to base_dir itself)
return os.path.relpath(resolved, base_dir_abs + "/placeholder")


def fix_extracted_directory(outdir: Path, task_result: TaskResult):
def _fix_extracted_directory(directory: Path):
if not directory.exists():
return
for path in (directory / p for p in os.listdir(directory)):
try:
fix_permission(path)
if path.is_symlink():
fix_symlink(path, outdir, task_result)
continue
if path.is_dir():
_fix_extracted_directory(path)
except OSError as e:
if e.errno == errno.ENAMETOOLONG:
continue
raise e from None

base_dir = os.path.abspath(outdir)
for root, dirs, files in os.walk(base_dir, topdown=True):
fix_permission(Path(root))
for name in dirs + files:
try:
full_path = os.path.join(root, name)
if os.path.islink(full_path):
# Make symlinks relative and constrain them to the base_dir
target = os.readlink(full_path)
new_target = sanitize_symlink_target(base_dir, root, target)
if new_target != target:
os.remove(full_path)
os.symlink(new_target, full_path)
logger.info(
"Updated symlink", path=full_path, target=new_target
)
else:
logger.debug(
"Symlink is already sanitized",
path=full_path,
target=new_target,
)
except OSError as e:
if e.errno == errno.ENAMETOOLONG:
continue
raise e from None

fix_permission(outdir)
_fix_extracted_directory(outdir)
Expand Down
Loading