feat(reporting): report metadata about chunks.
Allow handlers to provide a dict through the ValidChunk metadata
attribute. That dictionary can contain any information the handler
considers relevant, but we advise handler writers to report parsed
information such as header values.
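
For example, a handler can attach parsed header values to the chunk it
returns from calculate_chunk(). A minimal sketch, with made-up field
names and offsets (no real handler is referenced here):

    from unblob.models import ValidChunk

    # Hypothetical values parsed from a chunk header by a handler.
    chunk = ValidChunk(
        start_offset=0,
        end_offset=0x4000,
        metadata={"load_address": 0x8000_0000, "architecture": "arm"},
    )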

This metadata dict is later reported as part of our ChunkReport entries
and is available in the JSON report file if the user requested one.

The idea is to expose metadata to further analysis steps through the
unblob report. For example, a binary analysis toolkit would read the load
address and architecture from a uImage chunk to analyze the file
extracted from that chunk with the right settings.
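
A hedged sketch of such a consumer, assuming the user asked unblob for a
JSON report written to report.json and that some handler exposed
hypothetical load_address and architecture metadata keys (the report
layout follows the ChunkReport entries shown in the tests below):

    import json

    with open("report.json") as f:
        task_results = json.load(f)

    for task_result in task_results:
        for entry in task_result["reports"]:
            if entry["__typename__"] == "ChunkReport" and entry["metadata"]:
                print(
                    entry["handler_name"],
                    entry["metadata"].get("load_address"),
                    entry["metadata"].get("architecture"),
                )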

A note on the 'as_dict' implementation.

The initial idea was to implement it in dissect.cstruct (see
fox-it/dissect.cstruct#29), but due to expected changes in that
project's API, I chose to implement it in unblob so we do not depend on
another project.
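
A minimal sketch of what the conversion can look like. The reliance on
Instance internals below is an assumption for illustration, not the
exact unblob implementation; only the error message is taken from the
tests in this change:

    from typing import Any

    from dissect.cstruct import Instance

    def as_dict(value: Any) -> dict:
        if isinstance(value, dict):
            return value
        if isinstance(value, Instance):
            # Assumption: the Instance type object lists its parsed fields,
            # and each value can be read back with getattr().
            return {f.name: getattr(value, f.name) for f in value._type.fields}
        raise ValueError("Can only convert dict or Instance")
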
qkaiser committed Dec 18, 2024
1 parent 46cd4ce commit f6bad66
Showing 5 changed files with 279 additions and 59 deletions.
124 changes: 71 additions & 53 deletions tests/test_models.py
@@ -4,7 +4,15 @@
import pytest

from unblob.file_utils import InvalidInputFormat
-from unblob.models import Chunk, ProcessResult, Task, TaskResult, UnknownChunk, to_json
+from unblob.models import (
+    Chunk,
+    ProcessResult,
+    Task,
+    TaskResult,
+    UnknownChunk,
+    ValidChunk,
+    to_json,
+)
from unblob.report import (
ChunkReport,
ExtractCommandFailedReport,
@@ -170,56 +178,57 @@ def test_process_result_conversion(self):
decoded_report = json.loads(json_text)
assert decoded_report == [
{
"__typename__": "TaskResult",
"task": {
"path": "/nonexistent",
"depth": 0,
"blob_id": "",
"is_multi_file": False,
"__typename__": "Task",
},
"reports": [
{
"__typename__": "StatReport",
"path": "/nonexistent",
"size": 384,
"is_dir": False,
"is_file": True,
"is_link": False,
"link_target": None,
"path": "/nonexistent",
"size": 384,
"__typename__": "StatReport",
},
{
"__typename__": "FileMagicReport",
"magic": "Zip archive data, at least v2.0 to extract",
"mime_type": "application/zip",
"__typename__": "FileMagicReport",
},
{
"__typename__": "HashReport",
"md5": "9019fcece2433ad7f12c077e84537a74",
"sha1": "36998218d8f43b69ef3adcadf2e8979e81eed166",
"sha256": "7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118",
"__typename__": "HashReport",
},
{
"__typename__": "ChunkReport",
"end_offset": 384,
"extraction_reports": [],
"handler_name": "zip",
"id": "test_basic_conversion:id",
"is_encrypted": False,
"size": 384,
"handler_name": "zip",
"start_offset": 0,
"end_offset": 384,
"size": 384,
"is_encrypted": False,
"metadata": {},
"extraction_reports": [],
"__typename__": "ChunkReport",
},
],
"subtasks": [
{
"__typename__": "Task",
"blob_id": "test_basic_conversion:id",
"path": "/extractions/nonexistent_extract",
"depth": 314,
"blob_id": "test_basic_conversion:id",
"is_multi_file": False,
"path": "/extractions/nonexistent_extract",
"__typename__": "Task",
}
],
"task": {
"__typename__": "Task",
"blob_id": "",
"depth": 0,
"is_multi_file": False,
"path": "/nonexistent",
},
},
"__typename__": "TaskResult",
}
]

@@ -235,35 +244,44 @@ def test_exotic_command_output(self):
decoded_report = json.loads(json_text)

assert decoded_report == {
"__typename__": "ExtractCommandFailedReport",
"command": "dump all bytes",
"exit_code": 1,
"severity": "WARNING",
"command": "dump all bytes",
"stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08"
"\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16"
"\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !\"#$%&'()*+,"
"-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]"
"^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\udc80\udc81"
"\udc82\udc83\udc84\udc85\udc86\udc87\udc88\udc89"
"\udc8a\udc8b\udc8c\udc8d\udc8e\udc8f\udc90\udc91"
"\udc92\udc93\udc94\udc95\udc96\udc97\udc98\udc99"
"\udc9a\udc9b\udc9c\udc9d\udc9e\udc9f\udca0\udca1"
"\udca2\udca3\udca4\udca5\udca6\udca7\udca8\udca9"
"\udcaa\udcab\udcac\udcad\udcae\udcaf\udcb0\udcb1"
"\udcb2\udcb3\udcb4\udcb5\udcb6\udcb7\udcb8\udcb9"
"\udcba\udcbb\udcbc\udcbd\udcbe\udcbf\udcc0\udcc1"
"\udcc2\udcc3\udcc4\udcc5\udcc6\udcc7\udcc8\udcc9"
"\udcca\udccb\udccc\udccd\udcce\udccf\udcd0\udcd1"
"\udcd2\udcd3\udcd4\udcd5\udcd6\udcd7\udcd8\udcd9"
"\udcda\udcdb\udcdc\udcdd\udcde\udcdf\udce0\udce1"
"\udce2\udce3\udce4\udce5\udce6\udce7\udce8\udce9"
"\udcea\udceb\udcec\udced\udcee\udcef\udcf0\udcf1"
"\udcf2\udcf3\udcf4\udcf5\udcf6\udcf7\udcf8\udcf9"
"\udcfa\udcfb\udcfc\udcfd\udcfe\udcff",
"stderr": "stdout is pretty strange ;)",
"stdout": (
"b'\\x00\\x01\\x02\\x03\\x04\\x05\\x06\\x07"
"\\x08\\t\\n\\x0b\\x0c\\r\\x0e\\x0f"
"\\x10\\x11\\x12\\x13\\x14\\x15\\x16\\x17"
'\\x18\\x19\\x1a\\x1b\\x1c\\x1d\\x1e\\x1f !"#'
"$%&\\'()*+,-./0123456789:;<=>?@AB"
"CDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]^_`a"
"bcdefghijklmnopqrstuvwxyz{|}~\\x7f"
"\\x80\\x81\\x82\\x83\\x84\\x85\\x86\\x87"
"\\x88\\x89\\x8a\\x8b\\x8c\\x8d\\x8e\\x8f"
"\\x90\\x91\\x92\\x93\\x94\\x95\\x96\\x97"
"\\x98\\x99\\x9a\\x9b\\x9c\\x9d\\x9e\\x9f"
"\\xa0\\xa1\\xa2\\xa3\\xa4\\xa5\\xa6\\xa7"
"\\xa8\\xa9\\xaa\\xab\\xac\\xad\\xae\\xaf"
"\\xb0\\xb1\\xb2\\xb3\\xb4\\xb5\\xb6\\xb7"
"\\xb8\\xb9\\xba\\xbb\\xbc\\xbd\\xbe\\xbf"
"\\xc0\\xc1\\xc2\\xc3\\xc4\\xc5\\xc6\\xc7"
"\\xc8\\xc9\\xca\\xcb\\xcc\\xcd\\xce\\xcf"
"\\xd0\\xd1\\xd2\\xd3\\xd4\\xd5\\xd6\\xd7"
"\\xd8\\xd9\\xda\\xdb\\xdc\\xdd\\xde\\xdf"
"\\xe0\\xe1\\xe2\\xe3\\xe4\\xe5\\xe6\\xe7"
"\\xe8\\xe9\\xea\\xeb\\xec\\xed\\xee\\xef"
"\\xf0\\xf1\\xf2\\xf3\\xf4\\xf5\\xf6\\xf7"
"\\xf8\\xf9\\xfa\\xfb\\xfc\\xfd\\xfe\\xff"
"'"
),
"exit_code": 1,
"__typename__": "ExtractCommandFailedReport",
}

@pytest.mark.parametrize(
"metadata",
[
pytest.param(1, id="metadata_int"),
pytest.param(0.2, id="metadata_float"),
pytest.param(True, id="metadata_bool"),
pytest.param([1, 2], id="metadata_list"),
pytest.param((1, 2), id="metadata_tuple"),
],
)
def test_invalid_metadata(self, metadata):
with pytest.raises(ValueError, match="Can only convert dict or Instance"):
ValidChunk(start_offset=0, end_offset=100, metadata=metadata)
173 changes: 173 additions & 0 deletions tests/test_report.py
@@ -11,6 +11,7 @@
from unblob.report import (
CarveDirectoryReport,
ChunkReport,
ExtractCommandFailedReport,
FileMagicReport,
HashReport,
StatReport,
@@ -48,6 +49,178 @@ def test_process_file_report_output_is_valid_json(
assert len(report)


class Test_ProcessResult_to_json: # noqa: N801
def test_simple_conversion(self):
task = Task(path=Path("/nonexistent"), depth=0, blob_id="")
task_result = TaskResult(task)
chunk_id = "test_basic_conversion:id"

task_result.add_report(
StatReport(
path=task.path,
size=384,
is_dir=False,
is_file=True,
is_link=False,
link_target=None,
)
)
task_result.add_report(
FileMagicReport(
magic="Zip archive data, at least v2.0 to extract",
mime_type="application/zip",
)
)
task_result.add_report(
HashReport(
md5="9019fcece2433ad7f12c077e84537a74",
sha1="36998218d8f43b69ef3adcadf2e8979e81eed166",
sha256="7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118",
)
)
task_result.add_report(
ChunkReport(
id=chunk_id,
handler_name="zip",
start_offset=0,
end_offset=384,
size=384,
is_encrypted=False,
metadata={},
extraction_reports=[],
)
)
task_result.add_subtask(
Task(
path=Path("/extractions/nonexistent_extract"),
depth=314,
blob_id=chunk_id,
)
)

json_text = ProcessResult(results=[task_result]).to_json()

# output must be a valid json string
assert isinstance(json_text, str)

# that can be loaded back
decoded_report = json.loads(json_text)
assert decoded_report == [
{
"task": {
"path": "/nonexistent",
"depth": 0,
"blob_id": "",
"is_multi_file": False,
"__typename__": "Task",
},
"reports": [
{
"path": "/nonexistent",
"size": 384,
"is_dir": False,
"is_file": True,
"is_link": False,
"link_target": None,
"__typename__": "StatReport",
},
{
"magic": "Zip archive data, at least v2.0 to extract",
"mime_type": "application/zip",
"__typename__": "FileMagicReport",
},
{
"md5": "9019fcece2433ad7f12c077e84537a74",
"sha1": "36998218d8f43b69ef3adcadf2e8979e81eed166",
"sha256": "7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118",
"__typename__": "HashReport",
},
{
"id": "test_basic_conversion:id",
"handler_name": "zip",
"start_offset": 0,
"end_offset": 384,
"size": 384,
"is_encrypted": False,
"metadata": {},
"extraction_reports": [],
"__typename__": "ChunkReport",
},
],
"subtasks": [
{
"path": "/extractions/nonexistent_extract",
"depth": 314,
"blob_id": "test_basic_conversion:id",
"is_multi_file": False,
"__typename__": "Task",
}
],
"__typename__": "TaskResult",
}
]

def test_exotic_command_output(self):
task = Task(path=Path("/nonexistent"), depth=0, blob_id="")
task_result = TaskResult(task)
report = ExtractCommandFailedReport(
command="dump all bytes",
stdout=bytes(range(256)),
stderr=b"stdout is pretty strange ;)",
exit_code=1,
)

task_result.add_report(
ChunkReport(
id="test",
handler_name="fail",
start_offset=0,
end_offset=256,
size=256,
is_encrypted=False,
extraction_reports=[report],
)
)
json_text = ProcessResult(results=[task_result]).to_json()

decoded_report = json.loads(json_text)
assert decoded_report == [
{
"task": {
"path": "/nonexistent",
"depth": 0,
"blob_id": "",
"is_multi_file": False,
"__typename__": "Task",
},
"reports": [
{
"id": "test",
"handler_name": "fail",
"start_offset": 0,
"end_offset": 256,
"size": 256,
"is_encrypted": False,
"metadata": {},
"extraction_reports": [
{
"severity": "WARNING",
"command": "dump all bytes",
"stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\udc80\udc81\udc82\udc83\udc84\udc85\udc86\udc87\udc88\udc89\udc8a\udc8b\udc8c\udc8d\udc8e\udc8f\udc90\udc91\udc92\udc93\udc94\udc95\udc96\udc97\udc98\udc99\udc9a\udc9b\udc9c\udc9d\udc9e\udc9f\udca0\udca1\udca2\udca3\udca4\udca5\udca6\udca7\udca8\udca9\udcaa\udcab\udcac\udcad\udcae\udcaf\udcb0\udcb1\udcb2\udcb3\udcb4\udcb5\udcb6\udcb7\udcb8\udcb9\udcba\udcbb\udcbc\udcbd\udcbe\udcbf\udcc0\udcc1\udcc2\udcc3\udcc4\udcc5\udcc6\udcc7\udcc8\udcc9\udcca\udccb\udccc\udccd\udcce\udccf\udcd0\udcd1\udcd2\udcd3\udcd4\udcd5\udcd6\udcd7\udcd8\udcd9\udcda\udcdb\udcdc\udcdd\udcde\udcdf\udce0\udce1\udce2\udce3\udce4\udce5\udce6\udce7\udce8\udce9\udcea\udceb\udcec\udced\udcee\udcef\udcf0\udcf1\udcf2\udcf3\udcf4\udcf5\udcf6\udcf7\udcf8\udcf9\udcfa\udcfb\udcfc\udcfd\udcfe\udcff",
"stderr": "stdout is pretty strange ;)",
"exit_code": 1,
"__typename__": "ExtractCommandFailedReport",
}
],
"__typename__": "ChunkReport",
}
],
"subtasks": [],
"__typename__": "TaskResult",
}
]


@pytest.fixture
def hello_kitty(tmp_path: Path) -> Path:
"""Generate an input file with 3 unknown chunks and 2 zip files."""
10 changes: 9 additions & 1 deletion unblob/handlers/archive/sevenzip.py
@@ -22,6 +22,7 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractors import Command
@@ -90,14 +91,21 @@ class SevenZipHandler(StructHandler):
HEADER_STRUCT = HEADER_STRUCT
EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

def get_metadata(self, header: Instance) -> dict:
return {"version_maj": header.version_maj, "version_min": header.version_min}

def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
header = self.parse_header(file)

check_header_crc(header)

size = calculate_sevenzip_size(header)

-        return ValidChunk(start_offset=start_offset, end_offset=start_offset + size)
+        metadata = self.get_metadata(header)
+
+        return ValidChunk(
+            start_offset=start_offset, end_offset=start_offset + size, metadata=metadata
+        )


class MultiVolumeSevenZipHandler(DirectoryHandler):
