Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Feature Extractor for the Drakvuf Sandbox #2143

Merged
merged 62 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
a408629
initial commit
yelhamer Jun 11, 2024
603d623
update changelog
yelhamer Jun 11, 2024
90ef348
Merge branch 'master' into drakvuf-extractor
yelhamer Jun 11, 2024
1e8735a
Update CHANGELOG.md
yelhamer Jun 11, 2024
d2cdccf
Update pyproject.toml
yelhamer Jun 11, 2024
840f59f
Apply suggestions from code review: Typos
yelhamer Jun 17, 2024
9e13362
capa/helpers.py: update if/else statement
yelhamer Jun 17, 2024
2e408d8
loader.py: replace print() statement with log.info()
yelhamer Jun 19, 2024
a73d16f
Merge branch 'master' into drakvuf-extractor
yelhamer Jun 19, 2024
b28e0d0
Update capa/features/extractors/drakvuf/models.py
yelhamer Jun 20, 2024
c05b973
extractors/drakvuf/call.py: yield arguments right to left
yelhamer Jun 21, 2024
70d03eb
extractors/drakvuf/file.py: add a TODO comment for extracting more fi…
yelhamer Jun 21, 2024
8d4f3c7
extractors/drakvuf/global_.py: add arch extraction
yelhamer Jun 21, 2024
bf12ce8
extractors/drakvuf/helpers.py: ignore null pids
yelhamer Jun 21, 2024
84d68a4
capa/helpers.py: mention msgspec.json explicitely
yelhamer Jun 21, 2024
00349d5
capa/helpers.py: generalize empty sandbox reports error logging
yelhamer Jun 21, 2024
53439c7
capa/loader.py: log jsonl garbage collection into debug
yelhamer Jun 21, 2024
2663fa6
features/extractors/drakvuf/models.py: add documentation for SystemCa…
yelhamer Jun 21, 2024
3bea6e7
capa/main.py: fix erroneous imports
yelhamer Jun 21, 2024
15a5efd
drakvuf extractor: fixed faulty type annotations
yelhamer Jun 21, 2024
0c0c4d0
fix black formatting
yelhamer Jun 21, 2024
04ae280
fix flake8 issues
yelhamer Jun 21, 2024
e54f38f
drakvuf file extraction: add link to tracking issue
yelhamer Jun 21, 2024
cb7babc
drakvuf reports: add the ability to read gzip-compressed report files
yelhamer Jun 21, 2024
5284ec0
capa/helpers.py: fix mypy issues
yelhamer Jun 21, 2024
21d50e0
apply review comments
yelhamer Jun 25, 2024
885f216
drakvuf/helpers.py: add more information about null pid
yelhamer Jun 27, 2024
3b2b022
drakvuf/file.py: remove discovered_dlls file strings extraction
yelhamer Jun 27, 2024
1e4ed12
capa/helpers.py: add comments for the dynamic extensions
yelhamer Jun 27, 2024
b7f4058
capa/helpers.py: log bad lines
yelhamer Jun 27, 2024
0f1750c
capa/helpers.py: add gzip support for reading one jsonl line
yelhamer Jun 27, 2024
4749f24
drakvuf/helpers.py: add comment for sort_calls()
yelhamer Jun 27, 2024
37f82cb
tests/fixtures.py: add TODO for unifying CAPE and Drakvuf tests
yelhamer Jun 27, 2024
c45aaa0
drakvuf/models.py: add TODO comment for supporting more drakvuf plugins
yelhamer Jun 27, 2024
aeea39b
tests/fixtures.py: remove obsolete file strings tests
yelhamer Jun 27, 2024
9b5dffc
Merge branch 'master' into drakvuf-extractor
yelhamer Jul 2, 2024
c862f12
Update capa/main.py
yelhamer Jul 16, 2024
cea64d3
Update capa/features/extractors/drakvuf/models.py
yelhamer Jul 16, 2024
718d6ff
Update capa/features/extractors/drakvuf/models.py
yelhamer Jul 16, 2024
32c7a53
Update capa/features/extractors/drakvuf/call.py
yelhamer Jul 16, 2024
7248c0a
Update CHANGELOG.md
yelhamer Jul 16, 2024
de43d1e
Update capa/features/extractors/drakvuf/helpers.py
yelhamer Jul 16, 2024
3cd5cde
review comments
yelhamer Jul 16, 2024
454cd2d
Update capa/features/extractors/drakvuf/extractor.py
yelhamer Jul 16, 2024
f9d5c4a
Update capa/features/extractors/drakvuf/models.py
yelhamer Jul 16, 2024
6617fc0
styling
yelhamer Jul 16, 2024
8e7bc75
drakvuf/extractor.py: black linting
yelhamer Jul 16, 2024
93240f5
drakvuf/models.py: remove need to empty report checking
yelhamer Jul 17, 2024
c08c5bf
tests: add drakvuf models test
yelhamer Jul 17, 2024
6e0a9eb
Update capa/features/extractors/drakvuf/global_.py
yelhamer Jul 19, 2024
2bb7f3c
Update tests/test_cape_features.py
yelhamer Jul 19, 2024
c0e9150
Update capa/features/extractors/drakvuf/models.py
yelhamer Jul 19, 2024
897e98b
Apply suggestions from code review: rename Drakvuf to DRAKVUF
yelhamer Jul 19, 2024
e786552
drakvuf/call.py: use int(..., 0) instead of str_to_number()
yelhamer Jul 23, 2024
4cab975
remove str_to_number
yelhamer Jul 23, 2024
2576aa1
drakvuf/call.py: yield argument memory address value as well
yelhamer Jul 23, 2024
b5047a2
Update call.py: remove verbosity in yield statement
yelhamer Jul 23, 2024
e26072e
Update call.py: yield missing address as well
yelhamer Jul 23, 2024
d9e3ca1
drakvuf/call.py: yield entire argument string only
yelhamer Jul 24, 2024
3e3be41
update readme.md
yelhamer Jul 24, 2024
729679d
Update README.md: typo
yelhamer Jul 24, 2024
3fb0eaf
Update CHANGELOG.md
williballenthin Jul 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master (unreleased)

### New Features
- support analyzing DRAKVUF traces #2143 @yelhamer

### Breaking Changes

Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ function @ 0x4011C0
...
```

Additionally, capa also supports analyzing [CAPE](https://github.com/kevoreilly/CAPEv2) sandbox reports for dynamic capability extraction.
In order to use this, you first submit your sample to CAPE for analysis, and then run capa against the generated report (JSON).
Additionally, capa also supports analyzing sandbox reports for dynamic capability extraction.
In order to use this, you first submit your sample to one of supported sandboxes for analysis, and then run capa against the generated report file.

Currently, capa supports the [CAPE sandbox](https://github.com/kevoreilly/CAPEv2) and the [DRAKVUF sandbox](https://github.com/CERT-Polska/drakvuf-sandbox/). In order to use either, simply run capa against the generated file (JSON for CAPE or LOG for DRAKVUF sandbox) and it will automatically detect the sandbox and extract capabilities from it.
Comment on lines +129 to +132
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great!


Here's an example of running capa against a packed binary, and then running capa against the CAPE report of that binary:

Expand Down
2 changes: 2 additions & 0 deletions capa/features/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True):
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_CAPE = "cape"
FORMAT_DRAKVUF = "drakvuf"
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
STATIC_FORMATS = {
Expand All @@ -474,6 +475,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True):
}
DYNAMIC_FORMATS = {
FORMAT_CAPE,
FORMAT_DRAKVUF,
FORMAT_FREEZE,
FORMAT_RESULT,
}
Expand Down
56 changes: 56 additions & 0 deletions capa/features/extractors/drakvuf/call.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import logging
from typing import Tuple, Iterator

from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
from capa.features.extractors.drakvuf.models import Call

logger = logging.getLogger(__name__)


def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
"""
This method extracts the given call's features (such as API name and arguments),
and returns them as API, Number, and String features.

args:
ph: process handle (for defining the extraction scope)
th: thread handle (for defining the extraction scope)
ch: call handle (for defining the extraction scope)

yields:
Feature, address; where Feature is either: API, Number, or String.
"""
call: Call = ch.inner

# list similar to disassembly: arguments right-to-left, call
yelhamer marked this conversation as resolved.
Show resolved Hide resolved
for arg_value in reversed(call.arguments.values()):
try:
yield Number(int(arg_value, 0)), ch.address
except ValueError:
# DRAKVUF automatically resolves the contents of memory addresses, (e.g. Arg1="0xc6f217efe0:\"ntdll.dll\"").
# For those cases we yield the entire string as it, since yielding the address only would
# likely not provide any matches, and yielding just the memory contentswould probably be misleading,
# but yielding the entire string would be helpful for an analyst looking at the verbose output
yield String(arg_value), ch.address
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, if I understand the code correctly, and this iterates over arguments from apimon, arg_value won't be a string. Instead, parsed values look like "0xc6f217efe0:\"ntdll.dll\"" in the JSON. Is that OK?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yelhamer please comment or address

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm now yielding the "ntdll.dll" part of the argument in addition to the entire string (we yield the entire string just in case of unexpected argument formats).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yelhamer can you show some examples from show-features.py? I'm not quite following what you mean by this formatting.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@williballenthin I meant that for "0xc6f217efe0:\"ntdll.dll\"" for example it would yield String("ntdll.dll") and String("0xc6f217efe0:\"ntdll.dll\""), but looking at show-features.py it does give misleading results (displays same argument twice):

yacine@y:~/src/capa/scripts$ python3 show-features.py small_drakmon.log -d
DEBUG:capa:skipping library code matching: only supported by the vivisect backend
global: global: format(pe)
global: global: os(windows)
global: global: arch(amd64)
proc: \Device\HarddiskVolume2\Windows\System32\conhost.exe (ppid=4852, pid=3564)
 proc: \Device\HarddiskVolume2\Windows\System32\conhost.exe: string(\\Device\\HarddiskVolume2\\Windows\\System32\\conhost.exe)
  thread: 6592
    call 0: LdrLoadDll(440203471832, "api-ms-win-core-fibers-l1-1-1", 0x667e2beb90:"api-ms-win-core-fibers-l1-1-1", 0, 2049)

With this in mind I think I might just revert to just yielding "0xc6f217efe0:\"ntdll.dll\"" as we originally planned, since it would show up in show-features.py and might give analysts more insights, and it also wouldn't be misleading like yielding just "ntdll.dll", and finally I don't imagine we would be missing any rule matches by yielding "0xc6f217efe0:\"ntdll.dll\"" because the relevant api function would be expecting a memory address so I wouldn't imagine any rules basing any logic on that. Thoughts?


yield API(call.name), ch.address


def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in CALL_HANDLERS:
for feature, addr in handler(ph, th, ch):
yield feature, addr


CALL_HANDLERS = (extract_call_features,)
96 changes: 96 additions & 0 deletions capa/features/extractors/drakvuf/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import logging
from typing import Dict, List, Tuple, Union, Iterator

import capa.features.extractors.drakvuf.call
import capa.features.extractors.drakvuf.file
import capa.features.extractors.drakvuf.thread
import capa.features.extractors.drakvuf.global_
import capa.features.extractors.drakvuf.process
from capa.features.common import Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
ThreadHandle,
ProcessHandle,
DynamicFeatureExtractor,
)
from capa.features.extractors.drakvuf.models import Call, DrakvufReport
from capa.features.extractors.drakvuf.helpers import index_calls

logger = logging.getLogger(__name__)


class DrakvufExtractor(DynamicFeatureExtractor):
def __init__(self, report: DrakvufReport):
super().__init__(
# DRAKVUF currently does not yield hash information about the sample in its output
yelhamer marked this conversation as resolved.
Show resolved Hide resolved
hashes=SampleHashes(md5="", sha1="", sha256="")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should these be blank or contain an indication that this is not available/provided by the sandbox?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'm unsure. CAPE's extractor had one of them empty (since it doesn't report it) so I just did the same here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, it's a shame no hash at all is available...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, unfortunately DRAKVUF is primarily a full VM monitor. In DRAKVUF sandbox it's (ab)used to function as a malware sandbox, but drakmon.log is the output directly from DRAKVUF.

Which is good! It makes this integration more generic (works with DRAKVUF, not just with DRAKVUF sandbox). But that purpose mismatch causes glitches like this.

I think it's possible to send a PR to DRAKVUF that adds logging of sample hashes to the DRAKVUF's injector output. If this is valuable I can take a look at this (I can't promise it gets merged, though). But we can't have this in the GSOC timeline, so I hope PR can progress without it.

)

self.report: DrakvufReport = report

# sort the api calls to prevent going through the entire list each time
self.sorted_calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]] = index_calls(report)

# pre-compute these because we'll yield them at *every* scope.
self.global_features = list(capa.features.extractors.drakvuf.global_.extract_features(self.report))

def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
# DRAKVUF currently does not yield information about the PE's address
return NO_ADDRESS

def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from self.global_features

def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.drakvuf.file.extract_features(self.report)

def get_processes(self) -> Iterator[ProcessHandle]:
yield from capa.features.extractors.drakvuf.file.get_processes(self.sorted_calls)

def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.drakvuf.process.extract_features(ph)

def get_process_name(self, ph: ProcessHandle) -> str:
return ph.inner["process_name"]

def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
yield from capa.features.extractors.drakvuf.process.get_threads(self.sorted_calls, ph)

def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
if False:
# force this routine to be a generator,
# but we don't actually have any elements to generate.
williballenthin marked this conversation as resolved.
Show resolved Hide resolved
yield Characteristic("never"), NO_ADDRESS
return
yelhamer marked this conversation as resolved.
Show resolved Hide resolved

def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
yield from capa.features.extractors.drakvuf.thread.get_calls(self.sorted_calls, ph, th)

def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> str:
call: Call = ch.inner
call_name = "{}({}){}".format(
call.name,
", ".join(f"{arg_name}={arg_value}" for arg_name, arg_value in call.arguments.items()),
(f" -> {getattr(call, 'return_value', '')}"), # SysCalls don't have a return value, while WinApi calls do
)
return call_name

def extract_call_features(
self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.drakvuf.call.extract_features(ph, th, ch)

@classmethod
def from_report(cls, report: Iterator[Dict]) -> "DrakvufExtractor":
dr = DrakvufReport.from_raw_report(report)
return DrakvufExtractor(report=dr)
56 changes: 56 additions & 0 deletions capa/features/extractors/drakvuf/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import logging
from typing import Dict, List, Tuple, Iterator

from capa.features.file import Import
from capa.features.common import Feature
from capa.features.address import Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress
from capa.features.extractors.helpers import generate_symbols
from capa.features.extractors.base_extractor import ProcessHandle
from capa.features.extractors.drakvuf.models import Call, DrakvufReport

logger = logging.getLogger(__name__)


def get_processes(calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]]) -> Iterator[ProcessHandle]:
"""
Get all the created processes for a sample.
"""
for proc_addr, calls_per_thread in calls.items():
sample_call = next(iter(calls_per_thread.values()))[0] # get process name
mr-tz marked this conversation as resolved.
Show resolved Hide resolved
yield ProcessHandle(proc_addr, inner={"process_name": sample_call.process_name})


def extract_import_names(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
"""
Extract imported function names.
"""
if report.loaded_dlls is None:
Comment on lines +33 to +35
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loaded DLLs means something else to me than imports - do they mean the same thing here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding from reading the comments on the relevant drakvuf source code is that the output of this plugin includes imported functions from DLLs loaded by the PE loader, as well as the ones that might be dynamically loaded by a process. I think this because the comments say that they are hooking some windows system calls in order to do this (I believe?), and if this is the case then I feel like this plugin is providing an extensive list of imports which includes static ones as well as dynamic ones that malware might try to load discretely which is why I added this here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add this documentation to the code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I just noticed that Drakvuf reports the imported functions for each process. Should I extract the imported functions in the process scope instead? this way if a user is analyzing only a specific process then they wouldn't get false results from an import originating from another process.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the file scope extractors we're only interested in the imports of the target file.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an artifact of the static analysis module and likely differs in dynamic analysis and across sandboxes - so maybe we need a new way to handle these?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This thread needs to be resolved.

At the very least, I think we should only yield the imports for the input file.

Optionally, if we can come up with some good motivation and test cases, then we could also extend the sandbox extractor API to cover the recursively imported DLLs/names.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can confirm that DRAKVUF outputs only execution trace (including loaded DLLs and imported functions) and doesn't concern itself with static analysis.

Can I help with resolving it somehow?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yelhamer please emit only the import names from the target DLL or none at all. I understand that there's maybe another way to interpret these imports (such as all imports seen in the address space), but this would be inconsistent with other feature extractors, and will be difficult to keep straight and reason about.

I suspect that these import features won't be commonly used, so emitting none at all is usually going to be fine. If we can come up with some specific problematic cases, then we can reassess.

return
dlls = report.loaded_dlls

for dll in dlls:
dll_base_name = dll.name.split("\\")[-1]
for function_name, function_address in dll.imports.items():
for name in generate_symbols(dll_base_name, function_name, include_dll=True):
yield Import(name), AbsoluteVirtualAddress(function_address)


def extract_features(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
for handler in FILE_HANDLERS:
for feature, addr in handler(report):
yield feature, addr


FILE_HANDLERS = (
# TODO(yelhamer): extract more file features from other DRAKVUF plugins
# https://github.com/mandiant/capa/issues/2169
extract_import_names,
)
44 changes: 44 additions & 0 deletions capa/features/extractors/drakvuf/global_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import logging
from typing import Tuple, Iterator

from capa.features.common import OS, FORMAT_PE, ARCH_AMD64, OS_WINDOWS, Arch, Format, Feature
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.drakvuf.models import DrakvufReport

logger = logging.getLogger(__name__)


def extract_format(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
# DRAKVUF sandbox currently supports only Windows as the guest: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html
yield Format(FORMAT_PE), NO_ADDRESS


def extract_os(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
# DRAKVUF sandbox currently supports only PE files: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html
yield OS(OS_WINDOWS), NO_ADDRESS


def extract_arch(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
# DRAKVUF sandbox currently supports only x64 Windows as the guest: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html
yield Arch(ARCH_AMD64), NO_ADDRESS


def extract_features(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]:
for global_handler in GLOBAL_HANDLER:
for feature, addr in global_handler(report):
yield feature, addr


GLOBAL_HANDLER = (
mr-tz marked this conversation as resolved.
Show resolved Hide resolved
extract_format,
extract_os,
extract_arch,
)
39 changes: 39 additions & 0 deletions capa/features/extractors/drakvuf/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import itertools
from typing import Dict, List

from capa.features.address import ThreadAddress, ProcessAddress
from capa.features.extractors.drakvuf.models import Call, DrakvufReport


def index_calls(report: DrakvufReport) -> Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]]:
# this method organizes calls into processes and threads, and then sorts them based on
# timestamp so that we can address individual calls per index (CallAddress requires call index)
result: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]] = {}
for call in itertools.chain(report.syscalls, report.apicalls):
if call.pid == 0:
# DRAKVUF captures api/native calls from all processes running on the system.
# we ignore the pid 0 since it's a system process and it's unlikely for it to
# be hijacked or so on, in addition to capa addresses not supporting null pids
yelhamer marked this conversation as resolved.
Show resolved Hide resolved
continue
proc_addr = ProcessAddress(pid=call.pid, ppid=call.ppid)
thread_addr = ThreadAddress(process=proc_addr, tid=call.tid)
if proc_addr not in result:
result[proc_addr] = {}
if thread_addr not in result[proc_addr]:
result[proc_addr][thread_addr] = []

result[proc_addr][thread_addr].append(call)

for proc, threads in result.items():
for thread in threads:
result[proc][thread].sort(key=lambda call: call.timestamp)

return result
Loading