From 8146b8181b95c29171df00aa6f5d21b8174fa882 Mon Sep 17 00:00:00 2001
From: Alexander Bilz
Date: Sun, 7 Jul 2024 13:35:25 +0200
Subject: [PATCH] fix: parsing error origin_file attribute (#90)

* fix: parsing error origin_file attribute
* update ccl_chromium_reader
* fix: filtering option filter_db_results
* chore: bump version
---
 CITATION.cff                |   2 +-
 src/forensicsim/__init__.py |   2 +-
 src/forensicsim/backend.py  | 103 +++---------------------------------
 src/forensicsim/parser.py   |   4 +-
 tools/dump_leveldb.py       |   2 +-
 tools/main.py               |   2 +-
 6 files changed, 13 insertions(+), 102 deletions(-)

diff --git a/CITATION.cff b/CITATION.cff
index a216e50..4d99682 100644
--- a/CITATION.cff
+++ b/CITATION.cff
@@ -11,6 +11,6 @@ keywords:
   - Forensics
   - Electron
 abstract: "Autopsy Plugin for the Digital Forensic Acquisition and Analysis of Artefacts Generated by Microsoft Teams."
-version: 0.8.1
+version: 0.8.5
 license: MIT
 date-released: "2021-08-07"
diff --git a/src/forensicsim/__init__.py b/src/forensicsim/__init__.py
index 8088f75..af46754 100644
--- a/src/forensicsim/__init__.py
+++ b/src/forensicsim/__init__.py
@@ -1 +1 @@
-__version__ = "0.8.1"
+__version__ = "0.8.5"
diff --git a/src/forensicsim/backend.py b/src/forensicsim/backend.py
index a245020..caf07a7 100644
--- a/src/forensicsim/backend.py
+++ b/src/forensicsim/backend.py
@@ -22,11 +22,7 @@ SOFTWARE.
 """
 
-# mypy: disable-error-code="no-untyped-def"
-
 import json
-import typing
-from collections.abc import Iterator
 from pathlib import Path
 from typing import Any, Optional
 
@@ -40,99 +36,11 @@ ENCODING = "iso-8859-1"
 
-"""
-The following code is heavily adopted from the RawLevelDb and IndexedDB processing proposed by CCL Group
-
-https://github.com/cclgroupltd/ccl_chrome_indexeddb/blob/35b6a9efba1078cf339f9e64d2796b1f5f7c556f/ccl_chromium_indexeddb.py
-
-It uses an optimized enumeration approach for processing the metadata, which makes the original IndexedDB super slow.
-
-Additionally, it has a flag to filter for datastores, which are interesting for us.
-"""
-
-
-def custom_iterate_records(
-    self,
-    db_id: int,
-    store_id: int,
-    *,
-    live_only: Optional[bool] = False,
-    bad_deserializer_data_handler: Optional[
-        typing.Callable[[ccl_chromium_indexeddb.IdbKey, bytes], typing.Any]
-    ] = None,
-) -> Iterator[ccl_chromium_indexeddb.IndexedDbRecord]:
-    blink_deserializer = (
-        ccl_chromium_indexeddb.ccl_blink_value_deserializer.BlinkV8Deserializer()
-    )
-    # goodness me this is a slow way of doing things
-    prefix = ccl_chromium_indexeddb.IndexedDb.make_prefix(db_id, store_id, 1)
-
-    for record in self._fetched_records:
-        if record.key.startswith(prefix):
-            key = ccl_chromium_indexeddb.IdbKey(record.key[len(prefix) :])
-            if not record.value:
-                # empty values will obviously fail, returning None is probably better than dying.
-                yield ccl_chromium_indexeddb.IndexedDbRecord(
-                    self,
-                    db_id,
-                    store_id,
-                    key,
-                    None,
-                    record.state == ccl_chromium_indexeddb.ccl_leveldb.KeyState.Live,
-                    record.seq,
-                )
-                continue
-            _value_version, varint_raw = ccl_chromium_indexeddb._le_varint_from_bytes(
-                record.value
-            )
-            val_idx = len(varint_raw)
-            # read the blink envelope
-            precursor = self.read_record_precursor(
-                key,
-                db_id,
-                store_id,
-                record.value[val_idx:],
-                bad_deserializer_data_handler,
-            )
-            if precursor is None:
-                continue  # only returns None on error, handled in the function if bad_deserializer_data_handler can
-
-            _blink_version, obj_raw, _trailer, _external_path = precursor
-
-            try:
-                deserializer = (
-                    ccl_chromium_indexeddb.ccl_v8_value_deserializer.Deserializer(
-                        obj_raw, host_object_delegate=blink_deserializer.read
-                    )
-                )
-                value = deserializer.read()
-            except Exception:
-                if bad_deserializer_data_handler is not None:
-                    bad_deserializer_data_handler(key, record.value)
-                    continue
-                raise
-
-            # PATCH record.origin_file to external value path
-            yield ccl_chromium_indexeddb.IndexedDbRecord(
-                self,
-                db_id,
-                store_id,
-                key,
-                value,
-                record.state == ccl_chromium_indexeddb.ccl_leveldb.KeyState.Live,
-                record.seq,
-                record.origin_file,
-            )
-
-
-# Overwrite the iterate records method
-ccl_chromium_indexeddb.IndexedDb.iterate_records = custom_iterate_records
-
 
 def parse_db(
     filepath: Path,
     blobpath: Optional[Path] = None,
-    do_not_filter: Optional[bool] = False,
+    filter_db_results: Optional[bool] = True,
 ) -> list[dict[str, Any]]:
     # Open raw access to a LevelDB and deserialize the records.
@@ -151,12 +59,15 @@ def parse_db(
         # Skip empty object stores
         if obj_store_name is None:
             continue
-        if obj_store_name in TEAMS_DB_OBJECT_STORES or do_not_filter is False:
+        if obj_store_name in TEAMS_DB_OBJECT_STORES or filter_db_results is False:
             obj_store = db[obj_store_name]
             records_per_object_store = 0
             for record in obj_store.iterate_records():
                 # skip empty records
-                if record.external_value_path is None or record.value is None:
+                if not hasattr(record, "value") or record.value is None:
+                    continue
+                # skip records without file origin
+                if not hasattr(record, "origin_file") or record.origin_file is None:
                     continue
                 records_per_object_store += 1
                 # TODO: Fix None values
@@ -165,7 +76,7 @@ def parse_db(
                 extracted_values.append({
                     "key": record.key.raw_key,
                     "value": record.value,
-                    "origin_file": record.external_value_path,
+                    "origin_file": record.origin_file,
                     "store": obj_store_name,
                     "state": state,
                     "seq": seq,
diff --git a/src/forensicsim/parser.py b/src/forensicsim/parser.py
index f7fadad..14ea214 100644
--- a/src/forensicsim/parser.py
+++ b/src/forensicsim/parser.py
@@ -369,7 +369,7 @@ def process_db(
     input_path: Path,
     output_path: Path,
     blob_path: Optional[Path] = None,
-    do_not_filter: Optional[bool] = True,
+    filter_db_results: Optional[bool] = True,
 ) -> None:
     if not input_path.parts[-1].endswith(".leveldb"):
         raise ValueError(f"Expected a leveldb folder. Path: {input_path}")
@@ -377,6 +377,6 @@ def process_db(
     if blob_path is not None and not blob_path.parts[-1].endswith(".blob"):
         raise ValueError(f"Expected a .blob folder. Path: {blob_path}")
 
-    extracted_values = parse_db(input_path, blob_path, do_not_filter)
+    extracted_values = parse_db(input_path, blob_path, filter_db_results)
     parsed_records = parse_records(extracted_values)
     write_results_to_json(parsed_records, output_path)
diff --git a/tools/dump_leveldb.py b/tools/dump_leveldb.py
index 812db9b..3eedc89 100644
--- a/tools/dump_leveldb.py
+++ b/tools/dump_leveldb.py
@@ -36,7 +36,7 @@ def process_level_db(
     input_path: Path, output_path: Path, blob_path: Optional[Path] = None
 ) -> None:
     # convert the database to a python list with nested dictionaries
-    extracted_values = parse_db(input_path, blob_path, do_not_filter=False)
+    extracted_values = parse_db(input_path, blob_path, filter_db_results=False)
     # write the output to a json file
     write_results_to_json(extracted_values, output_path)
diff --git a/tools/main.py b/tools/main.py
index db4ec0d..534bb92 100644
--- a/tools/main.py
+++ b/tools/main.py
@@ -58,7 +58,7 @@
 )
 def process_cmd(filepath: Path, outputpath: Path, blobpath: Path) -> None:
     click.echo(XTRACT_HEADER)
-    process_db(filepath, outputpath, blobpath, True)
+    process_db(filepath, outputpath, blobpath, filter_db_results=True)
 
 
 if __name__ == "__main__":
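
Usage sketch (not part of the patch above; paths are placeholders): after this change, callers choose Teams-specific store filtering through the renamed filter_db_results parameter instead of do_not_filter. parse_db and process_db live in forensicsim.backend and forensicsim.parser, as the diff shows.

    from pathlib import Path

    from forensicsim.backend import parse_db
    from forensicsim.parser import process_db

    leveldb_path = Path("teams.leveldb")  # placeholder input folder
    output_path = Path("records.json")    # placeholder output file

    # Dump every object store (no filtering), as tools/dump_leveldb.py now does:
    records = parse_db(leveldb_path, None, filter_db_results=False)

    # Keep only the Teams-related object stores, as tools/main.py now does:
    process_db(leveldb_path, output_path, None, filter_db_results=True)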