Skip to content

Commit

Permalink
Extract strings with subroutines (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
insolor authored Nov 5, 2024
1 parent dc7fcdd commit 0bdc031
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 25 deletions.
29 changes: 4 additions & 25 deletions dfint64_patch/extract_strings/cli.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import operator
import sys
from collections.abc import Generator, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO, TextIO, cast
from typing import BinaryIO, cast

from loguru import logger
from omegaconf import DictConfig
Expand All @@ -20,9 +17,10 @@
extract_strings_from_raw_bytes,
)
from dfint64_patch.type_aliases import Rva
from dfint64_patch.utils import maybe_open


def extract_strings(pe_file: BinaryIO) -> Iterator[ExtractedStringInfo]:
def extract_strings(pe_file: BinaryIO) -> list[ExtractedStringInfo]:
pe = PortableExecutable(pe_file)

sections = pe.section_table
Expand All @@ -45,26 +43,7 @@ def extract_strings(pe_file: BinaryIO) -> Iterator[ExtractedStringInfo]:
)

filtered = filter(lambda x: x[0] in cross_references, strings)
result = sorted(filtered, key=lambda s: min(cross_references[s.address]))

yield from result


@contextmanager
def maybe_open(file_name: str | None) -> Generator[TextIO, None, None]:
"""
Open a file if the name is provided, and close it on exit from with-block,
or provide stdout as a file object, if the file_name parameter is None
:param file_name: file name or None
:return: file object
"""
file_object = sys.stdout if file_name is None or file_name == "stdout" else Path(file_name).open("w") # noqa: SIM115

try:
yield file_object
finally:
if file_object != sys.stdout:
file_object.close()
return sorted(filtered, key=lambda s: min(cross_references[s.address]))


@dataclass
Expand Down
Empty file.
Empty file.
34 changes: 34 additions & 0 deletions dfint64_patch/extract_subroutines/from_raw_bytes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import re
from bisect import bisect_right
from collections.abc import Iterator, Sequence
from typing import NamedTuple

# A run of padding bytes only counts as a subroutine separator when it ends on
# an offset (relative to the start of the scanned buffer) divisible by this value.
SUBROUTINE_ALIGNMENT = 4

# One or more consecutive 0xCC bytes (presumably compiler-emitted padding between
# subroutines - confirm against the target binaries). Compiled once at import time.
_PADDING_PATTERN = re.compile(rb"\xCC+")


class SubroutineInfo(NamedTuple):
    """Half-open range ``[start, end)`` occupied by one subroutine."""

    start: int
    end: int  # exclusive


def extract_subroutines(buffer: bytes, base_offset: int = 0) -> Iterator[SubroutineInfo]:
    """
    Split a raw code buffer into subroutines separated by runs of 0xCC bytes.

    A run of 0xCC bytes is treated as a separator only if it ends on an offset
    divisible by ``SUBROUTINE_ALIGNMENT``; any other run is left inside the
    surrounding subroutine. The alignment check uses offsets within ``buffer``;
    ``base_offset`` is only added to the reported ranges.

    Zero-length ranges (e.g. when the buffer starts with an aligned padding run)
    are not reported.

    :param buffer: raw bytes to scan
    :param base_offset: value added to every reported start and end
    :return: iterator of subroutine ranges in ascending order
    """
    start = 0

    for match in _PADDING_PATTERN.finditer(buffer):
        if match.end() % SUBROUTINE_ALIGNMENT != 0:
            continue

        end = match.start()
        # Skip the empty range produced when padding begins exactly where
        # the previous separator ended (only possible at the buffer start).
        if end > start:
            yield SubroutineInfo(base_offset + start, base_offset + end)
        start = match.end()

    # Whatever follows the last separator is the final subroutine.
    if start < len(buffer):
        yield SubroutineInfo(base_offset + start, base_offset + len(buffer))


def which_subroutine(subroutines: Sequence[SubroutineInfo], address: int) -> SubroutineInfo | None:
addresses = [subroutine.start for subroutine in subroutines]
index = bisect_right(addresses, address) - 1
if index < 0 or index >= len(subroutines) or address >= subroutines[index].end:
return None
return subroutines[index]
Empty file.
4 changes: 4 additions & 0 deletions dfint64_patch/strings_context/extract_strings_graphs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""
TODO: Extract strings with their subroutines,
TODO: trace the code and create diagrams of the code branches with the referenced strings
"""
99 changes: 99 additions & 0 deletions dfint64_patch/strings_context/extract_strings_with_subs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
Extract strings grouped by subroutines
"""
from collections import defaultdict
from dataclasses import dataclass
from operator import itemgetter
from pathlib import Path
from typing import BinaryIO, NamedTuple

from omegaconf import DictConfig
from peclasses.portable_executable import PortableExecutable

from dfint64_patch.binio import read_section_data
from dfint64_patch.config import with_config
from dfint64_patch.cross_references.cross_references_relative import find_relative_cross_references
from dfint64_patch.extract_strings.from_raw_bytes import ExtractedStringInfo, extract_strings_from_raw_bytes
from dfint64_patch.extract_subroutines.from_raw_bytes import SubroutineInfo, extract_subroutines, which_subroutine
from dfint64_patch.type_aliases import Rva


def extract_strings_with_xrefs(pe_file: BinaryIO, pe: PortableExecutable) -> dict[ExtractedStringInfo, list[Rva]]:
    """
    Collect the strings of a PE file together with the code locations that reference them.

    :param pe_file: binary file object of the executable, used to read section data
    :param pe: PE structure describing the same file (supplies the section table)
    :return: mapping from each string that has at least one cross reference
        to its list of cross references; unreferenced strings are omitted
    """
    sections = pe.section_table
    # NOTE(review): assumes the first section holds code and the second holds string data - confirm
    code_section = sections[0]
    string_section = sections[1]

    strings = list(
        extract_strings_from_raw_bytes(
            read_section_data(pe_file, string_section),
            base_address=string_section.virtual_address,
        ),
    )

    cross_references = find_relative_cross_references(
        read_section_data(pe_file, code_section),
        base_address=code_section.virtual_address,
        addresses=map(itemgetter(0), strings),
    )

    # A single .get() per string: unlike indexing, it neither raises for addresses that
    # are absent from a plain mapping nor inserts empty entries into a defaultdict.
    return {
        string_info: xrefs
        for string_info in strings
        if (xrefs := cross_references.get(string_info.address))
    }


class StringCrossReference(NamedTuple):
    """A string paired with one location that references it."""

    # Text of the referenced string
    string: str
    # Location of the reference to the string
    cross_reference: Rva


def extract_strings_grouped_by_subs(pe_file: BinaryIO) -> dict[Rva, list[StringCrossReference]]:
    """
    Group the referenced strings of an executable by the subroutine that references them.

    :param pe_file: binary file object of the executable
    :return: mapping ordered by subroutine; each key is ``image_base + subroutine.start``
        and each value lists the string references found inside that subroutine,
        ordered by the location of the reference. References that do not fall
        inside any detected subroutine are dropped.
    """
    pe = PortableExecutable(pe_file)
    code_section = pe.section_table[0]
    image_base = pe.optional_header.image_base

    strings_with_xrefs = extract_strings_with_xrefs(pe_file, pe)

    code_bytes = read_section_data(pe_file, code_section)
    subroutines = list(extract_subroutines(code_bytes, base_offset=code_section.virtual_address))

    # Bucket every (string, reference) pair under the subroutine containing the reference.
    grouped: dict[SubroutineInfo, list[StringCrossReference]] = defaultdict(list)
    for string_info, xrefs in strings_with_xrefs.items():
        for xref in xrefs:
            owner = which_subroutine(subroutines, xref)
            if owner is not None:
                grouped[owner].append(StringCrossReference(string_info.string, xref))

    # Dicts keep insertion order, so iterating the sorted subroutines fixes the output order.
    return {
        Rva(image_base + subroutine.start): sorted(grouped[subroutine], key=lambda ref: ref.cross_reference)
        for subroutine in sorted(grouped)
    }


@dataclass
class ExtractConfig(DictConfig):
    """Settings passed to main() through the with_config decorator."""

    # Path of the executable to read; main() opens it in binary mode
    file_name: str
    # Optional output file name; defaults to None
    out_file: str | None = None


@with_config(ExtractConfig, ".extract.yaml")
def main(conf: ExtractConfig) -> None:
    """
    Print the strings of an executable grouped by the subroutines that reference them.

    Each group starts with a ``sub_<hex address>:`` header, followed by one
    tab-indented line per string and a blank line. Output is written to
    ``conf.out_file`` when it is set, otherwise to stdout.

    :param conf: configuration with the input file name and the optional output file name
    """
    # Imported locally so that this entry point is the only place depending on the helper.
    from dfint64_patch.utils import maybe_open

    # maybe_open() provides stdout when out_file is None, so the default behaviour is unchanged.
    with Path(conf.file_name).open("rb") as pe_file, maybe_open(conf.out_file) as out_file:
        for subroutine, strings in extract_strings_grouped_by_subs(pe_file).items():
            print(f"sub_{subroutine:x}:", file=out_file)
            for string in strings:
                print(f"\t{string.string}", file=out_file)

            print(file=out_file)


if __name__ == "__main__":
main()
22 changes: 22 additions & 0 deletions dfint64_patch/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import sys
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
from typing import TextIO


@contextmanager
def maybe_open(file_name: str | None) -> Generator[TextIO, None, None]:
"""
Open a file if the name is provided, and close it on exit from with-block,
or provide stdout as a file object, if the file_name parameter is None
:param file_name: file name or None
:return: file object
"""
file_object = sys.stdout if file_name is None or file_name == "stdout" else Path(file_name).open("w") # noqa: SIM115

try:
yield file_object
finally:
if file_object != sys.stdout:
file_object.close()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ ignore = [
"RUF001", # String contains ambiguous {}. Did you mean {}?
"S311", # Standard pseudo-random generators are not suitable for cryptographic purposes
"PLR2004", # Magic value used in comparison
"FBT001", # Ignore boolean positional argument
]
31 changes: 31 additions & 0 deletions tests/test_extract_subroutines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pytest

from dfint64_patch.extract_subroutines.from_raw_bytes import SubroutineInfo, extract_subroutines, which_subroutine


@pytest.mark.parametrize(
    ("test_data", "offset", "expected"),
    [
        # No 0xCC bytes at all: the whole buffer is a single subroutine
        (b"\x90" * 8, 0, [SubroutineInfo(0, 8)]),
        # The 0xCC run ends at offset 2, which is not a multiple of 4, so it does not split the buffer
        (b"\x90\xCC", 0, [SubroutineInfo(0, 2)]),
        # The 0xCC run ends at offset 4, so it separates two subroutines
        (b"\x90\xCC\xCC\xCC\x90\x90", 0, [SubroutineInfo(0, 1), SubroutineInfo(4, 6)]),
        # Same data with a base offset: the reported ranges are shifted by the offset
        (b"\x90\xCC\xCC\xCC\x90\x90", 1, [SubroutineInfo(1, 2), SubroutineInfo(5, 7)]),
    ],
)
def test_extract_subroutines_from_bytes(test_data: bytes, offset: int, expected: list[SubroutineInfo]) -> None:
    """Check the ranges produced by extract_subroutines for small hand-made buffers."""
    assert list(extract_subroutines(test_data, offset)) == expected


@pytest.mark.parametrize(
    ("subroutines", "address", "expected"),
    [
        # Empty list of subroutines: nothing can match
        ([], 0, None),
        # The end of a range is exclusive
        ([SubroutineInfo(1, 2)], 2, None),
        # Address before the first subroutine
        ([SubroutineInfo(1, 2)], 0, None),
        # Address in the gap between two subroutines
        ([SubroutineInfo(1, 2), SubroutineInfo(3, 4)], 2, None),
        # The start of a range is inclusive
        ([SubroutineInfo(1, 2)], 1, SubroutineInfo(1, 2)),
        # Adjacent subroutines: the address belongs to the one that starts at it
        ([SubroutineInfo(1, 2), SubroutineInfo(3, 4), SubroutineInfo(4, 5)], 3, SubroutineInfo(3, 4)),
    ],
)
def test_which_subroutine(subroutines: list[SubroutineInfo], address: int, expected: SubroutineInfo | None) -> None:
    """Check which_subroutine lookups at range boundaries, in gaps and on empty input."""
    assert which_subroutine(subroutines, address) == expected

0 comments on commit 0bdc031

Please sign in to comment.