Skip to content

Commit

Permalink
refactor(hdr): use FileSystem to report problems during extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
e3krisztian committed Aug 10, 2023
1 parent e0105c4 commit e779994
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions unblob/handlers/archive/xiaomi/hdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file, is_safe_path
from unblob.file_utils import File, InvalidInputFormat, iterate_file, snull
from unblob.file_utils import (
File,
FileSystem,
InvalidInputFormat,
iterate_file,
snull,
)
from unblob.models import (
Chunk,
Endian,
Extractor,
ExtractResult,
HexString,
StructHandler,
StructParser,
Expand Down Expand Up @@ -95,16 +100,13 @@ def __init__(self, header_struct: str):
self._struct_parser = StructParser(C_DEFINITIONS)

def extract(self, inpath: Path, outdir: Path):
fs = FileSystem(outdir)
with File.from_path(inpath) as file:
for output_path, chunk in self.parse(file):
if not is_safe_path(outdir, output_path):
logger.warning(
"Path traversal attempt, discarding.", output_path=output_path
)
return
carve_chunk_to_file(outdir.joinpath(output_path), file, chunk)

def parse(self, file: File) -> Iterable[Tuple[Path, Chunk]]:
for output_path, start_offset, size in self.parse(file):
fs.carve(output_path, file, start_offset, size)
return ExtractResult(reports=list(fs.problems))

def parse(self, file: File) -> Iterable[Tuple[Path, int, int]]:
header = self._struct_parser.parse(self.header_struct, file, Endian.LITTLE)
for offset in cast(Iterable, header.blob_offsets):
if not offset:
Expand All @@ -118,14 +120,12 @@ def parse(self, file: File) -> Iterable[Tuple[Path, Chunk]]:
if not is_valid_blob_header(blob_header):
raise InvalidInputFormat("Invalid HDR blob header.")

# file.tell() points to right after the blob_header
yield (
(
Path(snull(blob_header.name).decode("utf-8")),
Chunk(
start_offset=file.tell(),
end_offset=file.tell() + blob_header.size,
),
# file.tell() points to right after the blob_header == start_offset
file.tell(),
blob_header.size,
)
)

Expand Down

0 comments on commit e779994

Please sign in to comment.