Skip to content

Commit

Permalink
Rewrite symlink sanitization logic
Browse files Browse the repository at this point in the history
We can rewrite symlinks to ensure they are always relative
and remain within the extraction directory.
  • Loading branch information
Andrew Fasano authored and e3krisztian committed Feb 14, 2024
1 parent 2e6a7ae commit b11fe46
Showing 1 changed file with 61 additions and 56 deletions.
117 changes: 61 additions & 56 deletions unblob/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,68 +47,73 @@ def is_recursive_link(path: Path) -> bool:
return False


def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path:
"""Rewrites absolute symlinks to point within the extraction directory (outdir).
If it's not a relative symlink it is either removed it it attempts
to traverse outside of the extraction directory or rewritten to be
fully portable (no mention of the extraction directory in the link
value).
"""
if is_recursive_link(path):
logger.error("Symlink loop identified, removing", path=path)
error_report = MaliciousSymlinkRemoved(
link=path.as_posix(), target=os.readlink(path)
)
task_result.add_report(error_report)
path.unlink()
return path

raw_target = os.readlink(path)
if not raw_target:
logger.error("Symlink with empty target, removing.")
path.unlink()
return path

target = Path(raw_target)

if target.is_absolute():
target = Path(target.as_posix().lstrip("/"))
def sanitize_symlink_target(base_dir, current_dir, target):
# Normalize all paths to their absolute forms
base_dir_abs = os.path.abspath(base_dir)
current_dir_abs = os.path.abspath(current_dir)
target_abs = os.path.abspath(os.path.join(current_dir, target)) \
if not os.path.isabs(target) else os.path.abspath(target)

# Check if the target is absolute and within the base_dir
if os.path.isabs(target):
if target_abs.startswith(base_dir_abs):
return os.path.relpath(target_abs, current_dir_abs)
else:
# Target is absolute but outside base_dir - we'll pretend base_dir is our root
# and adjust the target to be within base_dir
abs = base_dir + "/" + os.path.relpath(target_abs, os.path.commonpath([target_abs, base_dir_abs]))
# We want to return the relative path from current_dir to the adjusted target
return os.path.relpath(abs, current_dir_abs)
else:
target = path.resolve()

safe = is_safe_path(outdir, target)

if not safe:
logger.error("Path traversal attempt through symlink, removing", target=target)
error_report = MaliciousSymlinkRemoved(
link=path.as_posix(), target=target.as_posix()
)
task_result.add_report(error_report)
path.unlink()
else:
relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent)
path.unlink()
path.symlink_to(relative_target)
return path

# Target is relative
if target_abs.startswith(base_dir_abs):
# Target is relative and does not escape base_dir
return os.path.relpath(target_abs, current_dir_abs)
else:
# Target is relative and escapes base_dir
# Say we have foo/passwd -> ../../../etc/passwd with root at /host/test_archive
# from /host/test_archive/foo/passwd, we want to return ../etc/passwd which is the
# relative path from /host/test_archive/foo to /host/test_archive/etc/passwd
# without escaping /host/test_archive

for drop_count in range(0, len(target.split('/'))):
# We drop '..'s from the target by prepending placeholder directories until we get something valid
abs = current_dir + "/" + "/".join(["foo"] * drop_count) + target
resolved = os.path.abspath(abs)
if resolved.startswith(base_dir_abs):
break
else:
raise ValueError(f"Could not resolve symlink target {target} within base_dir {base_dir}")

# We need to add the /placeholder to the relative path because we need
# to act like a file within base_dir is our root (as opposed to base_dir itself)
return os.path.relpath(resolved, base_dir_abs + '/placeholder')

def fix_extracted_directory(outdir: Path, task_result: TaskResult):
def _fix_extracted_directory(directory: Path):
if not directory.exists():
return
for path in (directory / p for p in os.listdir(directory)):
try:
fix_permission(path)
if path.is_symlink():
fix_symlink(path, outdir, task_result)
continue
if path.is_dir():
_fix_extracted_directory(path)
except OSError as e:
if e.errno == errno.ENAMETOOLONG:
continue
raise e from None

base_dir = os.path.abspath(outdir)
for root, dirs, files in os.walk(base_dir, topdown=True):
fix_permission(Path(root))
for name in dirs+files:
try:
full_path = os.path.join(root, name)
if os.path.islink(full_path):
# Make symlinks relative and constrain them to the base_dir
target = os.readlink(full_path)
new_target = sanitize_symlink_target(base_dir, root, target)
if new_target != target:
os.remove(full_path)
os.symlink(new_target, full_path)
logger.info("Updated symlink", path=full_path, target=new_target)
else:
logger.debug("Symlink is already sanitized", path=full_path, target=new_target)
except OSError as e:
if e.errno == errno.ENAMETOOLONG:
continue
raise e from None

fix_permission(outdir)
_fix_extracted_directory(outdir)
Expand Down

0 comments on commit b11fe46

Please sign in to comment.