Skip to content

Commit

Permalink
capabilities: use dataclasses to represent complicated return types
Browse files Browse the repository at this point in the history
  • Loading branch information
williballenthin committed Dec 12, 2024
1 parent 6d05d3c commit 37f6ccb
Show file tree
Hide file tree
Showing 20 changed files with 276 additions and 217 deletions.
30 changes: 23 additions & 7 deletions capa/capabilities/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,28 @@
import logging
import itertools
import collections
from typing import Any
from typing import Optional
from dataclasses import dataclass

from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.address import NO_ADDRESS
from capa.render.result_document import LibraryFunction, StaticFeatureCounts, DynamicFeatureCounts
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor

logger = logging.getLogger(__name__)


def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
@dataclass
class FileCapabilities:
    """
    result of matching file-scope rules, as returned by `find_file_capabilities`.
    """

    # first element of the pair returned by `ruleset.match(Scope.FILE, ...)`.
    features: FeatureSet
    # second element of that pair: the file-scope match results.
    matches: MatchResults
    # number of entries (`len`) in the feature set that was handed to the matcher.
    feature_count: int


def find_file_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet
) -> FileCapabilities:
file_features: FeatureSet = collections.defaultdict(set)

for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()):
Expand All @@ -36,8 +47,8 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi

file_features.update(function_features)

_, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
return matches, len(file_features)
features, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
return FileCapabilities(features, matches, len(file_features))


def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
Expand All @@ -62,9 +73,14 @@ def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalon
return False


def find_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs
) -> tuple[MatchResults, Any]:
@dataclass
class Capabilities:
    """
    top-level result type returned by `find_capabilities`.
    """

    # match results keyed by rule name, merged across the scopes that were analyzed.
    matches: MatchResults
    # feature counts collected during analysis; the concrete type depends on
    # whether a static or a dynamic extractor was used.
    feature_counts: StaticFeatureCounts | DynamicFeatureCounts
    # defaults to None when the producer does not supply library functions.
    # NOTE(review): presumably only populated by static analysis - confirm against the static finder.
    library_functions: Optional[tuple[LibraryFunction, ...]] = None


def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs) -> Capabilities:
from capa.capabilities.static import find_static_capabilities
from capa.capabilities.dynamic import find_dynamic_capabilities

Expand Down
101 changes: 57 additions & 44 deletions capa/capabilities/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
import logging
import itertools
import collections
from typing import Any
from dataclasses import dataclass

import capa.perf
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.capabilities.common import find_file_capabilities
from capa.capabilities.common import Capabilities, find_file_capabilities
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor

logger = logging.getLogger(__name__)
Expand All @@ -26,13 +26,17 @@
SEQUENCE_SIZE = 5


@dataclass
class CallCapabilities:
    """
    result of matching rules against a single call, as returned by `find_call_capabilities`.
    """

    # all features found for the call.
    features: FeatureSet
    # match results for the call.
    matches: MatchResults


def find_call_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> tuple[FeatureSet, MatchResults]:
) -> CallCapabilities:
"""
find matches for the given rules for the given call.
returns: tuple containing (features for call, match results for call)
"""
# all features found for the call.
features: FeatureSet = collections.defaultdict(set)
Expand All @@ -50,16 +54,22 @@ def find_call_capabilities(
for addr, _ in res:
capa.engine.index_rule_matches(features, rule, [addr])

return features, matches
return CallCapabilities(features, matches)


@dataclass
class ThreadCapabilities:
    """
    result of matching rules within a single thread, as returned by `find_thread_capabilities`.
    """

    # all features found within the thread, including features found within its calls.
    features: FeatureSet
    # match results for the thread itself.
    thread_matches: MatchResults
    # sequence-scope match results, accumulated while sliding a window of at most
    # SEQUENCE_SIZE calls over the thread's calls.
    sequence_matches: MatchResults
    # call-scope match results, accumulated over every call in the thread.
    call_matches: MatchResults


def find_thread_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
) -> tuple[FeatureSet, MatchResults, MatchResults, MatchResults]:
) -> ThreadCapabilities:
"""
find matches for the given rules within the given thread.
returns: tuple containing (features for thread, match results for thread, match results for sequences, match results for calls)
"""
# all features found within this thread,
# includes features found within calls.
Expand All @@ -75,20 +85,20 @@ def find_thread_capabilities(
sequence: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)

for ch in extractor.get_calls(ph, th):
cfeatures, cmatches = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in cfeatures.items():
call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in call_capabilities.features.items():
features[feature].update(vas)

for rule_name, res in cmatches.items():
for rule_name, res in call_capabilities.matches.items():
call_matches[rule_name].extend(res)

sequence.append(cfeatures)
sfeatures: FeatureSet = collections.defaultdict(set)
sequence.append(call_capabilities.features)
sequence_features: FeatureSet = collections.defaultdict(set)
for call in sequence:
for feature, vas in call.items():
sfeatures[feature].update(vas)
sequence_features[feature].update(vas)

_, smatches = ruleset.match(Scope.SEQUENCE, sfeatures, ch.address)
_, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, ch.address)
for rule_name, res in smatches.items():
sequence_matches[rule_name].extend(res)

Expand All @@ -103,16 +113,23 @@ def find_thread_capabilities(
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])

return features, matches, sequence_matches, call_matches
return ThreadCapabilities(features, matches, sequence_matches, call_matches)


@dataclass
class ProcessCapabilities:
    """
    result of matching rules within a single process, as returned by `find_process_capabilities`.
    """

    # match results from matching process-scope rules against the process features.
    process_matches: MatchResults
    # thread-scope match results, accumulated over every thread in the process.
    thread_matches: MatchResults
    # sequence-scope match results, accumulated over every thread in the process.
    sequence_matches: MatchResults
    # call-scope match results, accumulated over every thread in the process.
    call_matches: MatchResults
    # number of entries (`len`) in the process feature set, which also includes
    # features found within threads (and calls).
    feature_count: int


def find_process_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle
) -> tuple[MatchResults, MatchResults, MatchResults, MatchResults, int]:
) -> ProcessCapabilities:
"""
find matches for the given rules within the given process.
returns: tuple containing (match results for process, match results for threads, match results for calls, number of features)
"""
# all features found within this process,
# includes features found within threads (and calls).
Expand All @@ -131,29 +148,29 @@ def find_process_capabilities(
call_matches: MatchResults = collections.defaultdict(list)

for th in extractor.get_threads(ph):
features, tmatches, smatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th)
for feature, vas in features.items():
thread_capabilities = find_thread_capabilities(ruleset, extractor, ph, th)
for feature, vas in thread_capabilities.features.items():
process_features[feature].update(vas)

for rule_name, res in tmatches.items():
for rule_name, res in thread_capabilities.thread_matches.items():
thread_matches[rule_name].extend(res)

for rule_name, res in smatches.items():
for rule_name, res in thread_capabilities.sequence_matches.items():
sequence_matches[rule_name].extend(res)

for rule_name, res in cmatches.items():
for rule_name, res in thread_capabilities.call_matches.items():
call_matches[rule_name].extend(res)

for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()):
process_features[feature].add(va)

_, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address)
return process_matches, thread_matches, sequence_matches, call_matches, len(process_features)
return ProcessCapabilities(process_matches, thread_matches, sequence_matches, call_matches, len(process_features))


def find_dynamic_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None
) -> tuple[MatchResults, Any]:
) -> Capabilities:
all_process_matches: MatchResults = collections.defaultdict(list)
all_thread_matches: MatchResults = collections.defaultdict(list)
all_sequence_matches: MatchResults = collections.defaultdict(list)
Expand All @@ -170,21 +187,21 @@ def find_dynamic_capabilities(
) as pbar:
task = pbar.add_task("matching", total=n_processes, unit="processes")
for p in processes:
process_matches, thread_matches, sequence_matches, call_matches, feature_count = find_process_capabilities(
ruleset, extractor, p
)
process_capabilities = find_process_capabilities(ruleset, extractor, p)
feature_counts.processes += (
rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count),
rdoc.ProcessFeatureCount(
address=frz.Address.from_capa(p.address), count=process_capabilities.feature_count
),
)
logger.debug("analyzed %s and extracted %d features", p.address, feature_count)
logger.debug("analyzed %s and extracted %d features", p.address, process_capabilities.feature_count)

for rule_name, res in process_matches.items():
for rule_name, res in process_capabilities.process_matches.items():
all_process_matches[rule_name].extend(res)
for rule_name, res in thread_matches.items():
for rule_name, res in process_capabilities.thread_matches.items():
all_thread_matches[rule_name].extend(res)
for rule_name, res in sequence_matches.items():
for rule_name, res in process_capabilities.sequence_matches.items():
all_sequence_matches[rule_name].extend(res)
for rule_name, res in call_matches.items():
for rule_name, res in process_capabilities.call_matches.items():
all_call_matches[rule_name].extend(res)

pbar.advance(task)
Expand All @@ -199,8 +216,8 @@ def find_dynamic_capabilities(
rule = ruleset[rule_name]
capa.engine.index_rule_matches(process_and_lower_features, rule, locations)

all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features)
feature_counts.file = feature_count
all_file_capabilities = find_file_capabilities(ruleset, extractor, process_and_lower_features)
feature_counts.file = all_file_capabilities.feature_count

matches = dict(
itertools.chain(
Expand All @@ -211,12 +228,8 @@ def find_dynamic_capabilities(
all_sequence_matches.items(),
all_thread_matches.items(),
all_process_matches.items(),
all_file_matches.items(),
all_file_capabilities.matches.items(),
)
)

meta = {
"feature_counts": feature_counts,
}

return matches, meta
return Capabilities(matches, feature_counts)
Loading

0 comments on commit 37f6ccb

Please sign in to comment.