Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a capabilities module #1820

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
- implement dynamic analysis via CAPE sandbox #48 #1535 @yelhamer
- add call scope #771 @yelhamer
- add process scope for the dynamic analysis flavor #1517 @yelhamer
- Add thread scope for the dynamic analysis flavor #1517 @yelhamer
- add thread scope for the dynamic analysis flavor #1517 @yelhamer
- ghidra: add Ghidra feature extractor and supporting code #1770 @colton-gabertan
- ghidra: add entry script helping users run capa against a loaded Ghidra database #1767 @mike-hunhoff
- binja: add support for forwarded exports #1646 @xusheng6
Expand Down
Empty file added capa/capabilities/__init__.py
Empty file.
79 changes: 79 additions & 0 deletions capa/capabilities/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import itertools
import collections
from typing import Any, Tuple

from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.address import NO_ADDRESS
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor

logger = logging.getLogger(__name__)


def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
file_features: FeatureSet = collections.defaultdict(set)

for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()):
# not all file features may have virtual addresses.
# if not, then at least ensure the feature shows up in the index.
# the set of addresses will still be empty.
if va:
file_features[feature].add(va)
else:
if feature not in file_features:
file_features[feature] = set()

logger.debug("analyzed file and extracted %d features", len(file_features))

file_features.update(function_features)

_, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
return matches, len(file_features)


def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
file_limitation_rules = list(filter(lambda r: r.is_file_limitation_rule(), rules.rules.values()))

for file_limitation_rule in file_limitation_rules:
if file_limitation_rule.name not in capabilities:
continue

logger.warning("-" * 80)
for line in file_limitation_rule.meta.get("description", "").split("\n"):
logger.warning(" %s", line)
logger.warning(" Identified via rule: %s", file_limitation_rule.name)
if is_standalone:
logger.warning(" ")
logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.")
logger.warning("-" * 80)

# bail on first file limitation
return True

return False


def find_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs
) -> Tuple[MatchResults, Any]:
from capa.capabilities.static import find_static_capabilities
from capa.capabilities.dynamic import find_dynamic_capabilities

if isinstance(extractor, StaticFeatureExtractor):
# for the time being, extractors are either static or dynamic.
# Remove this assertion once that has changed
assert not isinstance(extractor, DynamicFeatureExtractor)
return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs)
if isinstance(extractor, DynamicFeatureExtractor):
return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs)

raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}")
198 changes: 198 additions & 0 deletions capa/capabilities/dynamic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import itertools
import collections
from typing import Any, Tuple

import tqdm

import capa.perf
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import redirecting_print_to_tqdm
from capa.capabilities.common import find_file_capabilities
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor

logger = logging.getLogger(__name__)


def find_call_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> Tuple[FeatureSet, MatchResults]:
"""
find matches for the given rules for the given call.

returns: tuple containing (features for call, match results for call)
"""
# all features found for the call.
features: FeatureSet = collections.defaultdict(set)

for feature, addr in itertools.chain(
extractor.extract_call_features(ph, th, ch), extractor.extract_global_features()
):
features[feature].add(addr)

# matches found at this thread.
_, matches = ruleset.match(Scope.CALL, features, ch.address)

for rule_name, res in matches.items():
rule = ruleset[rule_name]
for addr, _ in res:
capa.engine.index_rule_matches(features, rule, [addr])

return features, matches


def find_thread_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
) -> Tuple[FeatureSet, MatchResults, MatchResults]:
"""
find matches for the given rules within the given thread.

returns: tuple containing (features for thread, match results for thread, match results for calls)
"""
# all features found within this thread,
# includes features found within calls.
features: FeatureSet = collections.defaultdict(set)

# matches found at the call scope.
# might be found at different calls, thats ok.
call_matches: MatchResults = collections.defaultdict(list)

for ch in extractor.get_calls(ph, th):
ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in ifeatures.items():
features[feature].update(vas)

for rule_name, res in imatches.items():
call_matches[rule_name].extend(res)

for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
features[feature].add(va)

# matches found within this thread.
_, matches = ruleset.match(Scope.THREAD, features, th.address)

for rule_name, res in matches.items():
rule = ruleset[rule_name]
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])

return features, matches, call_matches


def find_process_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle
) -> Tuple[MatchResults, MatchResults, MatchResults, int]:
"""
find matches for the given rules within the given process.

returns: tuple containing (match results for process, match results for threads, match results for calls, number of features)
"""
# all features found within this process,
# includes features found within threads (and calls).
process_features: FeatureSet = collections.defaultdict(set)

# matches found at the basic threads.
# might be found at different threads, thats ok.
thread_matches: MatchResults = collections.defaultdict(list)

# matches found at the call scope.
# might be found at different calls, thats ok.
call_matches: MatchResults = collections.defaultdict(list)

for th in extractor.get_threads(ph):
features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th)
for feature, vas in features.items():
process_features[feature].update(vas)

for rule_name, res in tmatches.items():
thread_matches[rule_name].extend(res)

for rule_name, res in cmatches.items():
call_matches[rule_name].extend(res)

for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()):
process_features[feature].add(va)

_, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address)
return process_matches, thread_matches, call_matches, len(process_features)


def find_dynamic_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None
) -> Tuple[MatchResults, Any]:
all_process_matches: MatchResults = collections.defaultdict(list)
all_thread_matches: MatchResults = collections.defaultdict(list)
all_call_matches: MatchResults = collections.defaultdict(list)

feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=())

assert isinstance(extractor, DynamicFeatureExtractor)
with redirecting_print_to_tqdm(disable_progress):
with tqdm.contrib.logging.logging_redirect_tqdm():
pbar = tqdm.tqdm
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
def pbar(s, *args, **kwargs):
return s

processes = list(extractor.get_processes())

pb = pbar(processes, desc="matching", unit=" processes", leave=False)
for p in pb:
process_matches, thread_matches, call_matches, feature_count = find_process_capabilities(
ruleset, extractor, p
)
feature_counts.processes += (
rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count),
)
logger.debug("analyzed %s and extracted %d features", p.address, feature_count)

for rule_name, res in process_matches.items():
all_process_matches[rule_name].extend(res)
for rule_name, res in thread_matches.items():
all_thread_matches[rule_name].extend(res)
for rule_name, res in call_matches.items():
all_call_matches[rule_name].extend(res)

# collection of features that captures the rule matches within process and thread scopes.
# mapping from feature (matched rule) to set of addresses at which it matched.
process_and_lower_features: FeatureSet = collections.defaultdict(set)
for rule_name, results in itertools.chain(
all_process_matches.items(), all_thread_matches.items(), all_call_matches.items()
):
locations = {p[0] for p in results}
rule = ruleset[rule_name]
capa.engine.index_rule_matches(process_and_lower_features, rule, locations)

all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features)
feature_counts.file = feature_count

matches = dict(
itertools.chain(
# each rule exists in exactly one scope,
# so there won't be any overlap among these following MatchResults,
# and we can merge the dictionaries naively.
all_thread_matches.items(),
all_process_matches.items(),
all_call_matches.items(),
all_file_matches.items(),
)
)

meta = {
"feature_counts": feature_counts,
}

return matches, meta
Loading
Loading