
fix: parsing error origin_file attribute (#90)
* fix: parsing error origin_file attribute
* update ccl_chromium_reader
* fix: filtering option filter_db_results
* chore: bump version
lxndrblz authored Jul 7, 2024
1 parent 0cc80d9 commit 8146b81
Showing 6 changed files with 13 additions and 102 deletions.
2 changes: 1 addition & 1 deletion CITATION.cff
@@ -11,6 +11,6 @@ keywords:
   - Forensics
   - Electron
 abstract: "Autopsy Plugin for the Digital Forensic Acquisition and Analysis of Artefacts Generated by Microsoft Teams."
-version: 0.8.1
+version: 0.8.5
 license: MIT
 date-released: "2021-08-07"
2 changes: 1 addition & 1 deletion src/forensicsim/__init__.py
@@ -1 +1 @@
__version__ = "0.8.1"
__version__ = "0.8.5"
103 changes: 7 additions & 96 deletions src/forensicsim/backend.py
@@ -22,11 +22,7 @@
 SOFTWARE.
 """
 
-# mypy: disable-error-code="no-untyped-def"
-
 import json
-import typing
-from collections.abc import Iterator
 from pathlib import Path
 from typing import Any, Optional
 
@@ -40,99 +36,11 @@
 
 ENCODING = "iso-8859-1"
 
-"""
-The following code is heavily adopted from the RawLevelDb and IndexedDB processing proposed by CCL Group
-https://github.com/cclgroupltd/ccl_chrome_indexeddb/blob/35b6a9efba1078cf339f9e64d2796b1f5f7c556f/ccl_chromium_indexeddb.py
-It uses an optimized enumeration approach for processing the metadata, which makes the original IndexedDB super slow.
-Additionally, it has a flag to filter for datastores, which are interesting for us.
-"""
-
-
-def custom_iterate_records(
-    self,
-    db_id: int,
-    store_id: int,
-    *,
-    live_only: Optional[bool] = False,
-    bad_deserializer_data_handler: Optional[
-        typing.Callable[[ccl_chromium_indexeddb.IdbKey, bytes], typing.Any]
-    ] = None,
-) -> Iterator[ccl_chromium_indexeddb.IndexedDbRecord]:
-    blink_deserializer = (
-        ccl_chromium_indexeddb.ccl_blink_value_deserializer.BlinkV8Deserializer()
-    )
-    # goodness me this is a slow way of doing things
-    prefix = ccl_chromium_indexeddb.IndexedDb.make_prefix(db_id, store_id, 1)
-
-    for record in self._fetched_records:
-        if record.key.startswith(prefix):
-            key = ccl_chromium_indexeddb.IdbKey(record.key[len(prefix) :])
-            if not record.value:
-                # empty values will obviously fail, returning None is probably better than dying.
-                yield ccl_chromium_indexeddb.IndexedDbRecord(
-                    self,
-                    db_id,
-                    store_id,
-                    key,
-                    None,
-                    record.state == ccl_chromium_indexeddb.ccl_leveldb.KeyState.Live,
-                    record.seq,
-                )
-                continue
-            _value_version, varint_raw = ccl_chromium_indexeddb._le_varint_from_bytes(
-                record.value
-            )
-            val_idx = len(varint_raw)
-            # read the blink envelope
-            precursor = self.read_record_precursor(
-                key,
-                db_id,
-                store_id,
-                record.value[val_idx:],
-                bad_deserializer_data_handler,
-            )
-            if precursor is None:
-                continue  # only returns None on error, handled in the function if bad_deserializer_data_handler can
-
-            _blink_version, obj_raw, _trailer, _external_path = precursor
-
-            try:
-                deserializer = (
-                    ccl_chromium_indexeddb.ccl_v8_value_deserializer.Deserializer(
-                        obj_raw, host_object_delegate=blink_deserializer.read
-                    )
-                )
-                value = deserializer.read()
-            except Exception:
-                if bad_deserializer_data_handler is not None:
-                    bad_deserializer_data_handler(key, record.value)
-                    continue
-                raise
-
-            # PATCH record.origin_file to external value path
-            yield ccl_chromium_indexeddb.IndexedDbRecord(
-                self,
-                db_id,
-                store_id,
-                key,
-                value,
-                record.state == ccl_chromium_indexeddb.ccl_leveldb.KeyState.Live,
-                record.seq,
-                record.origin_file,
-            )
-
-
-# Overwrite the iterate records method
-ccl_chromium_indexeddb.IndexedDb.iterate_records = custom_iterate_records
 
 
 def parse_db(
     filepath: Path,
     blobpath: Optional[Path] = None,
-    do_not_filter: Optional[bool] = False,
+    filter_db_results: Optional[bool] = True,
 ) -> list[dict[str, Any]]:
     # Open raw access to a LevelDB and deserialize the records.
 
@@ -151,12 +59,15 @@ def parse_db(
             # Skip empty object stores
             if obj_store_name is None:
                 continue
-            if obj_store_name in TEAMS_DB_OBJECT_STORES or do_not_filter is False:
+            if obj_store_name in TEAMS_DB_OBJECT_STORES or filter_db_results is False:
                 obj_store = db[obj_store_name]
                 records_per_object_store = 0
                 for record in obj_store.iterate_records():
                     # skip empty records
-                    if record.external_value_path is None or record.value is None:
+                    if not hasattr(record, "value") or record.value is None:
                         continue
+                    # skip records without file origin
+                    if not hasattr(record, "origin_file") or record.origin_file is None:
+                        continue
                     records_per_object_store += 1
                     # TODO: Fix None values
@@ -165,7 +76,7 @@ def parse_db(
                     extracted_values.append({
                         "key": record.key.raw_key,
                         "value": record.value,
-                        "origin_file": record.external_value_path,
+                        "origin_file": record.origin_file,
                         "store": obj_store_name,
                         "state": state,
                         "seq": seq,
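For orientation, a minimal usage sketch of the reworked parse_db; the LevelDB and blob folder paths below are hypothetical, and the dictionary keys match the ones built in the hunk above:

from pathlib import Path

from forensicsim.backend import parse_db

# Hypothetical Teams IndexedDB folders; adjust to an actual acquisition.
leveldb_path = Path("evidence/https_teams.microsoft.com_0.indexeddb.leveldb")
blob_path = Path("evidence/https_teams.microsoft.com_0.indexeddb.blob")

# filter_db_results=True (the default) keeps only the object stores listed in
# TEAMS_DB_OBJECT_STORES; records without a value or an origin_file are skipped.
records = parse_db(leveldb_path, blob_path, filter_db_results=True)

for record in records[:5]:
    # Each entry is a plain dict with key, value, origin_file, store, state and seq.
    print(record["store"], record["origin_file"], record["key"])
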
4 changes: 2 additions & 2 deletions src/forensicsim/parser.py
@@ -369,14 +369,14 @@ def process_db(
     input_path: Path,
     output_path: Path,
     blob_path: Optional[Path] = None,
-    do_not_filter: Optional[bool] = True,
+    filter_db_results: Optional[bool] = True,
 ) -> None:
     if not input_path.parts[-1].endswith(".leveldb"):
         raise ValueError(f"Expected a leveldb folder. Path: {input_path}")
 
     if blob_path is not None and not blob_path.parts[-1].endswith(".blob"):
         raise ValueError(f"Expected a .blob folder. Path: {blob_path}")
 
-    extracted_values = parse_db(input_path, blob_path, do_not_filter)
+    extracted_values = parse_db(input_path, blob_path, filter_db_results)
     parsed_records = parse_records(extracted_values)
     write_results_to_json(parsed_records, output_path)
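A hedged sketch of calling process_db with the renamed keyword; the folder names are placeholders, but the ".leveldb" and ".blob" suffix checks are the ones enforced above:

from pathlib import Path

from forensicsim.parser import process_db

# process_db raises ValueError unless the input folder ends in ".leveldb"
# and the optional blob folder ends in ".blob" (both paths are hypothetical).
process_db(
    input_path=Path("evidence/https_teams.microsoft.com_0.indexeddb.leveldb"),
    output_path=Path("out/teams.json"),
    blob_path=Path("evidence/https_teams.microsoft.com_0.indexeddb.blob"),
    filter_db_results=True,  # keep only the known Teams object stores
)
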
2 changes: 1 addition & 1 deletion tools/dump_leveldb.py
@@ -36,7 +36,7 @@ def process_level_db(
     input_path: Path, output_path: Path, blob_path: Optional[Path] = None
 ) -> None:
     # convert the database to a python list with nested dictionaries
-    extracted_values = parse_db(input_path, blob_path, do_not_filter=False)
+    extracted_values = parse_db(input_path, blob_path, filter_db_results=False)
 
     # write the output to a json file
     write_results_to_json(extracted_values, output_path)
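The dump tool deliberately passes filter_db_results=False so that every object store is exported rather than only the Teams ones; a small comparison sketch with a hypothetical path:

from pathlib import Path

from forensicsim.backend import parse_db

leveldb_path = Path("evidence/https_teams.microsoft.com_0.indexeddb.leveldb")

# Unfiltered, as tools/dump_leveldb.py does: keep every object store.
all_records = parse_db(leveldb_path, filter_db_results=False)

# Filtered, as tools/main.py does via process_db: only TEAMS_DB_OBJECT_STORES.
teams_records = parse_db(leveldb_path, filter_db_results=True)

print(len(all_records), "records unfiltered vs.", len(teams_records), "filtered")
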
2 changes: 1 addition & 1 deletion tools/main.py
@@ -58,7 +58,7 @@
 )
 def process_cmd(filepath: Path, outputpath: Path, blobpath: Path) -> None:
     click.echo(XTRACT_HEADER)
-    process_db(filepath, outputpath, blobpath, True)
+    process_db(filepath, outputpath, blobpath, filter_db_results=True)
 
 
 if __name__ == "__main__":
