From e83f9a655529760ae08deae9c87c8e7c2c3cc380 Mon Sep 17 00:00:00 2001 From: Krisztian Fekete <1246751+e3krisztian@users.noreply.github.com> Date: Fri, 28 Jul 2023 16:46:12 +0200 Subject: [PATCH] yaffs: use Sandbox to report problems during extraction --- unblob/handlers/filesystem/yaffs.py | 75 +++++++---------------------- 1 file changed, 18 insertions(+), 57 deletions(-) diff --git a/unblob/handlers/filesystem/yaffs.py b/unblob/handlers/filesystem/yaffs.py index 175d341b53..6f095e89be 100644 --- a/unblob/handlers/filesystem/yaffs.py +++ b/unblob/handlers/filesystem/yaffs.py @@ -1,6 +1,5 @@ import io import itertools -import os from collections import defaultdict from enum import IntEnum from pathlib import Path @@ -12,17 +11,17 @@ from treelib import Tree from treelib.exceptions import NodeIDAbsentError -from unblob.extractor import is_safe_path from unblob.file_utils import ( Endian, File, InvalidInputFormat, + Sandbox, StructParser, get_endian_multi, read_until_past, snull, ) -from unblob.models import Extractor, Handler, HexString, ValidChunk +from unblob.models import Extractor, ExtractResult, Handler, HexString, ValidChunk logger = get_logger() @@ -470,84 +469,44 @@ def resolve_path(self, entry: YAFFSEntry) -> Path: return self.resolve_path(parent_entry).joinpath(resolved_path) return resolved_path - def get_file_bytes(self, entry: YAFFSEntry) -> Iterable[bytes]: + def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]: for chunk in self.get_chunks(entry.object_id): yield self.file[chunk.offset : chunk.offset + chunk.byte_count] - def extract(self, outdir: Path): + def extract(self, fs: Sandbox): for entry in [ self.file_entries.get_node(node) for node in self.file_entries.expand_tree(mode=Tree.DEPTH) ]: if entry is None or entry.data is None: continue - self.extract_entry(entry.data, outdir) + self.extract_entry(entry.data, fs) - def extract_entry(self, entry: YAFFSEntry, outdir: Path): # noqa: C901 + def extract_entry(self, entry: YAFFSEntry, fs: Sandbox): if entry.object_type == YaffsObjectType.UNKNOWN: - logger.warning("unknown type entry", entry=entry) + logger.warning("unknown entry type", entry=entry) return entry_path = self.resolve_path(entry) - - if not is_safe_path(outdir, entry_path): - logger.warning( - "Potential path traversal attempt", outdir=outdir, path=entry_path - ) - return - - out_path = outdir.joinpath(entry_path) + out_path = fs.root / entry_path if entry.object_type == YaffsObjectType.SPECIAL: if not isinstance(entry, YAFFS2Entry): logger.warning("non YAFFS2 special object", entry=entry) return - if os.geteuid() == 0: - logger.debug( - "creating special file", special_path=out_path, _verbosity=3 - ) - os.mknod(out_path.as_posix(), entry.st_mode, entry.st_rdev) - else: - logger.warning( - "creating special files requires elevated privileges, skipping.", - path=out_path, - st_mode=entry.st_mode, - st_rdev=entry.st_rdev, - ) - return - - if entry.object_type == YaffsObjectType.DIRECTORY: - logger.debug("creating directory", dir_path=out_path, _verbosity=3) - out_path.mkdir(exist_ok=True) + fs.mknod(out_path, entry.st_mode, entry.st_rdev) + elif entry.object_type == YaffsObjectType.DIRECTORY: + fs.mkdir(out_path, exist_ok=True) elif entry.object_type == YaffsObjectType.FILE: - logger.debug("creating file", file_path=out_path, _verbosity=3) - with out_path.open("wb") as f: - for chunk in self.get_file_bytes(entry): - f.write(chunk) + fs.write_chunks(out_path, self.get_file_chunks(entry)) elif entry.object_type == YaffsObjectType.SYMLINK: - if not is_safe_path(outdir, out_path.parent / Path(entry.alias)): - logger.warning( - "Potential path traversal attempt through symlink", - outdir=outdir, - path=entry.alias, - ) - return - logger.debug("creating symlink", file_path=out_path, _verbosity=3) - out_path.symlink_to(Path(entry.alias)) + fs.create_symlink(src=Path(entry.alias), dst=out_path) elif entry.object_type == YaffsObjectType.HARDLINK: - logger.debug("creating hardlink", file_path=out_path, _verbosity=3) dst_entry = self.file_entries[entry.equiv_id].data dst_path = self.resolve_path(dst_entry) - if not is_safe_path(outdir, dst_path): - logger.warning( - "Potential path traversal attempt through hardlink", - outdir=outdir, - path=dst_path, - ) - return - dst_full_path = outdir / dst_path - dst_full_path.link_to(out_path) + dst_full_path = fs.root / dst_path + fs.create_hardlink(src=dst_full_path, dst=out_path) class YAFFS2Parser(YAFFSParser): @@ -765,7 +724,9 @@ def extract(self, inpath: Path, outdir: Path): infile = File.from_path(inpath) parser = instantiate_parser(infile) parser.parse(store=True) - parser.extract(outdir) + fs = Sandbox(outdir) + parser.extract(fs) + return ExtractResult(reports=list(fs.problems)) class YAFFSHandler(Handler):