From 0bdc03106d9d681ca5054c2a3f1e1a782a231ab3 Mon Sep 17 00:00:00 2001 From: insolor <2442833+insolor@users.noreply.github.com> Date: Tue, 5 Nov 2024 20:17:32 +0300 Subject: [PATCH] Extract strings with subroutines (#80) --- dfint64_patch/extract_strings/cli.py | 29 +----- dfint64_patch/extract_subroutines/__init__.py | 0 dfint64_patch/extract_subroutines/cli.py | 0 .../extract_subroutines/from_raw_bytes.py | 34 +++++++ dfint64_patch/strings_context/__init__.py | 0 .../strings_context/extract_strings_graphs.py | 4 + .../extract_strings_with_subs.py | 99 +++++++++++++++++++ dfint64_patch/utils.py | 22 +++++ pyproject.toml | 1 + tests/test_extract_subroutines.py | 31 ++++++ 10 files changed, 195 insertions(+), 25 deletions(-) create mode 100644 dfint64_patch/extract_subroutines/__init__.py create mode 100644 dfint64_patch/extract_subroutines/cli.py create mode 100644 dfint64_patch/extract_subroutines/from_raw_bytes.py create mode 100644 dfint64_patch/strings_context/__init__.py create mode 100644 dfint64_patch/strings_context/extract_strings_graphs.py create mode 100644 dfint64_patch/strings_context/extract_strings_with_subs.py create mode 100644 dfint64_patch/utils.py create mode 100644 tests/test_extract_subroutines.py diff --git a/dfint64_patch/extract_strings/cli.py b/dfint64_patch/extract_strings/cli.py index 3cf2bcd..e4c5577 100644 --- a/dfint64_patch/extract_strings/cli.py +++ b/dfint64_patch/extract_strings/cli.py @@ -1,10 +1,7 @@ import operator -import sys -from collections.abc import Generator, Iterator -from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path -from typing import BinaryIO, TextIO, cast +from typing import BinaryIO, cast from loguru import logger from omegaconf import DictConfig @@ -20,9 +17,10 @@ extract_strings_from_raw_bytes, ) from dfint64_patch.type_aliases import Rva +from dfint64_patch.utils import maybe_open -def extract_strings(pe_file: BinaryIO) -> Iterator[ExtractedStringInfo]: +def 
extract_strings(pe_file: BinaryIO) -> list[ExtractedStringInfo]: pe = PortableExecutable(pe_file) sections = pe.section_table @@ -45,26 +43,7 @@ def extract_strings(pe_file: BinaryIO) -> Iterator[ExtractedStringInfo]: ) filtered = filter(lambda x: x[0] in cross_references, strings) - result = sorted(filtered, key=lambda s: min(cross_references[s.address])) - - yield from result - - -@contextmanager -def maybe_open(file_name: str | None) -> Generator[TextIO, None, None]: - """ - Open a file if the name is provided, and close it on exit from with-block, - or provide stdout as a file object, if the file_name parameter is None - :param file_name: file name or None - :return: file object - """ - file_object = sys.stdout if file_name is None or file_name == "stdout" else Path(file_name).open("w") # noqa: SIM115 - - try: - yield file_object - finally: - if file_object != sys.stdout: - file_object.close() + return sorted(filtered, key=lambda s: min(cross_references[s.address])) @dataclass diff --git a/dfint64_patch/extract_subroutines/__init__.py b/dfint64_patch/extract_subroutines/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dfint64_patch/extract_subroutines/cli.py b/dfint64_patch/extract_subroutines/cli.py new file mode 100644 index 0000000..e69de29 diff --git a/dfint64_patch/extract_subroutines/from_raw_bytes.py b/dfint64_patch/extract_subroutines/from_raw_bytes.py new file mode 100644 index 0000000..10ce559 --- /dev/null +++ b/dfint64_patch/extract_subroutines/from_raw_bytes.py @@ -0,0 +1,34 @@ +import re +from bisect import bisect_right +from collections.abc import Iterator, Sequence +from typing import NamedTuple + +SUBROUTINE_ALIGNMENT = 4 + + +class SubroutineInfo(NamedTuple): + start: int + end: int # exclusive + + +def extract_subroutines(buffer: bytes, base_offset: int = 0) -> Iterator[SubroutineInfo]: + start = 0 + + for match in re.finditer(rb"\xCC+", buffer): + if match.end() % SUBROUTINE_ALIGNMENT != 0: + continue + + end = 
match.start() + yield SubroutineInfo(base_offset + start, base_offset + end) + start = match.end() + + if start < len(buffer): + yield SubroutineInfo(base_offset + start, base_offset + len(buffer)) + + +def which_subroutine(subroutines: Sequence[SubroutineInfo], address: int) -> SubroutineInfo | None: + addresses = [subroutine.start for subroutine in subroutines] + index = bisect_right(addresses, address) - 1 + if index < 0 or index >= len(subroutines) or address >= subroutines[index].end: + return None + return subroutines[index] diff --git a/dfint64_patch/strings_context/__init__.py b/dfint64_patch/strings_context/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dfint64_patch/strings_context/extract_strings_graphs.py b/dfint64_patch/strings_context/extract_strings_graphs.py new file mode 100644 index 0000000..625633f --- /dev/null +++ b/dfint64_patch/strings_context/extract_strings_graphs.py @@ -0,0 +1,4 @@ +""" +TODO: Extract strings with their subroutines, +TODO: trace the code and create diagrams of the code branches with the referenced strings +""" diff --git a/dfint64_patch/strings_context/extract_strings_with_subs.py b/dfint64_patch/strings_context/extract_strings_with_subs.py new file mode 100644 index 0000000..785753c --- /dev/null +++ b/dfint64_patch/strings_context/extract_strings_with_subs.py @@ -0,0 +1,99 @@ +""" +Extract strings grouped by subroutines +""" +from collections import defaultdict +from dataclasses import dataclass +from operator import itemgetter +from pathlib import Path +from typing import BinaryIO, NamedTuple + +from omegaconf import DictConfig +from peclasses.portable_executable import PortableExecutable + +from dfint64_patch.binio import read_section_data +from dfint64_patch.config import with_config +from dfint64_patch.cross_references.cross_references_relative import find_relative_cross_references +from dfint64_patch.extract_strings.from_raw_bytes import ExtractedStringInfo, extract_strings_from_raw_bytes +from 
def extract_strings_with_xrefs(pe_file: BinaryIO, pe: PortableExecutable) -> dict[ExtractedStringInfo, list[Rva]]:
    """
    Extract strings from the second section of the PE file together with
    the places in the first section which reference them.

    :param pe_file: opened PE file, used to read the raw section data
    :param pe: PortableExecutable object created for the same file
    :return: mapping of a string info to a non-empty list of its cross references;
        strings without cross references are omitted
    """
    sections = pe.section_table
    # NOTE(review): the first section is taken as code and the second one as string data;
    # this is assumed here, not checked.
    code_section = sections[0]
    string_section = sections[1]

    strings = list(
        extract_strings_from_raw_bytes(
            read_section_data(pe_file, string_section),
            base_address=string_section.virtual_address,
        ),
    )

    cross_references = find_relative_cross_references(
        read_section_data(pe_file, code_section),
        base_address=code_section.virtual_address,
        addresses=map(itemgetter(0), strings),
    )

    # A single .get() lookup per string: it neither raises for an address which has no entry,
    # nor inserts an empty entry if the mapping happens to be a defaultdict.
    return {
        string_info: xrefs
        for string_info in strings
        if (xrefs := cross_references.get(string_info.address))
    }


class StringCrossReference(NamedTuple):
    """
    A string paired with one location which references it
    """

    string: str
    cross_reference: Rva


def extract_strings_grouped_by_subs(pe_file: BinaryIO) -> dict[Rva, list[StringCrossReference]]:
    """
    Group the strings referenced from the code by the subroutine which references them.

    :param pe_file: opened PE file
    :return: mapping of a subroutine start (image base added) to the references found inside
        that subroutine, sorted by the reference address; subroutines go in ascending order.
        The cross_reference values are stored as returned by extract_strings_with_xrefs,
        the image base is not added to them.
    """
    pe = PortableExecutable(pe_file)
    sections = pe.section_table
    code_section = sections[0]

    image_base = pe.optional_header.image_base

    strings_with_xrefs = extract_strings_with_xrefs(pe_file, pe)

    # Materialized as a list, because the lookup below needs random access
    subroutines = list(
        extract_subroutines(
            read_section_data(pe_file, code_section),
            base_offset=code_section.virtual_address,
        ),
    )

    raw_result: dict[SubroutineInfo, list[StringCrossReference]] = defaultdict(list)
    for string_info, xrefs in strings_with_xrefs.items():
        for xref in xrefs:
            subroutine = which_subroutine(subroutines, xref)
            # A reference which falls outside of every detected subroutine is dropped
            if subroutine is None:
                continue
            raw_result[subroutine].append(StringCrossReference(string_info.string, xref))

    result: dict[Rva, list[StringCrossReference]] = {}
    for subroutine, string_xrefs in sorted(raw_result.items(), key=itemgetter(0)):
        sorted_xrefs = sorted(string_xrefs, key=lambda x: x.cross_reference)
        result[Rva(image_base + subroutine.start)] = sorted_xrefs

    return result
@dataclass
class ExtractConfig(DictConfig):
    # Path to the PE file to read
    file_name: str
    # Destination of the report; None means standard output
    out_file: str | None = None


@with_config(ExtractConfig, ".extract.yaml")
def main(conf: ExtractConfig) -> None:
    """
    Print the strings of the PE file grouped by the subroutines which reference them.

    Every group starts with a ``sub_<hex address>:`` line, followed by one tab-indented
    line per referenced string and an empty line.
    The report goes to conf.out_file when it is set, otherwise to stdout.

    :param conf: configuration with the input file name and the optional output file name
    """
    # Imported here to keep the change local to this function
    from dfint64_patch.utils import maybe_open

    with Path(conf.file_name).open("rb") as pe_file, maybe_open(conf.out_file) as out_file:
        for subroutine, strings in extract_strings_grouped_by_subs(pe_file).items():
            print(f"sub_{subroutine:x}:", file=out_file)
            for string in strings:
                print(f"\t{string.string}", file=out_file)

            print(file=out_file)


if __name__ == "__main__":
    main()
@pytest.mark.parametrize(
    ("test_data", "offset", "expected"),
    [
        (b"\x90" * 8, 0, [SubroutineInfo(0, 8)]),
        (b"\x90\xCC", 0, [SubroutineInfo(0, 2)]),
        (b"\x90\xCC\xCC\xCC\x90\x90", 0, [SubroutineInfo(0, 1), SubroutineInfo(4, 6)]),
        (b"\x90\xCC\xCC\xCC\x90\x90", 1, [SubroutineInfo(1, 2), SubroutineInfo(5, 7)]),
    ],
)
def test_extract_subroutines_from_bytes(test_data: bytes, offset: int, expected: list[SubroutineInfo]) -> None:
    """
    Check the ranges produced by extract_subroutines for a buffer and a base offset
    """
    assert list(extract_subroutines(test_data, offset)) == expected


@pytest.mark.parametrize(
    ("subroutines", "address", "expected"),
    [
        ([], 0, None),
        ([SubroutineInfo(1, 2)], 2, None),
        ([SubroutineInfo(1, 2)], 0, None),
        ([SubroutineInfo(1, 2), SubroutineInfo(3, 4)], 2, None),
        ([SubroutineInfo(1, 2)], 1, SubroutineInfo(1, 2)),
        ([SubroutineInfo(1, 2), SubroutineInfo(3, 4), SubroutineInfo(4, 5)], 3, SubroutineInfo(3, 4)),
    ],
)
def test_which_subroutine(subroutines: list[SubroutineInfo], address: int, expected: SubroutineInfo | None) -> None:
    """
    Check which subroutine (if any) is reported for an address
    """
    # The expected value is either a subroutine range or None, never a boolean
    assert which_subroutine(subroutines, address) == expected