From 7787fa18143490d6e29e9a78782f90bd6dd02202 Mon Sep 17 00:00:00 2001
From: Kevin Hardy-Cooper
Date: Mon, 26 Jul 2021 09:41:44 -0400
Subject: [PATCH] Changing artefact -> artifact

---
 .../common/dynamic_service_helper.py | 76 +++++++++----------
 test/test_dynamic_service_helper.py | 66 ++++++++--------
 2 files changed, 71 insertions(+), 71 deletions(-)

diff --git a/assemblyline_v4_service/common/dynamic_service_helper.py b/assemblyline_v4_service/common/dynamic_service_helper.py
index fd689b02..2c8d42a7 100644
--- a/assemblyline_v4_service/common/dynamic_service_helper.py
+++ b/assemblyline_v4_service/common/dynamic_service_helper.py
@@ -141,10 +141,10 @@ def _convert_events_to_dict(events: List[Event]) -> dict:
     return events_dict
 
 
-class Artefact:
+class Artifact:
     def __init__(self, name: str = None, path: str = None, description: str = None, to_be_extracted: bool = None):
         if any(item is None for item in [name, path, description, to_be_extracted]):
-            raise Exception("Missing positional arguments for Artefact validation")
+            raise Exception("Missing positional arguments for Artifact validation")
 
         self.name = name
         self.path = path
@@ -221,47 +221,47 @@ def _convert_processes_dict_to_tree(processes_dict: dict = None) -> List[dict]:
         return SandboxOntology._sort_things_by_timestamp(root["children"])
 
     @staticmethod
-    def _validate_artefacts(artefact_list: List[dict] = None) -> List[Artefact]:
-        if artefact_list is None:
-            artefact_list = []
-
-        validated_artefacts = []
-        for artefact in artefact_list:
-            validated_artefact = Artefact(
-                name=artefact["name"],
-                path=artefact["path"],
-                description=artefact["description"],
-                to_be_extracted=artefact["to_be_extracted"]
+    def _validate_artifacts(artifact_list: List[dict] = None) -> List[Artifact]:
+        if artifact_list is None:
+            artifact_list = []
+
+        validated_artifacts = []
+        for artifact in artifact_list:
+            validated_artifact = Artifact(
+                name=artifact["name"],
+                path=artifact["path"],
+                description=artifact["description"],
+                to_be_extracted=artifact["to_be_extracted"]
             )
-            validated_artefacts.append(validated_artefact)
-        return validated_artefacts
+            validated_artifacts.append(validated_artifact)
+        return validated_artifacts
 
     @staticmethod
-    def _handle_artefact(artefact: Artefact = None, artefacts_result_section: ResultSection = None):
-        if artefact is None:
-            raise Exception("Artefact cannot be None")
+    def _handle_artifact(artifact: Artifact = None, artifacts_result_section: ResultSection = None):
+        if artifact is None:
+            raise Exception("Artifact cannot be None")
 
         # This is a dict who's key-value pairs follow the format {regex: result_section_title}
-        artefact_map = {
+        artifact_map = {
             HOLLOWSHUNTER_EXE_REGEX: "HollowsHunter Injected Portable Executable",
             HOLLOWSHUNTER_SHC_REGEX: "HollowsHunter Shellcode",
             HOLLOWSHUNTER_DLL_REGEX: "HollowsHunter DLL",
         }
-        artefact_result_section = None
+        artifact_result_section = None
 
-        for regex, title in artefact_map.items():
+        for regex, title in artifact_map.items():
             pattern = compile(regex)
-            if pattern.match(artefact.name):
-                artefact_result_section = ResultSection(title)
-                artefact_result_section.add_tag("dynamic.process.file_name", artefact.path)
+            if pattern.match(artifact.name):
+                artifact_result_section = ResultSection(title)
+                artifact_result_section.add_tag("dynamic.process.file_name", artifact.path)
                 if regex in [HOLLOWSHUNTER_EXE_REGEX]:
                     # As of right now, heuristic ID 17 is associated with the Injection category in the Cuckoo service
                     heur = Heuristic(17)
                     heur.add_signature_id("hollowshunter_pe")
-                    artefact_result_section.heuristic = heur
+                    artifact_result_section.heuristic = heur
 
-        if artefact_result_section is not None:
-            artefacts_result_section.add_subsection(artefact_result_section)
+        if artifact_result_section is not None:
+            artifacts_result_section.add_subsection(artifact_result_section)
 
     def _match_signatures_to_process_events(self, signature_dicts: List[dict]) -> dict:
         process_event_dicts_with_signatures = {}
@@ -312,28 +312,28 @@ def run_signatures(self) -> ResultSection:
         raise NotImplementedError
 
     @staticmethod
-    def handle_artefacts(artefact_list: list, request: ServiceRequest) -> ResultSection:
+    def handle_artifacts(artifact_list: list, request: ServiceRequest) -> ResultSection:
         """
-        Goes through each artefact in artefact_list, uploading them and adding result sections accordingly
+        Goes through each artifact in artifact_list, uploading them and adding result sections accordingly
 
         Positional arguments:
-        artefact_list -- list of dictionaries that each represent an artefact
+        artifact_list -- list of dictionaries that each represent an artifact
         """
-        validated_artefacts = SandboxOntology._validate_artefacts(artefact_list)
+        validated_artifacts = SandboxOntology._validate_artifacts(artifact_list)
 
-        artefacts_result_section = ResultSection("Sandbox Artefacts")
+        artifacts_result_section = ResultSection("Sandbox Artifacts")
 
-        for artefact in validated_artefacts:
-            SandboxOntology._handle_artefact(artefact, artefacts_result_section)
+        for artifact in validated_artifacts:
+            SandboxOntology._handle_artifact(artifact, artifacts_result_section)
 
-            if artefact.to_be_extracted:
+            if artifact.to_be_extracted:
                 try:
-                    request.add_extracted(artefact.path, artefact.name, artefact.description)
+                    request.add_extracted(artifact.path, artifact.name, artifact.description)
                 except MaxExtractedExceeded:
                     # To avoid errors from being raised when too many files have been extracted
                     pass
             else:
-                request.add_supplementary(artefact.path, artefact.name, artefact.description)
+                request.add_supplementary(artifact.path, artifact.name, artifact.description)
 
-        return artefacts_result_section if artefacts_result_section.subsections else None
+        return artifacts_result_section if artifacts_result_section.subsections else None
diff --git a/test/test_dynamic_service_helper.py b/test/test_dynamic_service_helper.py
index fd1adf53..8c19f157 100644
--- a/test/test_dynamic_service_helper.py
+++ b/test/test_dynamic_service_helper.py
@@ -38,7 +38,7 @@ def add_extracted(self, path, name, description):
     yield DummyRequest
 
 
-def check_artefact_equality(this, that):
+def check_artifact_equality(this, that):
     if this.name == that.name and this.path == that.path and this.description == that.description \
             and this.to_be_extracted == that.to_be_extracted:
         return True
@@ -177,7 +177,7 @@ def test_init(protocol, src_ip, src_port, domain, dest_ip, dest_port, pid, times
         assert n.timestamp == timestamp
 
 
-class TestArtefact:
+class TestArtifact:
     @staticmethod
     @pytest.mark.parametrize("name, path, description, to_be_extracted",
         [
@@ -187,12 +187,12 @@ class TestArtefact:
         ]
     )
     def test_init(name, path, description, to_be_extracted):
-        from assemblyline_v4_service.common.dynamic_service_helper import Artefact
+        from assemblyline_v4_service.common.dynamic_service_helper import Artifact
         if any(item is None for item in [name, path, description, to_be_extracted]):
             with pytest.raises(Exception):
-                Artefact(name=name, path=path, description=description, to_be_extracted=to_be_extracted)
+                Artifact(name=name, path=path, description=description, to_be_extracted=to_be_extracted)
             return
-        a = Artefact(name=name, path=path, description=description, to_be_extracted=to_be_extracted)
+        a = Artifact(name=name, path=path, description=description, to_be_extracted=to_be_extracted)
         assert a.name == name
         assert a.path == path
         assert a.description == description
@@ -397,29 +397,29 @@ def test_convert_processes_dict_to_tree(processes_dict, expected_result):
         assert expected_result == actual_result
 
     @staticmethod
-    @pytest.mark.parametrize("artefact_list",
+    @pytest.mark.parametrize("artifact_list",
         [
             None,
             [],
             [{"name": "blah", "path": "blah", "description": "blah", "to_be_extracted": True}],
         ]
     )
-    def test_validate_artefacts(artefact_list):
-        from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology, Artefact
-        actual_validated_artefact_list = SandboxOntology._validate_artefacts(artefact_list)
-        if artefact_list is None:
-            artefact_list = []
-        for index, artefact in enumerate(artefact_list):
-            expected_artefact = Artefact(
-                name=artefact["name"],
-                path=artefact["path"],
-                description=artefact["description"],
-                to_be_extracted=artefact["to_be_extracted"]
+    def test_validate_artifacts(artifact_list):
+        from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology, Artifact
+        actual_validated_artifact_list = SandboxOntology._validate_artifacts(artifact_list)
+        if artifact_list is None:
+            artifact_list = []
+        for index, artifact in enumerate(artifact_list):
+            expected_artifact = Artifact(
+                name=artifact["name"],
+                path=artifact["path"],
+                description=artifact["description"],
+                to_be_extracted=artifact["to_be_extracted"]
             )
-            assert check_artefact_equality(expected_artefact, actual_validated_artefact_list[index])
+            assert check_artifact_equality(expected_artifact, actual_validated_artifact_list[index])
 
     @staticmethod
-    @pytest.mark.parametrize("artefact, expected_result_section_title",
+    @pytest.mark.parametrize("artifact, expected_result_section_title",
         [
             (None, None),
             ({"path": "blah", "name": "blah", "description": "blah", "to_be_extracted": True}, None),
@@ -428,31 +428,31 @@ def test_validate_artefacts(artefact_list):
             ({"path": "blah", "name": "123_hollowshunter/hh_process_123_blah.dll", "description": "blah", "to_be_extracted": True}, "HollowsHunter DLL"),
         ]
     )
-    def test_handle_artefact(artefact, expected_result_section_title):
-        from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology, Artefact
+    def test_handle_artifact(artifact, expected_result_section_title):
+        from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology, Artifact
         from assemblyline_v4_service.common.result import ResultSection, Heuristic
-        if artefact is None:
+        if artifact is None:
             with pytest.raises(Exception):
-                SandboxOntology._handle_artefact(artefact, None)
+                SandboxOntology._handle_artifact(artifact, None)
             return
         expected_result_section = None
         if expected_result_section_title is not None:
             expected_result_section = ResultSection(expected_result_section_title)
-            expected_result_section.add_tag("dynamic.process.file_name", artefact["path"])
+            expected_result_section.add_tag("dynamic.process.file_name", artifact["path"])
             if expected_result_section_title == "HollowsHunter Injected Portable Executable":
                 heur = Heuristic(17)
                 heur.add_signature_id("hollowshunter_pe")
                 expected_result_section.heuristic = heur
         parent_result_section = ResultSection("blah")
-        a = Artefact(
-            name=artefact["name"],
-            path=artefact["path"],
-            description=artefact["description"],
-            to_be_extracted=artefact["to_be_extracted"]
+        a = Artifact(
+            name=artifact["name"],
+            path=artifact["path"],
+            description=artifact["description"],
+            to_be_extracted=artifact["to_be_extracted"]
         )
-        SandboxOntology._handle_artefact(a, parent_result_section)
+        SandboxOntology._handle_artifact(a, parent_result_section)
         if len(parent_result_section.subsections) > 0:
             actual_result_section = parent_result_section.subsections[0]
         else:
@@ -536,7 +536,7 @@ def test_get_events(events, expected_result):
     # assert actual_result is True
 
     @staticmethod
-    @pytest.mark.parametrize("artefact_list, expected_result",
+    @pytest.mark.parametrize("artifact_list, expected_result",
         [
             (None, None),
             ([], None),
@@ -544,10 +544,10 @@ def test_get_events(events, expected_result):
             ([{"name": "blah", "path": "blah", "description": "blah", "to_be_extracted": False}], None),
         ]
     )
-    def test_handle_artefacts(artefact_list, expected_result, dummy_request_class):
+    def test_handle_artifacts(artifact_list, expected_result, dummy_request_class):
         from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology
         r = dummy_request_class()
         o = SandboxOntology()
-        actual_result = o.handle_artefacts(artefact_list, r)
+        actual_result = o.handle_artifacts(artifact_list, r)
         assert actual_result == expected_result