diff --git a/src/forensicsim/backend.py b/src/forensicsim/backend.py index a245020..de2351c 100644 --- a/src/forensicsim/backend.py +++ b/src/forensicsim/backend.py @@ -22,8 +22,6 @@ SOFTWARE. """ -# mypy: disable-error-code="no-untyped-def" - import json import typing from collections.abc import Iterator @@ -40,94 +38,6 @@ ENCODING = "iso-8859-1" -""" -The following code is heavily adopted from the RawLevelDb and IndexedDB processing proposed by CCL Group - -https://github.com/cclgroupltd/ccl_chrome_indexeddb/blob/35b6a9efba1078cf339f9e64d2796b1f5f7c556f/ccl_chromium_indexeddb.py - -It uses an optimized enumeration approach for processing the metadata, which makes the original IndexedDB super slow. - -Additionally, it has a flag to filter for datastores, which are interesting for us. -""" - - -def custom_iterate_records( - self, - db_id: int, - store_id: int, - *, - live_only: Optional[bool] = False, - bad_deserializer_data_handler: Optional[ - typing.Callable[[ccl_chromium_indexeddb.IdbKey, bytes], typing.Any] - ] = None, -) -> Iterator[ccl_chromium_indexeddb.IndexedDbRecord]: - blink_deserializer = ( - ccl_chromium_indexeddb.ccl_blink_value_deserializer.BlinkV8Deserializer() - ) - # goodness me this is a slow way of doing things - prefix = ccl_chromium_indexeddb.IndexedDb.make_prefix(db_id, store_id, 1) - - for record in self._fetched_records: - if record.key.startswith(prefix): - key = ccl_chromium_indexeddb.IdbKey(record.key[len(prefix) :]) - if not record.value: - # empty values will obviously fail, returning None is probably better than dying. - yield ccl_chromium_indexeddb.IndexedDbRecord( - self, - db_id, - store_id, - key, - None, - record.state == ccl_chromium_indexeddb.ccl_leveldb.KeyState.Live, - record.seq, - ) - continue - _value_version, varint_raw = ccl_chromium_indexeddb._le_varint_from_bytes( - record.value - ) - val_idx = len(varint_raw) - # read the blink envelope - precursor = self.read_record_precursor( - key, - db_id, - store_id, - record.value[val_idx:], - bad_deserializer_data_handler, - ) - if precursor is None: - continue # only returns None on error, handled in the function if bad_deserializer_data_handler can - - _blink_version, obj_raw, _trailer, _external_path = precursor - - try: - deserializer = ( - ccl_chromium_indexeddb.ccl_v8_value_deserializer.Deserializer( - obj_raw, host_object_delegate=blink_deserializer.read - ) - ) - value = deserializer.read() - except Exception: - if bad_deserializer_data_handler is not None: - bad_deserializer_data_handler(key, record.value) - continue - raise - - # PATCH record.origin_file to external value path - yield ccl_chromium_indexeddb.IndexedDbRecord( - self, - db_id, - store_id, - key, - value, - record.state == ccl_chromium_indexeddb.ccl_leveldb.KeyState.Live, - record.seq, - record.origin_file, - ) - - -# Overwrite the iterate records method -ccl_chromium_indexeddb.IndexedDb.iterate_records = custom_iterate_records - def parse_db( filepath: Path, @@ -156,7 +66,10 @@ def parse_db( records_per_object_store = 0 for record in obj_store.iterate_records(): # skip empty records - if record.external_value_path is None or record.value is None: + if not hasattr(record, 'value') or record.value is None: + continue + # skip records without file origin + if not hasattr(record, 'origin_file') or record.origin_file is None: continue records_per_object_store += 1 # TODO: Fix None values @@ -165,7 +78,7 @@ def parse_db( extracted_values.append({ "key": record.key.raw_key, "value": record.value, - "origin_file": record.external_value_path, + "origin_file": record.origin_file, "store": obj_store_name, "state": state, "seq": seq,