Skip to content

Commit

Permalink
Merge branch 'master' into webui
Browse files Browse the repository at this point in the history
  • Loading branch information
fariss authored Aug 5, 2024
2 parents a15eb83 + 04d127f commit ac08133
Show file tree
Hide file tree
Showing 28 changed files with 913 additions and 126 deletions.
8 changes: 4 additions & 4 deletions .github/ruff.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# Enable the pycodestyle (`E`) and Pyflakes (`F`) rules by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
select = ["E", "F"]
lint.select = ["E", "F"]

# Allow autofix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
lint.fixable = ["ALL"]
lint.unfixable = []

# E402 module level import not at top of file
# E722 do not use bare 'except'
# E501 line too long
ignore = ["E402", "E722", "E501"]
lint.ignore = ["E402", "E722", "E501"]

line-length = 120

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.11"]
python-version: ["3.9", "3.11"]
steps:
- name: Checkout capa with submodules
# do only run if BN_SERIAL is available, have to do this in every step, see https://github.com/orgs/community/discussions/26726#discussioncomment-3253118
Expand Down
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,28 @@
## master (unreleased)

### New Features
webui: explore capa analysis in the web #2224 @s-ff

- webui: explore capa analysis in the web #2224 @s-ff
- support analyzing DRAKVUF traces #2143 @yelhamer


### Breaking Changes

### New Rules (0)
### New Rules (1)

- nursery/upload-file-to-onedrive [email protected] [email protected]
-

### Bug Fixes

- elf: extract import / export symbols from stripped binaries #2096 @ygasparis
- elf: fix handling of symbols in corrupt ELF files #2226 @williballenthin

### capa explorer IDA Pro plugin

### Development
- CI: use macos-12 since macos-11 is deprecated and will be removed on June 28th, 2024 #2173 @mr-tz
- CI: update Binary Ninja version to 4.1 and use Python 3.9 to test it #2211 @xusheng6

### Raw diffs
- [capa v7.1.0...master](https://github.com/mandiant/capa/compare/v7.1.0...master)
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ function @ 0x4011C0
...
```

Additionally, capa also supports analyzing [CAPE](https://github.com/kevoreilly/CAPEv2) sandbox reports for dynamic capability extraction.
In order to use this, you first submit your sample to CAPE for analysis, and then run capa against the generated report (JSON).
Additionally, capa also supports analyzing sandbox reports for dynamic capability extraction.
In order to use this, you first submit your sample to one of supported sandboxes for analysis, and then run capa against the generated report file.

Currently, capa supports the [CAPE sandbox](https://github.com/kevoreilly/CAPEv2) and the [DRAKVUF sandbox](https://github.com/CERT-Polska/drakvuf-sandbox/). In order to use either, simply run capa against the generated file (JSON for CAPE or LOG for DRAKVUF sandbox) and it will automatically detect the sandbox and extract capabilities from it.

Here's an example of running capa against a packed binary, and then running capa against the CAPE report of that binary:

Expand Down
2 changes: 2 additions & 0 deletions capa/features/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True):
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_CAPE = "cape"
FORMAT_DRAKVUF = "drakvuf"
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
STATIC_FORMATS = {
Expand All @@ -474,6 +475,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True):
}
DYNAMIC_FORMATS = {
FORMAT_CAPE,
FORMAT_DRAKVUF,
FORMAT_FREEZE,
FORMAT_RESULT,
}
Expand Down
56 changes: 56 additions & 0 deletions capa/features/extractors/drakvuf/call.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import logging
from typing import Tuple, Iterator

from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
from capa.features.extractors.drakvuf.models import Call

logger = logging.getLogger(__name__)


def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
"""
This method extracts the given call's features (such as API name and arguments),
and returns them as API, Number, and String features.
args:
ph: process handle (for defining the extraction scope)
th: thread handle (for defining the extraction scope)
ch: call handle (for defining the extraction scope)
yields:
Feature, address; where Feature is either: API, Number, or String.
"""
call: Call = ch.inner

# list similar to disassembly: arguments right-to-left, call
for arg_value in reversed(call.arguments.values()):
try:
yield Number(int(arg_value, 0)), ch.address
except ValueError:
# DRAKVUF automatically resolves the contents of memory addresses, (e.g. Arg1="0xc6f217efe0:\"ntdll.dll\"").
# For those cases we yield the entire string as it, since yielding the address only would
# likely not provide any matches, and yielding just the memory contentswould probably be misleading,
# but yielding the entire string would be helpful for an analyst looking at the verbose output
yield String(arg_value), ch.address

yield API(call.name), ch.address


def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in CALL_HANDLERS:
for feature, addr in handler(ph, th, ch):
yield feature, addr


CALL_HANDLERS = (extract_call_features,)
96 changes: 96 additions & 0 deletions capa/features/extractors/drakvuf/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import logging
from typing import Dict, List, Tuple, Union, Iterator

import capa.features.extractors.drakvuf.call
import capa.features.extractors.drakvuf.file
import capa.features.extractors.drakvuf.thread
import capa.features.extractors.drakvuf.global_
import capa.features.extractors.drakvuf.process
from capa.features.common import Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
ThreadHandle,
ProcessHandle,
DynamicFeatureExtractor,
)
from capa.features.extractors.drakvuf.models import Call, DrakvufReport
from capa.features.extractors.drakvuf.helpers import index_calls

logger = logging.getLogger(__name__)


class DrakvufExtractor(DynamicFeatureExtractor):
def __init__(self, report: DrakvufReport):
super().__init__(
# DRAKVUF currently does not yield hash information about the sample in its output
hashes=SampleHashes(md5="", sha1="", sha256="")
)

self.report: DrakvufReport = report

# sort the api calls to prevent going through the entire list each time
self.sorted_calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]] = index_calls(report)

# pre-compute these because we'll yield them at *every* scope.
self.global_features = list(capa.features.extractors.drakvuf.global_.extract_features(self.report))

def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
# DRAKVUF currently does not yield information about the PE's address
return NO_ADDRESS

def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from self.global_features

def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.drakvuf.file.extract_features(self.report)

def get_processes(self) -> Iterator[ProcessHandle]:
yield from capa.features.extractors.drakvuf.file.get_processes(self.sorted_calls)

def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.drakvuf.process.extract_features(ph)

def get_process_name(self, ph: ProcessHandle) -> str:
return ph.inner["process_name"]

def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
yield from capa.features.extractors.drakvuf.process.get_threads(self.sorted_calls, ph)

def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
if False:
# force this routine to be a generator,
# but we don't actually have any elements to generate.
yield Characteristic("never"), NO_ADDRESS
return

def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
yield from capa.features.extractors.drakvuf.thread.get_calls(self.sorted_calls, ph, th)

def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> str:
call: Call = ch.inner
call_name = "{}({}){}".format(
call.name,
", ".join(f"{arg_name}={arg_value}" for arg_name, arg_value in call.arguments.items()),
(f" -> {getattr(call, 'return_value', '')}"), # SysCalls don't have a return value, while WinApi calls do
)
return call_name

def extract_call_features(
self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.drakvuf.call.extract_features(ph, th, ch)

@classmethod
def from_report(cls, report: Iterator[Dict]) -> "DrakvufExtractor":
dr = DrakvufReport.from_raw_report(report)
return DrakvufExtractor(report=dr)
56 changes: 56 additions & 0 deletions capa/features/extractors/drakvuf/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import logging
from typing import Dict, List, Tuple, Iterator

from capa.features.file import Import
from capa.features.common import Feature
from capa.features.address import Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress
from capa.features.extractors.helpers import generate_symbols
from capa.features.extractors.base_extractor import ProcessHandle
from capa.features.extractors.drakvuf.models import Call, DrakvufReport

logger = logging.getLogger(__name__)


def get_processes(calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]]) -> Iterator[ProcessHandle]:
"""
Get all the created processes for a sample.
"""
for proc_addr, calls_per_thread in calls.items():
sample_call = next(iter(calls_per_thread.values()))[0] # get process name
yield ProcessHandle(proc_addr, inner={"process_name": sample_call.process_name})


def extract_import_names(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
"""
Extract imported function names.
"""
if report.loaded_dlls is None:
return
dlls = report.loaded_dlls

for dll in dlls:
dll_base_name = dll.name.split("\\")[-1]
for function_name, function_address in dll.imports.items():
for name in generate_symbols(dll_base_name, function_name, include_dll=True):
yield Import(name), AbsoluteVirtualAddress(function_address)


def extract_features(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
for handler in FILE_HANDLERS:
for feature, addr in handler(report):
yield feature, addr


FILE_HANDLERS = (
# TODO(yelhamer): extract more file features from other DRAKVUF plugins
# https://github.com/mandiant/capa/issues/2169
extract_import_names,
)
44 changes: 44 additions & 0 deletions capa/features/extractors/drakvuf/global_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import logging
from typing import Tuple, Iterator

from capa.features.common import OS, FORMAT_PE, ARCH_AMD64, OS_WINDOWS, Arch, Format, Feature
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.drakvuf.models import DrakvufReport

logger = logging.getLogger(__name__)


def extract_format(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
# DRAKVUF sandbox currently supports only Windows as the guest: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html
yield Format(FORMAT_PE), NO_ADDRESS


def extract_os(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
# DRAKVUF sandbox currently supports only PE files: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html
yield OS(OS_WINDOWS), NO_ADDRESS


def extract_arch(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
# DRAKVUF sandbox currently supports only x64 Windows as the guest: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html
yield Arch(ARCH_AMD64), NO_ADDRESS


def extract_features(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
for global_handler in GLOBAL_HANDLER:
for feature, addr in global_handler(report):
yield feature, addr


GLOBAL_HANDLER = (
extract_format,
extract_os,
extract_arch,
)
39 changes: 39 additions & 0 deletions capa/features/extractors/drakvuf/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import itertools
from typing import Dict, List

from capa.features.address import ThreadAddress, ProcessAddress
from capa.features.extractors.drakvuf.models import Call, DrakvufReport


def index_calls(report: DrakvufReport) -> Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]]:
# this method organizes calls into processes and threads, and then sorts them based on
# timestamp so that we can address individual calls per index (CallAddress requires call index)
result: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]] = {}
for call in itertools.chain(report.syscalls, report.apicalls):
if call.pid == 0:
# DRAKVUF captures api/native calls from all processes running on the system.
# we ignore the pid 0 since it's a system process and it's unlikely for it to
# be hijacked or so on, in addition to capa addresses not supporting null pids
continue
proc_addr = ProcessAddress(pid=call.pid, ppid=call.ppid)
thread_addr = ThreadAddress(process=proc_addr, tid=call.tid)
if proc_addr not in result:
result[proc_addr] = {}
if thread_addr not in result[proc_addr]:
result[proc_addr][thread_addr] = []

result[proc_addr][thread_addr].append(call)

for proc, threads in result.items():
for thread in threads:
result[proc][thread].sort(key=lambda call: call.timestamp)

return result
Loading

0 comments on commit ac08133

Please sign in to comment.