Skip to content

Commit

Permalink
chore(dissect.cstruct): Get rid of Instance type annotation
Browse files Browse the repository at this point in the history
- It was already interpreted essentially the same way as `typing.Any`,
  because of its `__getattr__` method
- Newer versions no longer have the `Instance` proxy object
- `cstruct().<x>` calls are annotated with `Any` in new versions
- The actual runtime type will be `Structure` in newer versions that
  needs to be changed in `logging` (not changed for now)

Relates-to: #888
  • Loading branch information
vlaci committed Jul 23, 2024
1 parent 842306f commit 919270a
Show file tree
Hide file tree
Showing 20 changed files with 36 additions and 63 deletions.
11 changes: 3 additions & 8 deletions tests/handlers/filesystem/test_jffs2.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import binascii

import pytest
from dissect.cstruct import Instance
from helpers import unhex

from unblob.file_utils import Endian, File
Expand Down Expand Up @@ -59,7 +58,7 @@ def get_valid_jffs2_old_be_header():
)


def calculate_crc(header: Instance):
def calculate_crc(header):
return (binascii.crc32(header.dumps()[:-4], -1) ^ -1) & 0xFFFFFFFF


Expand Down Expand Up @@ -188,9 +187,7 @@ def calculate_crc(header: Instance):
),
],
)
def test_valid_header_new(
header: Instance, node_start_offset: int, eof: int, expected: bool
):
def test_valid_header_new(header, node_start_offset: int, eof: int, expected: bool):
header.hdr_crc = calculate_crc(header)
assert new_handler.valid_header(header, node_start_offset, eof) == expected

Expand Down Expand Up @@ -270,9 +267,7 @@ def test_valid_header_new(
),
],
)
def test_valid_header_old(
header: Instance, node_start_offset: int, eof: int, expected: bool
):
def test_valid_header_old(header, node_start_offset: int, eof: int, expected: bool):
header.hdr_crc = calculate_crc(header)
assert old_handler.valid_header(header, node_start_offset, eof) == expected

Expand Down
4 changes: 2 additions & 2 deletions unblob/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pathlib import Path
from typing import Iterable, Iterator, List, Literal, Optional, Tuple, Union

from dissect.cstruct import Instance, cstruct
from dissect.cstruct import cstruct
from structlog import get_logger

from .logging import format_hex
Expand Down Expand Up @@ -336,7 +336,7 @@ def parse(
struct_name: str,
file: Union[File, bytes],
endian: Endian,
) -> Instance:
):
cparser = self.cparser_le if endian is Endian.LITTLE else self.cparser_be
struct_parser = getattr(cparser, struct_name)
return struct_parser(file)
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/arc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractors.command import Command
Expand Down Expand Up @@ -54,7 +53,7 @@ def valid_name(self, name: bytes) -> bool:
except UnicodeDecodeError:
return False

def valid_header(self, header: Instance) -> bool:
def valid_header(self, header) -> bool:
if header.archive_marker != 0x1A:
return False
if header.header_type > 0x07:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/cpio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import Optional, Type

import attr
from dissect.cstruct import Instance
from structlog import get_logger

from ...file_utils import (
Expand Down Expand Up @@ -252,7 +251,7 @@ def _pad_file(self, end_offset: int) -> int:
return end_offset

@classmethod
def _pad_header(cls, header: Instance, c_namesize: int) -> int:
def _pad_header(cls, header, c_namesize: int) -> int:
return round_up(len(header) + c_namesize, cls._PAD_ALIGN)

@classmethod
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/dlink/encrpted_img.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Optional

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import File, InvalidInputFormat
Expand Down Expand Up @@ -53,7 +52,7 @@ class EncrptedHandler(StructHandler):
HEADER_STRUCT = "dlink_header_t"
EXTRACTOR = EncrptedExtractor()

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.size < len(header):
return False
return True
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/dlink/shrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from typing import Optional

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import File, InvalidInputFormat
Expand Down Expand Up @@ -70,7 +69,7 @@ class SHRSHandler(StructHandler):
HEADER_STRUCT = "dlink_header_t"
EXTRACTOR = SHRSExtractor()

def is_valid_header(self, header: Instance, file: File) -> bool:
def is_valid_header(self, header, file: File) -> bool:
if header.file_size < len(header):
return False
# we're exactly past the header, we compute the digest
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/engeniustech/engenius.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import Endian, File, InvalidInputFormat, StructParser
Expand Down Expand Up @@ -81,7 +80,7 @@ class EngeniusHandler(StructHandler):
EXTRACTOR = EngeniusExtractor()
PATTERN_MATCH_OFFSET = -0x5C

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.length <= len(header):
return False
try:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/hp/bdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -35,7 +34,7 @@
"""


def is_valid_header(header: Instance) -> bool:
def is_valid_header(header) -> bool:
if header.toc_offset == 0 or header.toc_entries == 0:
return False
try:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/hp/ipkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import (
Expand Down Expand Up @@ -48,7 +47,7 @@
"""


def is_valid_header(header: Instance) -> bool:
def is_valid_header(header) -> bool:
if header.toc_offset == 0 or header.toc_entries == 0:
return False
try:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/instar/bneg.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -70,7 +69,7 @@ class BNEGHandler(StructHandler):
HEADER_STRUCT = "bneg_header_t"
EXTRACTOR = BNEGExtractor()

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.partition_1_size == 0:
return False
if header.partition_2_size == 0:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/netgear/chk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -59,7 +58,7 @@ class NetgearCHKHandler(StructHandler):
HEADER_STRUCT = "chk_header_t"
EXTRACTOR = CHKExtractor()

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.header_len != len(header):
return False
try:
Expand Down
5 changes: 2 additions & 3 deletions unblob/handlers/archive/netgear/trx.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from pathlib import Path
from typing import Iterable, Optional, cast

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -75,12 +74,12 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]
start_offset=start_offset, end_offset=start_offset + header.len
)

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.len < len(header):
return False
return True

def _is_crc_valid(self, file: File, start_offset: int, header: Instance) -> bool:
def _is_crc_valid(self, file: File, start_offset: int, header) -> bool:
file.seek(start_offset + CRC_CONTENT_OFFSET)
content = bytearray(file.read(header.len - CRC_CONTENT_OFFSET))
computed_crc = (binascii.crc32(content) ^ -1) & 0xFFFFFFFF
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/qnap/qnap_nas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Optional

import attr
from dissect.cstruct import Instance
from pyperscan import Flag, Pattern, Scan, StreamDatabase
from structlog import get_logger

Expand Down Expand Up @@ -44,7 +43,7 @@ class QTSSearchContext:
end_offset: int


def is_valid_header(header: Instance) -> bool:
def is_valid_header(header) -> bool:
try:
header.device_id.decode("utf-8")
header.file_version.decode("utf-8")
Expand Down
7 changes: 3 additions & 4 deletions unblob/handlers/archive/xiaomi/hdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from pathlib import Path
from typing import Iterable, Optional, Tuple, cast

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import (
Expand Down Expand Up @@ -74,7 +73,7 @@ def calculate_crc(file: File, start_offset: int, size: int) -> int:
return (digest ^ -1) & 0xFFFFFFFF


def is_valid_blob_header(blob_header: Instance) -> bool:
def is_valid_blob_header(blob_header) -> bool:
if blob_header.magic == BLOB_MAGIC:
return False
if not blob_header.size:
Expand All @@ -86,7 +85,7 @@ def is_valid_blob_header(blob_header: Instance) -> bool:
return True


def is_valid_header(header: Instance) -> bool:
def is_valid_header(header) -> bool:
if header.signature_offset < len(header):
return False
if not header.blob_offsets[0]:
Expand Down Expand Up @@ -147,7 +146,7 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]
return ValidChunk(start_offset=start_offset, end_offset=end_offset)

def _is_crc_valid(
self, file: File, header: Instance, start_offset: int, end_offset: int
self, file: File, header, start_offset: int, end_offset: int
) -> bool:
computed_crc = calculate_crc(
file,
Expand Down
9 changes: 4 additions & 5 deletions unblob/handlers/archive/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import struct
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from ...extractors import Command
Expand Down Expand Up @@ -90,7 +89,7 @@ def has_encrypted_files(
self,
file: File,
start_offset: int,
end_of_central_directory: Instance,
end_of_central_directory,
) -> bool:
file.seek(start_offset + end_of_central_directory.offset_of_cd, io.SEEK_SET)
for _ in range(end_of_central_directory.total_entries):
Expand All @@ -104,7 +103,7 @@ def has_encrypted_files(
return False

@staticmethod
def is_zip64_eocd(end_of_central_directory: Instance):
def is_zip64_eocd(end_of_central_directory):
# see https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT section J
return (
end_of_central_directory.disk_number == 0xFFFF
Expand All @@ -116,14 +115,14 @@ def is_zip64_eocd(end_of_central_directory: Instance):
)

@staticmethod
def is_zip64_cd_file(file_header: Instance):
def is_zip64_cd_file(file_header):
# see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.2
return (
file_header.file_size == 0xFFFFFFFF
or file_header.compress_size == 0xFFFFFFFF
)

def _parse_zip64(self, file: File, start_offset: int, offset: int) -> Instance:
def _parse_zip64(self, file: File, start_offset: int, offset: int):
file.seek(start_offset, io.SEEK_SET)
for eocd_locator_offset in iterate_patterns(
file, struct.pack("<I", self.ZIP64_EOCD_LOCATOR_HEADER)
Expand Down
7 changes: 2 additions & 5 deletions unblob/handlers/executable/elf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import attr
import lief
from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -138,7 +137,7 @@ class _ELFBase(StructHandler):
SECTION_HEADER_STRUCT = "elf_shdr_t"
PROGRAM_HEADER_STRUCT = "elf_phdr_t"

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
# check that header fields have valid values
try:
lief.ELF.E_TYPE(header.e_type)
Expand Down Expand Up @@ -197,9 +196,7 @@ def get_last_program_end(

return last_program_end

def get_end_offset(
self, file: File, start_offset: int, header: Instance, endian
) -> int:
def get_end_offset(self, file: File, start_offset: int, header, endian) -> int:
# Usually the section header is the last, but in some cases the program headers are
# put to the end of the file, and in some cases sections header and actual sections
# can be also intermixed, so we need also to check the end of the last section and
Expand Down
4 changes: 1 addition & 3 deletions unblob/handlers/filesystem/cramfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import struct
from typing import Optional

from dissect.cstruct import Instance

from unblob.extractors import Command

from ...file_utils import Endian, convert_int32, get_endian
Expand Down Expand Up @@ -59,7 +57,7 @@ def _is_crc_valid(
self,
file: File,
start_offset: int,
header: Instance,
header,
endian: Endian,
) -> bool:
# old cramfs format do not support crc
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/filesystem/jffs2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import io
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import (
Expand Down Expand Up @@ -61,7 +60,7 @@ def guess_endian(self, file: File) -> Endian:
file.seek(-2, io.SEEK_CUR)
return endian

def valid_header(self, header: Instance, node_start_offset: int, eof: int) -> bool:
def valid_header(self, header, node_start_offset: int, eof: int) -> bool:
header_crc = (binascii.crc32(header.dumps()[:-4], -1) ^ -1) & 0xFFFFFFFF
check_crc = True

Expand Down
Loading

0 comments on commit 919270a

Please sign in to comment.