Skip to content

Commit

Permalink
Merge pull request #58 from CybercentreCanada/artifact
Browse files Browse the repository at this point in the history
Changing artefact -> artifact
  • Loading branch information
cccs-kevin authored Jul 26, 2021
2 parents 4104b96 + 7787fa1 commit 4f27b22
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 71 deletions.
76 changes: 38 additions & 38 deletions assemblyline_v4_service/common/dynamic_service_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ def _convert_events_to_dict(events: List[Event]) -> dict:
return events_dict


class Artefact:
class Artifact:
def __init__(self, name: str = None, path: str = None, description: str = None, to_be_extracted: bool = None):
if any(item is None for item in [name, path, description, to_be_extracted]):
raise Exception("Missing positional arguments for Artefact validation")
raise Exception("Missing positional arguments for Artifact validation")

self.name = name
self.path = path
Expand Down Expand Up @@ -221,47 +221,47 @@ def _convert_processes_dict_to_tree(processes_dict: dict = None) -> List[dict]:
return SandboxOntology._sort_things_by_timestamp(root["children"])

@staticmethod
def _validate_artefacts(artefact_list: List[dict] = None) -> List[Artefact]:
if artefact_list is None:
artefact_list = []

validated_artefacts = []
for artefact in artefact_list:
validated_artefact = Artefact(
name=artefact["name"],
path=artefact["path"],
description=artefact["description"],
to_be_extracted=artefact["to_be_extracted"]
def _validate_artifacts(artifact_list: List[dict] = None) -> List[Artifact]:
if artifact_list is None:
artifact_list = []

validated_artifacts = []
for artifact in artifact_list:
validated_artifact = Artifact(
name=artifact["name"],
path=artifact["path"],
description=artifact["description"],
to_be_extracted=artifact["to_be_extracted"]
)
validated_artefacts.append(validated_artefact)
return validated_artefacts
validated_artifacts.append(validated_artifact)
return validated_artifacts

@staticmethod
def _handle_artefact(artefact: Artefact = None, artefacts_result_section: ResultSection = None):
if artefact is None:
raise Exception("Artefact cannot be None")
def _handle_artifact(artifact: Artifact = None, artifacts_result_section: ResultSection = None):
if artifact is None:
raise Exception("Artifact cannot be None")

# This is a dict who's key-value pairs follow the format {regex: result_section_title}
artefact_map = {
artifact_map = {
HOLLOWSHUNTER_EXE_REGEX: "HollowsHunter Injected Portable Executable",
HOLLOWSHUNTER_SHC_REGEX: "HollowsHunter Shellcode",
HOLLOWSHUNTER_DLL_REGEX: "HollowsHunter DLL",
}
artefact_result_section = None
artifact_result_section = None

for regex, title in artefact_map.items():
for regex, title in artifact_map.items():
pattern = compile(regex)
if pattern.match(artefact.name):
artefact_result_section = ResultSection(title)
artefact_result_section.add_tag("dynamic.process.file_name", artefact.path)
if pattern.match(artifact.name):
artifact_result_section = ResultSection(title)
artifact_result_section.add_tag("dynamic.process.file_name", artifact.path)
if regex in [HOLLOWSHUNTER_EXE_REGEX]:
# As of right now, heuristic ID 17 is associated with the Injection category in the Cuckoo service
heur = Heuristic(17)
heur.add_signature_id("hollowshunter_pe")
artefact_result_section.heuristic = heur
artifact_result_section.heuristic = heur

if artefact_result_section is not None:
artefacts_result_section.add_subsection(artefact_result_section)
if artifact_result_section is not None:
artifacts_result_section.add_subsection(artifact_result_section)

def _match_signatures_to_process_events(self, signature_dicts: List[dict]) -> dict:
process_event_dicts_with_signatures = {}
Expand Down Expand Up @@ -312,28 +312,28 @@ def run_signatures(self) -> ResultSection:
raise NotImplementedError

@staticmethod
def handle_artefacts(artefact_list: list, request: ServiceRequest) -> ResultSection:
def handle_artifacts(artifact_list: list, request: ServiceRequest) -> ResultSection:
"""
Goes through each artefact in artefact_list, uploading them and adding result sections accordingly
Goes through each artifact in artifact_list, uploading them and adding result sections accordingly
Positional arguments:
artefact_list -- list of dictionaries that each represent an artefact
artifact_list -- list of dictionaries that each represent an artifact
"""

validated_artefacts = SandboxOntology._validate_artefacts(artefact_list)
validated_artifacts = SandboxOntology._validate_artifacts(artifact_list)

artefacts_result_section = ResultSection("Sandbox Artefacts")
artifacts_result_section = ResultSection("Sandbox Artifacts")

for artefact in validated_artefacts:
SandboxOntology._handle_artefact(artefact, artefacts_result_section)
for artifact in validated_artifacts:
SandboxOntology._handle_artifact(artifact, artifacts_result_section)

if artefact.to_be_extracted:
if artifact.to_be_extracted:
try:
request.add_extracted(artefact.path, artefact.name, artefact.description)
request.add_extracted(artifact.path, artifact.name, artifact.description)
except MaxExtractedExceeded:
# To avoid errors from being raised when too many files have been extracted
pass
else:
request.add_supplementary(artefact.path, artefact.name, artefact.description)
request.add_supplementary(artifact.path, artifact.name, artifact.description)

return artefacts_result_section if artefacts_result_section.subsections else None
return artifacts_result_section if artifacts_result_section.subsections else None
66 changes: 33 additions & 33 deletions test/test_dynamic_service_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def add_extracted(self, path, name, description):
yield DummyRequest


def check_artefact_equality(this, that):
def check_artifact_equality(this, that):
if this.name == that.name and this.path == that.path and this.description == that.description \
and this.to_be_extracted == that.to_be_extracted:
return True
Expand Down Expand Up @@ -177,7 +177,7 @@ def test_init(protocol, src_ip, src_port, domain, dest_ip, dest_port, pid, times
assert n.timestamp == timestamp


class TestArtefact:
class TestArtifact:
@staticmethod
@pytest.mark.parametrize("name, path, description, to_be_extracted",
[
Expand All @@ -187,12 +187,12 @@ class TestArtefact:
]
)
def test_init(name, path, description, to_be_extracted):
from assemblyline_v4_service.common.dynamic_service_helper import Artefact
from assemblyline_v4_service.common.dynamic_service_helper import Artifact
if any(item is None for item in [name, path, description, to_be_extracted]):
with pytest.raises(Exception):
Artefact(name=name, path=path, description=description, to_be_extracted=to_be_extracted)
Artifact(name=name, path=path, description=description, to_be_extracted=to_be_extracted)
return
a = Artefact(name=name, path=path, description=description, to_be_extracted=to_be_extracted)
a = Artifact(name=name, path=path, description=description, to_be_extracted=to_be_extracted)
assert a.name == name
assert a.path == path
assert a.description == description
Expand Down Expand Up @@ -397,29 +397,29 @@ def test_convert_processes_dict_to_tree(processes_dict, expected_result):
assert expected_result == actual_result

@staticmethod
@pytest.mark.parametrize("artefact_list",
@pytest.mark.parametrize("artifact_list",
[
None,
[],
[{"name": "blah", "path": "blah", "description": "blah", "to_be_extracted": True}],
]
)
def test_validate_artefacts(artefact_list):
from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology, Artefact
actual_validated_artefact_list = SandboxOntology._validate_artefacts(artefact_list)
if artefact_list is None:
artefact_list = []
for index, artefact in enumerate(artefact_list):
expected_artefact = Artefact(
name=artefact["name"],
path=artefact["path"],
description=artefact["description"],
to_be_extracted=artefact["to_be_extracted"]
def test_validate_artifacts(artifact_list):
from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology, Artifact
actual_validated_artifact_list = SandboxOntology._validate_artifacts(artifact_list)
if artifact_list is None:
artifact_list = []
for index, artifact in enumerate(artifact_list):
expected_artifact = Artifact(
name=artifact["name"],
path=artifact["path"],
description=artifact["description"],
to_be_extracted=artifact["to_be_extracted"]
)
assert check_artefact_equality(expected_artefact, actual_validated_artefact_list[index])
assert check_artifact_equality(expected_artifact, actual_validated_artifact_list[index])

@staticmethod
@pytest.mark.parametrize("artefact, expected_result_section_title",
@pytest.mark.parametrize("artifact, expected_result_section_title",
[
(None, None),
({"path": "blah", "name": "blah", "description": "blah", "to_be_extracted": True}, None),
Expand All @@ -428,31 +428,31 @@ def test_validate_artefacts(artefact_list):
({"path": "blah", "name": "123_hollowshunter/hh_process_123_blah.dll", "description": "blah", "to_be_extracted": True}, "HollowsHunter DLL"),
]
)
def test_handle_artefact(artefact, expected_result_section_title):
from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology, Artefact
def test_handle_artifact(artifact, expected_result_section_title):
from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology, Artifact
from assemblyline_v4_service.common.result import ResultSection, Heuristic

if artefact is None:
if artifact is None:
with pytest.raises(Exception):
SandboxOntology._handle_artefact(artefact, None)
SandboxOntology._handle_artifact(artifact, None)
return

expected_result_section = None
if expected_result_section_title is not None:
expected_result_section = ResultSection(expected_result_section_title)
expected_result_section.add_tag("dynamic.process.file_name", artefact["path"])
expected_result_section.add_tag("dynamic.process.file_name", artifact["path"])
if expected_result_section_title == "HollowsHunter Injected Portable Executable":
heur = Heuristic(17)
heur.add_signature_id("hollowshunter_pe")
expected_result_section.heuristic = heur
parent_result_section = ResultSection("blah")
a = Artefact(
name=artefact["name"],
path=artefact["path"],
description=artefact["description"],
to_be_extracted=artefact["to_be_extracted"]
a = Artifact(
name=artifact["name"],
path=artifact["path"],
description=artifact["description"],
to_be_extracted=artifact["to_be_extracted"]
)
SandboxOntology._handle_artefact(a, parent_result_section)
SandboxOntology._handle_artifact(a, parent_result_section)
if len(parent_result_section.subsections) > 0:
actual_result_section = parent_result_section.subsections[0]
else:
Expand Down Expand Up @@ -536,18 +536,18 @@ def test_get_events(events, expected_result):
# assert actual_result is True

@staticmethod
@pytest.mark.parametrize("artefact_list, expected_result",
@pytest.mark.parametrize("artifact_list, expected_result",
[
(None, None),
([], None),
([{"name": "blah", "path": "blah", "description": "blah", "to_be_extracted": True}], None),
([{"name": "blah", "path": "blah", "description": "blah", "to_be_extracted": False}], None),
]
)
def test_handle_artefacts(artefact_list, expected_result, dummy_request_class):
def test_handle_artifacts(artifact_list, expected_result, dummy_request_class):
from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology
r = dummy_request_class()
o = SandboxOntology()
actual_result = o.handle_artefacts(artefact_list, r)
actual_result = o.handle_artifacts(artifact_list, r)
assert actual_result == expected_result

0 comments on commit 4f27b22

Please sign in to comment.