diff --git a/tests/handlers/filesystem/test_jffs2.py b/tests/handlers/filesystem/test_jffs2.py index 64c913a99a..9b2f910e69 100644 --- a/tests/handlers/filesystem/test_jffs2.py +++ b/tests/handlers/filesystem/test_jffs2.py @@ -1,7 +1,6 @@ import binascii import pytest -from dissect.cstruct import Instance from helpers import unhex from unblob.file_utils import Endian, File @@ -59,7 +58,7 @@ def get_valid_jffs2_old_be_header(): ) -def calculate_crc(header: Instance): +def calculate_crc(header): return (binascii.crc32(header.dumps()[:-4], -1) ^ -1) & 0xFFFFFFFF @@ -188,9 +187,7 @@ def calculate_crc(header: Instance): ), ], ) -def test_valid_header_new( - header: Instance, node_start_offset: int, eof: int, expected: bool -): +def test_valid_header_new(header, node_start_offset: int, eof: int, expected: bool): header.hdr_crc = calculate_crc(header) assert new_handler.valid_header(header, node_start_offset, eof) == expected @@ -270,9 +267,7 @@ def test_valid_header_new( ), ], ) -def test_valid_header_old( - header: Instance, node_start_offset: int, eof: int, expected: bool -): +def test_valid_header_old(header, node_start_offset: int, eof: int, expected: bool): header.hdr_crc = calculate_crc(header) assert old_handler.valid_header(header, node_start_offset, eof) == expected diff --git a/unblob/file_utils.py b/unblob/file_utils.py index e532be6ab5..e476b552ba 100644 --- a/unblob/file_utils.py +++ b/unblob/file_utils.py @@ -12,7 +12,7 @@ from pathlib import Path from typing import Iterable, Iterator, List, Literal, Optional, Tuple, Union -from dissect.cstruct import Instance, cstruct +from dissect.cstruct import cstruct from structlog import get_logger from .logging import format_hex @@ -336,7 +336,7 @@ def parse( struct_name: str, file: Union[File, bytes], endian: Endian, - ) -> Instance: + ): cparser = self.cparser_le if endian is Endian.LITTLE else self.cparser_be struct_parser = getattr(cparser, struct_name) return struct_parser(file) diff --git a/unblob/handlers/archive/arc.py b/unblob/handlers/archive/arc.py index 86c3a076d9..5d5545eb16 100644 --- a/unblob/handlers/archive/arc.py +++ b/unblob/handlers/archive/arc.py @@ -1,6 +1,5 @@ from typing import Optional -from dissect.cstruct import Instance from structlog import get_logger from unblob.extractors.command import Command @@ -54,7 +53,7 @@ def valid_name(self, name: bytes) -> bool: except UnicodeDecodeError: return False - def valid_header(self, header: Instance) -> bool: + def valid_header(self, header) -> bool: if header.archive_marker != 0x1A: return False if header.header_type > 0x07: diff --git a/unblob/handlers/archive/cpio.py b/unblob/handlers/archive/cpio.py index 52c743eb32..adc997fc37 100644 --- a/unblob/handlers/archive/cpio.py +++ b/unblob/handlers/archive/cpio.py @@ -5,7 +5,6 @@ from typing import Optional, Type import attr -from dissect.cstruct import Instance from structlog import get_logger from ...file_utils import ( @@ -252,7 +251,7 @@ def _pad_file(self, end_offset: int) -> int: return end_offset @classmethod - def _pad_header(cls, header: Instance, c_namesize: int) -> int: + def _pad_header(cls, header, c_namesize: int) -> int: return round_up(len(header) + c_namesize, cls._PAD_ALIGN) @classmethod diff --git a/unblob/handlers/archive/dlink/encrpted_img.py b/unblob/handlers/archive/dlink/encrpted_img.py index 28ffb0108d..1ae6ae95f8 100644 --- a/unblob/handlers/archive/dlink/encrpted_img.py +++ b/unblob/handlers/archive/dlink/encrpted_img.py @@ -3,7 +3,6 @@ from typing import Optional from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from dissect.cstruct import Instance from structlog import get_logger from unblob.file_utils import File, InvalidInputFormat @@ -53,7 +52,7 @@ class EncrptedHandler(StructHandler): HEADER_STRUCT = "dlink_header_t" EXTRACTOR = EncrptedExtractor() - def is_valid_header(self, header: Instance) -> bool: + def is_valid_header(self, header) -> bool: if header.size < len(header): return False return True diff --git a/unblob/handlers/archive/dlink/shrs.py b/unblob/handlers/archive/dlink/shrs.py index 63bd9d9313..7601871bb9 100644 --- a/unblob/handlers/archive/dlink/shrs.py +++ b/unblob/handlers/archive/dlink/shrs.py @@ -4,7 +4,6 @@ from typing import Optional from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from dissect.cstruct import Instance from structlog import get_logger from unblob.file_utils import File, InvalidInputFormat @@ -70,7 +69,7 @@ class SHRSHandler(StructHandler): HEADER_STRUCT = "dlink_header_t" EXTRACTOR = SHRSExtractor() - def is_valid_header(self, header: Instance, file: File) -> bool: + def is_valid_header(self, header, file: File) -> bool: if header.file_size < len(header): return False # we're exactly past the header, we compute the digest diff --git a/unblob/handlers/archive/engeniustech/engenius.py b/unblob/handlers/archive/engeniustech/engenius.py index d403605bdc..a12f11fe1a 100644 --- a/unblob/handlers/archive/engeniustech/engenius.py +++ b/unblob/handlers/archive/engeniustech/engenius.py @@ -1,7 +1,6 @@ from pathlib import Path from typing import Optional -from dissect.cstruct import Instance from structlog import get_logger from unblob.file_utils import Endian, File, InvalidInputFormat, StructParser @@ -81,7 +80,7 @@ class EngeniusHandler(StructHandler): EXTRACTOR = EngeniusExtractor() PATTERN_MATCH_OFFSET = -0x5C - def is_valid_header(self, header: Instance) -> bool: + def is_valid_header(self, header) -> bool: if header.length <= len(header): return False try: diff --git a/unblob/handlers/archive/hp/bdl.py b/unblob/handlers/archive/hp/bdl.py index abac452ce1..48a0129da4 100644 --- a/unblob/handlers/archive/hp/bdl.py +++ b/unblob/handlers/archive/hp/bdl.py @@ -2,7 +2,6 @@ from pathlib import Path from typing import Optional -from dissect.cstruct import Instance from structlog import get_logger from unblob.extractor import carve_chunk_to_file @@ -35,7 +34,7 @@ """ -def is_valid_header(header: Instance) -> bool: +def is_valid_header(header) -> bool: if header.toc_offset == 0 or header.toc_entries == 0: return False try: diff --git a/unblob/handlers/archive/hp/ipkg.py b/unblob/handlers/archive/hp/ipkg.py index c7503bec76..55c65c6f18 100644 --- a/unblob/handlers/archive/hp/ipkg.py +++ b/unblob/handlers/archive/hp/ipkg.py @@ -2,7 +2,6 @@ from pathlib import Path from typing import Optional -from dissect.cstruct import Instance from structlog import get_logger from unblob.file_utils import ( @@ -48,7 +47,7 @@ """ -def is_valid_header(header: Instance) -> bool: +def is_valid_header(header) -> bool: if header.toc_offset == 0 or header.toc_entries == 0: return False try: diff --git a/unblob/handlers/archive/instar/bneg.py b/unblob/handlers/archive/instar/bneg.py index 101cb25c9b..ec526df2ea 100644 --- a/unblob/handlers/archive/instar/bneg.py +++ b/unblob/handlers/archive/instar/bneg.py @@ -1,7 +1,6 @@ from pathlib import Path from typing import Optional -from dissect.cstruct import Instance from structlog import get_logger from unblob.extractor import carve_chunk_to_file @@ -70,7 +69,7 @@ class BNEGHandler(StructHandler): HEADER_STRUCT = "bneg_header_t" EXTRACTOR = BNEGExtractor() - def is_valid_header(self, header: Instance) -> bool: + def is_valid_header(self, header) -> bool: if header.partition_1_size == 0: return False if header.partition_2_size == 0: diff --git a/unblob/handlers/archive/netgear/chk.py b/unblob/handlers/archive/netgear/chk.py index 441906359c..f6c023b68c 100644 --- a/unblob/handlers/archive/netgear/chk.py +++ b/unblob/handlers/archive/netgear/chk.py @@ -2,7 +2,6 @@ from pathlib import Path from typing import Optional -from dissect.cstruct import Instance from structlog import get_logger from unblob.extractor import carve_chunk_to_file @@ -59,7 +58,7 @@ class NetgearCHKHandler(StructHandler): HEADER_STRUCT = "chk_header_t" EXTRACTOR = CHKExtractor() - def is_valid_header(self, header: Instance) -> bool: + def is_valid_header(self, header) -> bool: if header.header_len != len(header): return False try: diff --git a/unblob/handlers/archive/netgear/trx.py b/unblob/handlers/archive/netgear/trx.py index 9fdb180807..dfbff917eb 100644 --- a/unblob/handlers/archive/netgear/trx.py +++ b/unblob/handlers/archive/netgear/trx.py @@ -3,7 +3,6 @@ from pathlib import Path from typing import Iterable, Optional, cast -from dissect.cstruct import Instance from structlog import get_logger from unblob.extractor import carve_chunk_to_file @@ -75,12 +74,12 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk] start_offset=start_offset, end_offset=start_offset + header.len ) - def is_valid_header(self, header: Instance) -> bool: + def is_valid_header(self, header) -> bool: if header.len < len(header): return False return True - def _is_crc_valid(self, file: File, start_offset: int, header: Instance) -> bool: + def _is_crc_valid(self, file: File, start_offset: int, header) -> bool: file.seek(start_offset + CRC_CONTENT_OFFSET) content = bytearray(file.read(header.len - CRC_CONTENT_OFFSET)) computed_crc = (binascii.crc32(content) ^ -1) & 0xFFFFFFFF diff --git a/unblob/handlers/archive/qnap/qnap_nas.py b/unblob/handlers/archive/qnap/qnap_nas.py index 6728c55f1b..fdadfef0b5 100644 --- a/unblob/handlers/archive/qnap/qnap_nas.py +++ b/unblob/handlers/archive/qnap/qnap_nas.py @@ -3,7 +3,6 @@ from typing import Optional import attr -from dissect.cstruct import Instance from pyperscan import Flag, Pattern, Scan, StreamDatabase from structlog import get_logger @@ -44,7 +43,7 @@ class QTSSearchContext: end_offset: int -def is_valid_header(header: Instance) -> bool: +def is_valid_header(header) -> bool: try: header.device_id.decode("utf-8") header.file_version.decode("utf-8") diff --git a/unblob/handlers/archive/xiaomi/hdr.py b/unblob/handlers/archive/xiaomi/hdr.py index 47beae8ace..30d3a659ae 100644 --- a/unblob/handlers/archive/xiaomi/hdr.py +++ b/unblob/handlers/archive/xiaomi/hdr.py @@ -3,7 +3,6 @@ from pathlib import Path from typing import Iterable, Optional, Tuple, cast -from dissect.cstruct import Instance from structlog import get_logger from unblob.file_utils import ( @@ -74,7 +73,7 @@ def calculate_crc(file: File, start_offset: int, size: int) -> int: return (digest ^ -1) & 0xFFFFFFFF -def is_valid_blob_header(blob_header: Instance) -> bool: +def is_valid_blob_header(blob_header) -> bool: if blob_header.magic == BLOB_MAGIC: return False if not blob_header.size: @@ -86,7 +85,7 @@ def is_valid_blob_header(blob_header: Instance) -> bool: return True -def is_valid_header(header: Instance) -> bool: +def is_valid_header(header) -> bool: if header.signature_offset < len(header): return False if not header.blob_offsets[0]: @@ -147,7 +146,7 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk] return ValidChunk(start_offset=start_offset, end_offset=end_offset) def _is_crc_valid( - self, file: File, header: Instance, start_offset: int, end_offset: int + self, file: File, header, start_offset: int, end_offset: int ) -> bool: computed_crc = calculate_crc( file, diff --git a/unblob/handlers/archive/zip.py b/unblob/handlers/archive/zip.py index 53db5307fc..0a63ec09fb 100644 --- a/unblob/handlers/archive/zip.py +++ b/unblob/handlers/archive/zip.py @@ -2,7 +2,6 @@ import struct from typing import Optional -from dissect.cstruct import Instance from structlog import get_logger from ...extractors import Command @@ -90,7 +89,7 @@ def has_encrypted_files( self, file: File, start_offset: int, - end_of_central_directory: Instance, + end_of_central_directory, ) -> bool: file.seek(start_offset + end_of_central_directory.offset_of_cd, io.SEEK_SET) for _ in range(end_of_central_directory.total_entries): @@ -104,7 +103,7 @@ def has_encrypted_files( return False @staticmethod - def is_zip64_eocd(end_of_central_directory: Instance): + def is_zip64_eocd(end_of_central_directory): # see https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT section J return ( end_of_central_directory.disk_number == 0xFFFF @@ -116,14 +115,14 @@ def is_zip64_eocd(end_of_central_directory: Instance): ) @staticmethod - def is_zip64_cd_file(file_header: Instance): + def is_zip64_cd_file(file_header): # see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.2 return ( file_header.file_size == 0xFFFFFFFF or file_header.compress_size == 0xFFFFFFFF ) - def _parse_zip64(self, file: File, start_offset: int, offset: int) -> Instance: + def _parse_zip64(self, file: File, start_offset: int, offset: int): file.seek(start_offset, io.SEEK_SET) for eocd_locator_offset in iterate_patterns( file, struct.pack(" bool: + def is_valid_header(self, header) -> bool: # check that header fields have valid values try: lief.ELF.E_TYPE(header.e_type) @@ -197,9 +196,7 @@ def get_last_program_end( return last_program_end - def get_end_offset( - self, file: File, start_offset: int, header: Instance, endian - ) -> int: + def get_end_offset(self, file: File, start_offset: int, header, endian) -> int: # Usually the section header is the last, but in some cases the program headers are # put to the end of the file, and in some cases sections header and actual sections # can be also intermixed, so we need also to check the end of the last section and diff --git a/unblob/handlers/filesystem/cramfs.py b/unblob/handlers/filesystem/cramfs.py index 66c78c660d..c04bc9e020 100644 --- a/unblob/handlers/filesystem/cramfs.py +++ b/unblob/handlers/filesystem/cramfs.py @@ -2,8 +2,6 @@ import struct from typing import Optional -from dissect.cstruct import Instance - from unblob.extractors import Command from ...file_utils import Endian, convert_int32, get_endian @@ -59,7 +57,7 @@ def _is_crc_valid( self, file: File, start_offset: int, - header: Instance, + header, endian: Endian, ) -> bool: # old cramfs format do not support crc diff --git a/unblob/handlers/filesystem/jffs2.py b/unblob/handlers/filesystem/jffs2.py index 0b5e3fcf31..77bf8f7a89 100644 --- a/unblob/handlers/filesystem/jffs2.py +++ b/unblob/handlers/filesystem/jffs2.py @@ -2,7 +2,6 @@ import io from typing import Optional -from dissect.cstruct import Instance from structlog import get_logger from unblob.file_utils import ( @@ -61,7 +60,7 @@ def guess_endian(self, file: File) -> Endian: file.seek(-2, io.SEEK_CUR) return endian - def valid_header(self, header: Instance, node_start_offset: int, eof: int) -> bool: + def valid_header(self, header, node_start_offset: int, eof: int) -> bool: header_crc = (binascii.crc32(header.dumps()[:-4], -1) ^ -1) & 0xFFFFFFFF check_crc = True diff --git a/unblob/handlers/filesystem/squashfs.py b/unblob/handlers/filesystem/squashfs.py index 099f1758bc..d115c35fac 100644 --- a/unblob/handlers/filesystem/squashfs.py +++ b/unblob/handlers/filesystem/squashfs.py @@ -1,7 +1,6 @@ from pathlib import Path from typing import List, Optional -from dissect.cstruct import Instance from structlog import get_logger from unblob.extractors import Command @@ -297,7 +296,7 @@ class SquashFSv4LEHandler(_SquashFSBase): class SquashFSv4BEExtractor(Extractor): EXECUTABLE = "sasquatch-v4be" - def is_avm(self, header: Instance) -> bool: + def is_avm(self, header) -> bool: # see https://raw.githubusercontent.com/Freetz/freetz/master/tools/make/squashfs4-host-be/AVM-BE-format.txt return header.bytes_used == header.mkfs_time diff --git a/unblob/handlers/filesystem/yaffs.py b/unblob/handlers/filesystem/yaffs.py index 966c8eaad5..a8ffcc4e28 100644 --- a/unblob/handlers/filesystem/yaffs.py +++ b/unblob/handlers/filesystem/yaffs.py @@ -6,7 +6,6 @@ from typing import Iterable, List, Optional, Tuple import attr -from dissect.cstruct import Instance from structlog import get_logger from treelib import Tree from treelib.exceptions import NodeIDAbsentError @@ -259,7 +258,7 @@ def valid_name(name: bytes) -> bool: return True -def is_valid_header(header: Instance) -> bool: +def is_valid_header(header) -> bool: if not valid_name(header.name[:-3]): return False if header.type > 5: @@ -284,7 +283,7 @@ def __init__(self, file: File, config: Optional[YAFFSConfig] = None): else: self.config = config - def build_entry(self, header: Instance, chunk: YAFFSChunk) -> YAFFSEntry: + def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry: raise NotImplementedError def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk: @@ -398,7 +397,7 @@ def auto_detect(self) -> YAFFSConfig: object_id_offset = 4 object_id_start = page_size + ecc_offset + object_id_offset object_id_end = object_id_start + 4 - spare_signature = self.file[object_id_start:object_id_end] + b"\xFF\xFF" + spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff" config.spare_size = ( self.file[object_id_end : object_id_end + page_size].find(spare_signature) @@ -459,7 +458,6 @@ def get_entry(self, object_id: int) -> Optional[YAFFSEntry]: "Can't find entry within the YAFFS tree, something's wrong.", object_id=object_id, ) - pass return None def resolve_path(self, entry: YAFFSEntry) -> Path: @@ -534,7 +532,7 @@ def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk: object_id=yaffs2_packed_tags.object_id, ) - def build_entry(self, header: Instance, chunk: YAFFSChunk) -> YAFFSEntry: + def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry: return YAFFS2Entry( object_id=chunk.object_id, object_type=header.type, @@ -549,7 +547,7 @@ def build_entry(self, header: Instance, chunk: YAFFSChunk) -> YAFFSEntry: st_mtime=header.st_mtime, st_ctime=header.st_ctime, equiv_id=header.equiv_id, - alias=snull(header.alias.replace(b"\xFF", b"")).decode("utf-8"), + alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"), st_rdev=header.st_rdev, win_ctime=header.win_ctime, win_mtime=header.win_mtime, @@ -644,14 +642,14 @@ def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk: block_status=yaffs_sparse.block_status, ) - def build_entry(self, header: Instance, chunk: YAFFSChunk) -> YAFFSEntry: + def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry: return YAFFSEntry( object_type=header.type, object_id=chunk.object_id, parent_obj_id=header.parent_obj_id, sum_no_longer_used=header.sum_no_longer_used, name=snull(header.name[0:128]).decode("utf-8"), - alias=snull(header.alias.replace(b"\xFF", b"")).decode("utf-8"), + alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"), file_size=header.file_size, equiv_id=header.equivalent_object_id, )