Skip to content

Commit

Permalink
yaffs: use Sandbox to report problems during extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
e3krisztian committed Aug 3, 2023
1 parent 80487f7 commit e83f9a6
Showing 1 changed file with 18 additions and 57 deletions.
75 changes: 18 additions & 57 deletions unblob/handlers/filesystem/yaffs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import io
import itertools
import os
from collections import defaultdict
from enum import IntEnum
from pathlib import Path
Expand All @@ -12,17 +11,17 @@
from treelib import Tree
from treelib.exceptions import NodeIDAbsentError

from unblob.extractor import is_safe_path
from unblob.file_utils import (
Endian,
File,
InvalidInputFormat,
Sandbox,
StructParser,
get_endian_multi,
read_until_past,
snull,
)
from unblob.models import Extractor, Handler, HexString, ValidChunk
from unblob.models import Extractor, ExtractResult, Handler, HexString, ValidChunk

logger = get_logger()

Expand Down Expand Up @@ -470,84 +469,44 @@ def resolve_path(self, entry: YAFFSEntry) -> Path:
return self.resolve_path(parent_entry).joinpath(resolved_path)
return resolved_path

def get_file_bytes(self, entry: YAFFSEntry) -> Iterable[bytes]:
def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
for chunk in self.get_chunks(entry.object_id):
yield self.file[chunk.offset : chunk.offset + chunk.byte_count]

def extract(self, outdir: Path):
def extract(self, fs: Sandbox):
for entry in [
self.file_entries.get_node(node)
for node in self.file_entries.expand_tree(mode=Tree.DEPTH)
]:
if entry is None or entry.data is None:
continue
self.extract_entry(entry.data, outdir)
self.extract_entry(entry.data, fs)

def extract_entry(self, entry: YAFFSEntry, outdir: Path): # noqa: C901
def extract_entry(self, entry: YAFFSEntry, fs: Sandbox):
if entry.object_type == YaffsObjectType.UNKNOWN:
logger.warning("unknown type entry", entry=entry)
logger.warning("unknown entry type", entry=entry)
return

entry_path = self.resolve_path(entry)

if not is_safe_path(outdir, entry_path):
logger.warning(
"Potential path traversal attempt", outdir=outdir, path=entry_path
)
return

out_path = outdir.joinpath(entry_path)
out_path = fs.root / entry_path

if entry.object_type == YaffsObjectType.SPECIAL:
if not isinstance(entry, YAFFS2Entry):
logger.warning("non YAFFS2 special object", entry=entry)
return

if os.geteuid() == 0:
logger.debug(
"creating special file", special_path=out_path, _verbosity=3
)
os.mknod(out_path.as_posix(), entry.st_mode, entry.st_rdev)
else:
logger.warning(
"creating special files requires elevated privileges, skipping.",
path=out_path,
st_mode=entry.st_mode,
st_rdev=entry.st_rdev,
)
return

if entry.object_type == YaffsObjectType.DIRECTORY:
logger.debug("creating directory", dir_path=out_path, _verbosity=3)
out_path.mkdir(exist_ok=True)
fs.mknod(out_path, entry.st_mode, entry.st_rdev)
elif entry.object_type == YaffsObjectType.DIRECTORY:
fs.mkdir(out_path, exist_ok=True)
elif entry.object_type == YaffsObjectType.FILE:
logger.debug("creating file", file_path=out_path, _verbosity=3)
with out_path.open("wb") as f:
for chunk in self.get_file_bytes(entry):
f.write(chunk)
fs.write_chunks(out_path, self.get_file_chunks(entry))
elif entry.object_type == YaffsObjectType.SYMLINK:
if not is_safe_path(outdir, out_path.parent / Path(entry.alias)):
logger.warning(
"Potential path traversal attempt through symlink",
outdir=outdir,
path=entry.alias,
)
return
logger.debug("creating symlink", file_path=out_path, _verbosity=3)
out_path.symlink_to(Path(entry.alias))
fs.create_symlink(src=Path(entry.alias), dst=out_path)
elif entry.object_type == YaffsObjectType.HARDLINK:
logger.debug("creating hardlink", file_path=out_path, _verbosity=3)
dst_entry = self.file_entries[entry.equiv_id].data
dst_path = self.resolve_path(dst_entry)
if not is_safe_path(outdir, dst_path):
logger.warning(
"Potential path traversal attempt through hardlink",
outdir=outdir,
path=dst_path,
)
return
dst_full_path = outdir / dst_path
dst_full_path.link_to(out_path)
dst_full_path = fs.root / dst_path
fs.create_hardlink(src=dst_full_path, dst=out_path)


class YAFFS2Parser(YAFFSParser):
Expand Down Expand Up @@ -765,7 +724,9 @@ def extract(self, inpath: Path, outdir: Path):
infile = File.from_path(inpath)
parser = instantiate_parser(infile)
parser.parse(store=True)
parser.extract(outdir)
fs = Sandbox(outdir)
parser.extract(fs)
return ExtractResult(reports=list(fs.problems))


class YAFFSHandler(Handler):
Expand Down

0 comments on commit e83f9a6

Please sign in to comment.