From f6bad66d3d122ab6d2abc781487c1a08d9b5c10c Mon Sep 17 00:00:00 2001 From: Quentin Kaiser Date: Fri, 14 Apr 2023 14:07:04 +0200 Subject: [PATCH] feat(reporting): report meta-data information about chunks. Allow handlers to provide a dict value as part of a ValidChunk metadata attribute. That dictionnary can contain any relevant metadata information from the perspective of the handler, but we advise handler writers to report parsed information such as header values. This metadata dict is later reported as part of our ChunkReports and available in the JSON report file if the user requested one. The idea is to expose metadata to further analysis steps through the unblob report. For example, a binary analysis toolkit would read the load address and architecture from a uImage chunk to analyze the file extracted from that chunk with the right settings. A note on the 'as_dict' implementation. The initial idea was to implement it in dissect.cstruct (see https://github.com/fox-it/dissect.cstruct/pull/29), but due to expected changes in the project's API I chose to implement it in unblob so we're not dependent on another project. --- tests/test_models.py | 124 +++++++++++--------- tests/test_report.py | 173 ++++++++++++++++++++++++++++ unblob/handlers/archive/sevenzip.py | 10 +- unblob/models.py | 30 ++++- unblob/report.py | 1 + 5 files changed, 279 insertions(+), 59 deletions(-) diff --git a/tests/test_models.py b/tests/test_models.py index c1d44220a0..d3d2b3a67c 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -4,7 +4,15 @@ import pytest from unblob.file_utils import InvalidInputFormat -from unblob.models import Chunk, ProcessResult, Task, TaskResult, UnknownChunk, to_json +from unblob.models import ( + Chunk, + ProcessResult, + Task, + TaskResult, + UnknownChunk, + ValidChunk, + to_json, +) from unblob.report import ( ChunkReport, ExtractCommandFailedReport, @@ -170,56 +178,57 @@ def test_process_result_conversion(self): decoded_report = json.loads(json_text) assert decoded_report == [ { - "__typename__": "TaskResult", + "task": { + "path": "/nonexistent", + "depth": 0, + "blob_id": "", + "is_multi_file": False, + "__typename__": "Task", + }, "reports": [ { - "__typename__": "StatReport", + "path": "/nonexistent", + "size": 384, "is_dir": False, "is_file": True, "is_link": False, "link_target": None, - "path": "/nonexistent", - "size": 384, + "__typename__": "StatReport", }, { - "__typename__": "FileMagicReport", "magic": "Zip archive data, at least v2.0 to extract", "mime_type": "application/zip", + "__typename__": "FileMagicReport", }, { - "__typename__": "HashReport", "md5": "9019fcece2433ad7f12c077e84537a74", "sha1": "36998218d8f43b69ef3adcadf2e8979e81eed166", "sha256": "7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118", + "__typename__": "HashReport", }, { - "__typename__": "ChunkReport", - "end_offset": 384, - "extraction_reports": [], - "handler_name": "zip", "id": "test_basic_conversion:id", - "is_encrypted": False, - "size": 384, + "handler_name": "zip", "start_offset": 0, + "end_offset": 384, + "size": 384, + "is_encrypted": False, + "metadata": {}, + "extraction_reports": [], + "__typename__": "ChunkReport", }, ], "subtasks": [ { - "__typename__": "Task", - "blob_id": "test_basic_conversion:id", + "path": "/extractions/nonexistent_extract", "depth": 314, + "blob_id": "test_basic_conversion:id", "is_multi_file": False, - "path": "/extractions/nonexistent_extract", + "__typename__": "Task", } ], - "task": { - "__typename__": "Task", - "blob_id": "", - "depth": 0, - "is_multi_file": False, - "path": "/nonexistent", - }, - }, + "__typename__": "TaskResult", + } ] def test_exotic_command_output(self): @@ -235,35 +244,44 @@ def test_exotic_command_output(self): decoded_report = json.loads(json_text) assert decoded_report == { - "__typename__": "ExtractCommandFailedReport", - "command": "dump all bytes", - "exit_code": 1, "severity": "WARNING", + "command": "dump all bytes", + "stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08" + "\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16" + "\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !\"#$%&'()*+," + "-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]" + "^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\udc80\udc81" + "\udc82\udc83\udc84\udc85\udc86\udc87\udc88\udc89" + "\udc8a\udc8b\udc8c\udc8d\udc8e\udc8f\udc90\udc91" + "\udc92\udc93\udc94\udc95\udc96\udc97\udc98\udc99" + "\udc9a\udc9b\udc9c\udc9d\udc9e\udc9f\udca0\udca1" + "\udca2\udca3\udca4\udca5\udca6\udca7\udca8\udca9" + "\udcaa\udcab\udcac\udcad\udcae\udcaf\udcb0\udcb1" + "\udcb2\udcb3\udcb4\udcb5\udcb6\udcb7\udcb8\udcb9" + "\udcba\udcbb\udcbc\udcbd\udcbe\udcbf\udcc0\udcc1" + "\udcc2\udcc3\udcc4\udcc5\udcc6\udcc7\udcc8\udcc9" + "\udcca\udccb\udccc\udccd\udcce\udccf\udcd0\udcd1" + "\udcd2\udcd3\udcd4\udcd5\udcd6\udcd7\udcd8\udcd9" + "\udcda\udcdb\udcdc\udcdd\udcde\udcdf\udce0\udce1" + "\udce2\udce3\udce4\udce5\udce6\udce7\udce8\udce9" + "\udcea\udceb\udcec\udced\udcee\udcef\udcf0\udcf1" + "\udcf2\udcf3\udcf4\udcf5\udcf6\udcf7\udcf8\udcf9" + "\udcfa\udcfb\udcfc\udcfd\udcfe\udcff", "stderr": "stdout is pretty strange ;)", - "stdout": ( - "b'\\x00\\x01\\x02\\x03\\x04\\x05\\x06\\x07" - "\\x08\\t\\n\\x0b\\x0c\\r\\x0e\\x0f" - "\\x10\\x11\\x12\\x13\\x14\\x15\\x16\\x17" - '\\x18\\x19\\x1a\\x1b\\x1c\\x1d\\x1e\\x1f !"#' - "$%&\\'()*+,-./0123456789:;<=>?@AB" - "CDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]^_`a" - "bcdefghijklmnopqrstuvwxyz{|}~\\x7f" - "\\x80\\x81\\x82\\x83\\x84\\x85\\x86\\x87" - "\\x88\\x89\\x8a\\x8b\\x8c\\x8d\\x8e\\x8f" - "\\x90\\x91\\x92\\x93\\x94\\x95\\x96\\x97" - "\\x98\\x99\\x9a\\x9b\\x9c\\x9d\\x9e\\x9f" - "\\xa0\\xa1\\xa2\\xa3\\xa4\\xa5\\xa6\\xa7" - "\\xa8\\xa9\\xaa\\xab\\xac\\xad\\xae\\xaf" - "\\xb0\\xb1\\xb2\\xb3\\xb4\\xb5\\xb6\\xb7" - "\\xb8\\xb9\\xba\\xbb\\xbc\\xbd\\xbe\\xbf" - "\\xc0\\xc1\\xc2\\xc3\\xc4\\xc5\\xc6\\xc7" - "\\xc8\\xc9\\xca\\xcb\\xcc\\xcd\\xce\\xcf" - "\\xd0\\xd1\\xd2\\xd3\\xd4\\xd5\\xd6\\xd7" - "\\xd8\\xd9\\xda\\xdb\\xdc\\xdd\\xde\\xdf" - "\\xe0\\xe1\\xe2\\xe3\\xe4\\xe5\\xe6\\xe7" - "\\xe8\\xe9\\xea\\xeb\\xec\\xed\\xee\\xef" - "\\xf0\\xf1\\xf2\\xf3\\xf4\\xf5\\xf6\\xf7" - "\\xf8\\xf9\\xfa\\xfb\\xfc\\xfd\\xfe\\xff" - "'" - ), + "exit_code": 1, + "__typename__": "ExtractCommandFailedReport", } + + @pytest.mark.parametrize( + "metadata", + [ + pytest.param(1, id="metadata_int"), + pytest.param(0.2, id="metadata_float"), + pytest.param(True, id="metadata_bool"), + pytest.param([1, 2], id="metadata_list"), + pytest.param((1, 2), id="metadata_tuple"), + ], + ) + def test_invalid_metadata(self, metadata): + with pytest.raises(ValueError, match="Can only convert dict or Instance"): + ValidChunk(start_offset=0, end_offset=100, metadata=metadata) diff --git a/tests/test_report.py b/tests/test_report.py index ecbe79ca5c..07170d0474 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -11,6 +11,7 @@ from unblob.report import ( CarveDirectoryReport, ChunkReport, + ExtractCommandFailedReport, FileMagicReport, HashReport, StatReport, @@ -48,6 +49,178 @@ def test_process_file_report_output_is_valid_json( assert len(report) +class Test_ProcessResult_to_json: # noqa: N801 + def test_simple_conversion(self): + task = Task(path=Path("/nonexistent"), depth=0, blob_id="") + task_result = TaskResult(task) + chunk_id = "test_basic_conversion:id" + + task_result.add_report( + StatReport( + path=task.path, + size=384, + is_dir=False, + is_file=True, + is_link=False, + link_target=None, + ) + ) + task_result.add_report( + FileMagicReport( + magic="Zip archive data, at least v2.0 to extract", + mime_type="application/zip", + ) + ) + task_result.add_report( + HashReport( + md5="9019fcece2433ad7f12c077e84537a74", + sha1="36998218d8f43b69ef3adcadf2e8979e81eed166", + sha256="7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118", + ) + ) + task_result.add_report( + ChunkReport( + id=chunk_id, + handler_name="zip", + start_offset=0, + end_offset=384, + size=384, + is_encrypted=False, + metadata={}, + extraction_reports=[], + ) + ) + task_result.add_subtask( + Task( + path=Path("/extractions/nonexistent_extract"), + depth=314, + blob_id=chunk_id, + ) + ) + + json_text = ProcessResult(results=[task_result]).to_json() + + # output must be a valid json string + assert isinstance(json_text, str) + + # that can be loaded back + decoded_report = json.loads(json_text) + assert decoded_report == [ + { + "task": { + "path": "/nonexistent", + "depth": 0, + "blob_id": "", + "is_multi_file": False, + "__typename__": "Task", + }, + "reports": [ + { + "path": "/nonexistent", + "size": 384, + "is_dir": False, + "is_file": True, + "is_link": False, + "link_target": None, + "__typename__": "StatReport", + }, + { + "magic": "Zip archive data, at least v2.0 to extract", + "mime_type": "application/zip", + "__typename__": "FileMagicReport", + }, + { + "md5": "9019fcece2433ad7f12c077e84537a74", + "sha1": "36998218d8f43b69ef3adcadf2e8979e81eed166", + "sha256": "7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118", + "__typename__": "HashReport", + }, + { + "id": "test_basic_conversion:id", + "handler_name": "zip", + "start_offset": 0, + "end_offset": 384, + "size": 384, + "is_encrypted": False, + "metadata": {}, + "extraction_reports": [], + "__typename__": "ChunkReport", + }, + ], + "subtasks": [ + { + "path": "/extractions/nonexistent_extract", + "depth": 314, + "blob_id": "test_basic_conversion:id", + "is_multi_file": False, + "__typename__": "Task", + } + ], + "__typename__": "TaskResult", + } + ] + + def test_exotic_command_output(self): + task = Task(path=Path("/nonexistent"), depth=0, blob_id="") + task_result = TaskResult(task) + report = ExtractCommandFailedReport( + command="dump all bytes", + stdout=bytes(range(256)), + stderr=b"stdout is pretty strange ;)", + exit_code=1, + ) + + task_result.add_report( + ChunkReport( + id="test", + handler_name="fail", + start_offset=0, + end_offset=256, + size=256, + is_encrypted=False, + extraction_reports=[report], + ) + ) + json_text = ProcessResult(results=[task_result]).to_json() + + decoded_report = json.loads(json_text) + assert decoded_report == [ + { + "task": { + "path": "/nonexistent", + "depth": 0, + "blob_id": "", + "is_multi_file": False, + "__typename__": "Task", + }, + "reports": [ + { + "id": "test", + "handler_name": "fail", + "start_offset": 0, + "end_offset": 256, + "size": 256, + "is_encrypted": False, + "metadata": {}, + "extraction_reports": [ + { + "severity": "WARNING", + "command": "dump all bytes", + "stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\udc80\udc81\udc82\udc83\udc84\udc85\udc86\udc87\udc88\udc89\udc8a\udc8b\udc8c\udc8d\udc8e\udc8f\udc90\udc91\udc92\udc93\udc94\udc95\udc96\udc97\udc98\udc99\udc9a\udc9b\udc9c\udc9d\udc9e\udc9f\udca0\udca1\udca2\udca3\udca4\udca5\udca6\udca7\udca8\udca9\udcaa\udcab\udcac\udcad\udcae\udcaf\udcb0\udcb1\udcb2\udcb3\udcb4\udcb5\udcb6\udcb7\udcb8\udcb9\udcba\udcbb\udcbc\udcbd\udcbe\udcbf\udcc0\udcc1\udcc2\udcc3\udcc4\udcc5\udcc6\udcc7\udcc8\udcc9\udcca\udccb\udccc\udccd\udcce\udccf\udcd0\udcd1\udcd2\udcd3\udcd4\udcd5\udcd6\udcd7\udcd8\udcd9\udcda\udcdb\udcdc\udcdd\udcde\udcdf\udce0\udce1\udce2\udce3\udce4\udce5\udce6\udce7\udce8\udce9\udcea\udceb\udcec\udced\udcee\udcef\udcf0\udcf1\udcf2\udcf3\udcf4\udcf5\udcf6\udcf7\udcf8\udcf9\udcfa\udcfb\udcfc\udcfd\udcfe\udcff", + "stderr": "stdout is pretty strange ;)", + "exit_code": 1, + "__typename__": "ExtractCommandFailedReport", + } + ], + "__typename__": "ChunkReport", + } + ], + "subtasks": [], + "__typename__": "TaskResult", + } + ] + + @pytest.fixture def hello_kitty(tmp_path: Path) -> Path: """Generate an input file with 3 unknown chunks and 2 zip files.""" diff --git a/unblob/handlers/archive/sevenzip.py b/unblob/handlers/archive/sevenzip.py index 5cd60a5174..bbdd7c2f35 100644 --- a/unblob/handlers/archive/sevenzip.py +++ b/unblob/handlers/archive/sevenzip.py @@ -22,6 +22,7 @@ from pathlib import Path from typing import Optional +from dissect.cstruct import Instance from structlog import get_logger from unblob.extractors import Command @@ -90,6 +91,9 @@ class SevenZipHandler(StructHandler): HEADER_STRUCT = HEADER_STRUCT EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}") + def get_metadata(self, header: Instance) -> dict: + return {"version_maj": header.version_maj, "version_min": header.version_min} + def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: header = self.parse_header(file) @@ -97,7 +101,11 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk] size = calculate_sevenzip_size(header) - return ValidChunk(start_offset=start_offset, end_offset=start_offset + size) + metadata = self.get_metadata(header) + + return ValidChunk( + start_offset=start_offset, end_offset=start_offset + size, metadata=metadata + ) class MultiVolumeSevenZipHandler(DirectoryHandler): diff --git a/unblob/models.py b/unblob/models.py index b6227007e4..7045b1eb28 100644 --- a/unblob/models.py +++ b/unblob/models.py @@ -4,10 +4,11 @@ from collections.abc import Iterable from enum import Enum from pathlib import Path -from typing import Optional, TypeVar +from typing import Optional, TypeVar, Union import attr import attrs +from dissect.cstruct import Instance from structlog import get_logger from .file_utils import Endian, File, InvalidInputFormat, StructParser @@ -31,6 +32,24 @@ # +def metadata_converter(obj: Union[dict, Instance]) -> dict: + if isinstance(obj, dict): + return obj + if isinstance(obj, Instance): + result = {} + for k, v in obj._values.items(): # noqa: SLF001 + result[k] = v + return result + raise ValueError("Can only convert dict or Instance") + + +def metadata_validator(instance, attribute, value): + if attribute.name == "metadata" and isinstance(instance, Chunk): + for k, _ in value.items(): + if not isinstance(k, str): + raise TypeError("metadata keys must be string") + + @attr.define(frozen=True) class Task: path: Path @@ -107,6 +126,9 @@ class ValidChunk(Chunk): handler: "Handler" = attr.ib(init=False, eq=False) is_encrypted: bool = attr.ib(default=False) + metadata: dict = attr.ib( + factory=dict, converter=metadata_converter, validator=metadata_validator + ) def extract(self, inpath: Path, outdir: Path) -> Optional["ExtractResult"]: if self.is_encrypted: @@ -127,6 +149,7 @@ def as_report(self, extraction_reports: list[Report]) -> ChunkReport: size=self.size, handler_name=self.handler.NAME, is_encrypted=self.is_encrypted, + metadata=self.metadata, extraction_reports=extraction_reports, ) @@ -270,10 +293,7 @@ def default(self, obj): return str(obj) if isinstance(obj, bytes): - try: - return obj.decode() - except UnicodeDecodeError: - return str(obj) + return obj.decode("utf-8", errors="surrogateescape") logger.error("JSONEncoder met a non-JSON encodable value", obj=obj) # the usual fail path of custom JSONEncoders is to call the parent and let it fail diff --git a/unblob/report.py b/unblob/report.py index 641b2d408b..c972794840 100644 --- a/unblob/report.py +++ b/unblob/report.py @@ -219,6 +219,7 @@ class ChunkReport(Report): end_offset: int size: int is_encrypted: bool + metadata: dict = attr.ib(factory=dict) extraction_reports: list[Report]