From 4cc9f25c6cf2b00e9d2e982874ef37754366c164 Mon Sep 17 00:00:00 2001 From: Mike Urbanski Date: Thu, 9 May 2024 17:04:00 -0500 Subject: [PATCH] platform(general): Handle SAST suppressions (suppressions V2) (#6109) * add logic to handle the new suppression format * log suppressions v2 * handle policy, repo, and file suppressions v2 * add tests and handle cloned policies * fix typing * add helper methods * fix line indentation * remove unnecessary null check * pass prisma severity to sast core * use get to handle old suppressions * remove extra iteration of values --- .../features/policy_metadata_integration.py | 3 + .../features/suppressions_integration.py | 103 +++++-- checkov/runner_filter.py | 5 +- checkov/sast/engines/prisma_engine.py | 5 + checkov/sast/prisma_models/library_input.py | 3 +- .../test_policy_metadata_integration.py | 19 +- .../test_suppressions_integration.py | 271 +++++++++++++++++- 7 files changed, 381 insertions(+), 28 deletions(-) diff --git a/checkov/common/bridgecrew/integration_features/features/policy_metadata_integration.py b/checkov/common/bridgecrew/integration_features/features/policy_metadata_integration.py index d3853f9e90b..dc3bbbc594c 100644 --- a/checkov/common/bridgecrew/integration_features/features/policy_metadata_integration.py +++ b/checkov/common/bridgecrew/integration_features/features/policy_metadata_integration.py @@ -22,6 +22,7 @@ class PolicyMetadataIntegration(BaseIntegrationFeature): def __init__(self, bc_integration: BcPlatformIntegration) -> None: super().__init__(bc_integration=bc_integration, order=0) self.check_metadata: dict[str, Any] = {} + self.sast_check_metadata: dict[str, Any] = {} self.bc_to_ckv_id_mapping: dict[str, str] = {} self.pc_to_ckv_id_mapping: dict[str, str] = {} self.ckv_id_to_source_incident_id_mapping: dict[str, str] = {} @@ -145,6 +146,8 @@ def _handle_public_metadata(self, check_metadata: dict[str, Any]) -> None: def _handle_customer_run_config(self, run_config: dict[str, Any]) -> None: self.check_metadata = run_config['policyMetadata'] for ckv_id, pol in self.check_metadata.items(): + if 'SAST' in ckv_id: + self.sast_check_metadata[ckv_id] = pol self.bc_to_ckv_id_mapping[pol['id']] = ckv_id if self.bc_integration.is_prisma_integration() and pol.get('pcPolicyId'): self.pc_to_ckv_id_mapping[pol['pcPolicyId']] = ckv_id diff --git a/checkov/common/bridgecrew/integration_features/features/suppressions_integration.py b/checkov/common/bridgecrew/integration_features/features/suppressions_integration.py index 4758d3783fc..5f88dd3d91f 100644 --- a/checkov/common/bridgecrew/integration_features/features/suppressions_integration.py +++ b/checkov/common/bridgecrew/integration_features/features/suppressions_integration.py @@ -1,9 +1,10 @@ from __future__ import annotations +import itertools import logging import re from itertools import groupby -from typing import TYPE_CHECKING, Pattern, Any +from typing import TYPE_CHECKING, Pattern, Any, Optional from checkov.common.bridgecrew.check_type import CheckType @@ -26,6 +27,7 @@ class SuppressionsIntegration(BaseIntegrationFeature): def __init__(self, bc_integration: BcPlatformIntegration) -> None: super().__init__(bc_integration=bc_integration, order=2) # must be after the custom policies integration + self.suppressions_v2: dict[str, list[dict[str, Any]]] = {} self.suppressions: dict[str, list[dict[str, Any]]] = {} # bcorgname_provider_timestamp (ex: companyxyz_aws_1234567891011) @@ -52,26 +54,56 @@ def pre_scan(self) -> None: return suppressions = self.bc_integration.customer_run_config_response.get('suppressions') + suppressions_v2 = self.bc_integration.customer_run_config_response.get('suppressionsV2') # currently just SAST for suppression in suppressions: + suppression['isV1'] = True if suppression['policyId'] in metadata_integration.bc_to_ckv_id_mapping: suppression['checkovPolicyId'] = metadata_integration.get_ckv_id_from_bc_id(suppression['policyId']) else: suppression['checkovPolicyId'] = suppression['policyId'] # custom policy + for suppression in suppressions_v2: + suppression['isV1'] = False + checkov_ids = [] + for policy_id in suppression['policyIds']: + if policy_id in metadata_integration.bc_to_ckv_id_mapping: + checkov_ids.append(metadata_integration.bc_to_ckv_id_mapping[policy_id]) + else: + checkov_ids.append(policy_id) # custom policy - not supported yet + suppression['checkovPolicyIds'] = checkov_ids + self._init_repo_regex() suppressions = sorted(suppressions, key=lambda s: s['checkovPolicyId']) # group and map by policy ID self.suppressions = {policy_id: list(sup) for policy_id, sup in groupby(suppressions, key=lambda s: s['checkovPolicyId'])} - logging.debug(f'Found {len(self.suppressions)} valid suppressions from the platform.') - logging.debug('The found suppression rules are:') + + # map suppressions v2 by checkov ID - because the policy IDs are arrays, we need to map each unique ID in each + # suppression's policy ID array to its suppressions + self.suppressions_v2 = SuppressionsIntegration.create_suppression_v2_policy_id_map(suppressions_v2) + + logging.debug('The found suppression v1 rules are:') logging.debug(self.suppressions) + logging.debug('The found suppression v2 rules are:') + logging.debug(self.suppressions_v2) + except Exception: self.integration_feature_failures = True logging.debug("Scanning without applying suppressions configured in the platform.", exc_info=True) + @staticmethod + def create_suppression_v2_policy_id_map(suppressions_v2: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]: + checkov_id_map: dict[str, list[dict[str, Any]]] = {} + for suppression in suppressions_v2: + for checkov_id in suppression['checkovPolicyIds']: + if checkov_id in checkov_id_map: + checkov_id_map[checkov_id].append(suppression) + else: + checkov_id_map[checkov_id] = [suppression] + return checkov_id_map + def post_runner(self, scan_report: Report) -> None: self._apply_suppressions_to_report(scan_report) @@ -89,10 +121,13 @@ def _apply_suppressions_to_report(self, scan_report: Report) -> None: check.check_id = 'BC_VUL_1' relevant_suppressions = self.suppressions.get(check.check_id) + relevant_suppressions_v2 = self.suppressions_v2.get(check.check_id) + + has_suppression = relevant_suppressions or relevant_suppressions_v2 - applied_suppression = self._check_suppressions(check, relevant_suppressions) if relevant_suppressions else None + applied_suppression = self._check_suppressions(check, relevant_suppressions, relevant_suppressions_v2) if has_suppression else None if applied_suppression: - suppress_comment = applied_suppression['comment'] + suppress_comment = applied_suppression['comment'] if applied_suppression['isV1'] else applied_suppression['justificationComment'] logging.debug(f'Applying suppression to the check {check.check_id} with the comment: {suppress_comment}') check.check_result = { 'result': CheckResult.SKIPPED, @@ -107,17 +142,19 @@ def _apply_suppressions_to_report(self, scan_report: Report) -> None: scan_report.failed_checks = still_failed_checks scan_report.passed_checks = still_passed_checks - def _check_suppressions(self, record: Record, suppressions: list[dict[str, Any]]) -> dict[str, Any] | None: + def _check_suppressions(self, record: Record, suppressions: Optional[list[dict[str, Any]]], suppressions_v2: Optional[list[dict[str, Any]]]) -> dict[str, Any] | None: """ - Checks the specified suppressions against the specified record, returning the first applicable suppression, - or None of no suppression is applicable. - :param record: - :param suppressions: + Checks the specified suppressions against the specified record, returning the applied suppression, if any, else None :return: """ - for suppression in suppressions: - if self._check_suppression(record, suppression): - return suppression + if suppressions: + for suppression in suppressions: + if self._check_suppression(record, suppression): + return suppression + if suppressions_v2: + for suppression in suppressions_v2: + if self._check_suppression_v2(record, suppression): + return suppression return None def _check_suppression(self, record: Record, suppression: dict[str, Any]) -> bool: @@ -189,6 +226,38 @@ def _check_suppression(self, record: Record, suppression: dict[str, Any]) -> boo return False + @staticmethod + def normalize_file_path(file_path: str) -> str: + """ + Returns the file path with a leading slash, if not already present + """ + return file_path if file_path.startswith('/') else f'/{file_path}' + + def _check_suppression_v2_file(self, record_file_path: str, suppression_file_path: str, suppression_repo_name: str) -> bool: + return self.bc_integration.repo_matches(suppression_repo_name)\ + and (suppression_file_path == record_file_path or suppression_file_path == convert_to_unix_path(record_file_path)) + + def _check_suppression_v2(self, record: Record, suppression: dict[str, Any]) -> bool: + if record.check_id not in suppression['checkovPolicyIds']: + return False + + type = suppression['ruleType'] + + if type == 'policy': + # We just checked the policy ID above + return True + elif type == 'finding': + pass # TODO how to map them? + elif type == 'file': + record_file_path = SuppressionsIntegration.normalize_file_path(record.repo_file_path) + for file_suppression in suppression['files']: + suppression_file_path = SuppressionsIntegration.normalize_file_path(file_suppression['filePath']) + if self._check_suppression_v2_file(record_file_path, suppression_file_path, file_suppression.get('repositoryName', '')): + return True + elif type == 'repository': + return any(self.bc_integration.repo_matches(repo.get('repositoryName', '')) for repo in suppression['repositories']) + return False + def _get_cve_suppression_path(self, suppression: dict[str, Any]) -> str: suppression_path: str = align_path(suppression['cves'][0]['id']) # for handling cases of IR/docker (e.g: '/Dockerfile:/DockerFile.FROM) @@ -231,12 +300,12 @@ def pre_runner(self, runner: _BaseRunner) -> None: # not used pass - def get_policy_level_suppressions(self) -> dict[str, str]: + def get_policy_level_suppressions(self) -> dict[str, list[str]]: policy_level_suppressions = {} - for check_suppressions in self.suppressions.values(): + for check_suppressions in itertools.chain(self.suppressions.values(), self.suppressions_v2.values()): for suppression in check_suppressions: - if suppression.get("suppressionType") == "Policy": - policy_level_suppressions[suppression['id']] = suppression['policyId'] + if (suppression['isV1'] and suppression.get("suppressionType") == "Policy") or (not suppression['isV1'] and suppression.get("ruleType") == "policy"): + policy_level_suppressions[suppression['id']] = [suppression['policyId']] if suppression['isV1'] else suppression['policyIds'] break return policy_level_suppressions diff --git a/checkov/runner_filter.py b/checkov/runner_filter.py index 0661b39aa54..83911694936 100644 --- a/checkov/runner_filter.py +++ b/checkov/runner_filter.py @@ -396,9 +396,10 @@ def from_dict(obj: Dict[str, Any]) -> RunnerFilter: show_progress_bar, run_image_referencer, enable_secret_scan_all_files, block_list_secret_scan) return runner_filter - def set_suppressed_policies(self, policy_level_suppressions: List[str]) -> None: + def set_suppressed_policies(self, policy_level_suppressions: List[List[str]]) -> None: logging.debug(f"Received the following policy-level suppressions, that will be skipped from running: {policy_level_suppressions}") - self.suppressed_policies = policy_level_suppressions + # flatten + self.suppressed_policies = [suppression for suppression_list in policy_level_suppressions for suppression in suppression_list] @staticmethod def get_sast_languages(frameworks: Optional[List[str]], skip_framework: Optional[List[str]]) -> Set[SastLanguages]: diff --git a/checkov/sast/engines/prisma_engine.py b/checkov/sast/engines/prisma_engine.py index a9fdbe36fa1..7d0b0c4c0eb 100644 --- a/checkov/sast/engines/prisma_engine.py +++ b/checkov/sast/engines/prisma_engine.py @@ -14,6 +14,7 @@ from checkov.common.bridgecrew.check_type import CheckType from checkov.common.bridgecrew.platform_integration import bc_integration +from checkov.common.bridgecrew.integration_features.features.policy_metadata_integration import integration as policy_metadata_integration from checkov.common.bridgecrew.platform_key import bridgecrew_dir from checkov.common.bridgecrew.severities import get_severity, Severity, Severities, BcSeverities from checkov.common.models.enums import CheckResult @@ -98,6 +99,7 @@ def get_reports(self, targets: List[str], registry: Registry, languages: Set[Sas 'skip_checks': registry.runner_filter.skip_checks if registry.runner_filter else [], 'check_threshold': check_threshold, 'skip_check_threshold': skip_check_threshold, + 'platform_check_metadata': policy_metadata_integration.sast_check_metadata or {}, 'skip_path': skip_paths, 'report_imports': registry.runner_filter.report_sast_imports if registry.runner_filter else False, 'remove_default_policies': registry.runner_filter.remove_default_sast_policies if registry.runner_filter else False, @@ -201,6 +203,7 @@ def run_go_library(self, languages: Set[SastLanguages], skip_path: List[str], check_threshold: Severity, skip_check_threshold: Severity, + platform_check_metadata: Dict[str, Any], cdk_languages: List[CDKLanguages], list_policies: bool = False, report_imports: bool = True, @@ -230,6 +233,7 @@ def run_go_library(self, languages: Set[SastLanguages], "skip_path": skip_path, "check_threshold": str(check_threshold), "skip_check_threshold": str(skip_check_threshold), + "platform_check_metadata": platform_check_metadata, "list_policies": list_policies, "report_imports": report_imports, "remove_default_policies": remove_default_policies, @@ -474,6 +478,7 @@ def get_policies(self, languages: Set[SastLanguages]) -> SastPolicies: 'skip_checks': [], 'check_threshold': Severities[BcSeverities.NONE], 'skip_check_threshold': Severities[BcSeverities.NONE], + 'platform_check_metadata': policy_metadata_integration.sast_check_metadata, 'skip_path': [], 'report_imports': False, 'report_reachability': False, diff --git a/checkov/sast/prisma_models/library_input.py b/checkov/sast/prisma_models/library_input.py index c4ba4e6b0ac..2780f4f8623 100644 --- a/checkov/sast/prisma_models/library_input.py +++ b/checkov/sast/prisma_models/library_input.py @@ -1,4 +1,4 @@ -from typing import Set, List +from typing import Set, List, Dict, Any from checkov.common.bridgecrew.severities import Severity from checkov.common.sast.consts import SastLanguages, CDKLanguages @@ -18,6 +18,7 @@ class LibraryInput(TypedDict): skip_path: List[str] check_threshold: Severity skip_check_threshold: Severity + platform_check_metadata: Dict[str, Any] list_policies: NotRequired[bool] report_imports: bool remove_default_policies: NotRequired[bool] diff --git a/tests/common/integration_features/test_policy_metadata_integration.py b/tests/common/integration_features/test_policy_metadata_integration.py index d85f0ca2d8f..d584a941303 100644 --- a/tests/common/integration_features/test_policy_metadata_integration.py +++ b/tests/common/integration_features/test_policy_metadata_integration.py @@ -18,9 +18,10 @@ def test_filtered_policy_ids(self): metadata_integration.bc_integration = instance metadata_integration.pre_scan() metadata_integration.pc_to_ckv_id_mapping - self.assertDictEqual(metadata_integration.pc_to_ckv_id_mapping, {'6960be11-e3a6-46cc-bf66-933c57c2af5d': 'CKV_AWS_212', '3dc2478c-bf25-4383-aaa1-30feb5cda586': '806079891421835264_AZR_1685557908904', 'c11ce08c-b93e-4e11-8d1c-e5a1339139d1': 'CKV_AWS_40', '0e4c576e-c934-4af3-8592-a53920e71ffb': 'CKV_AWS_53'}) + self.assertDictEqual(metadata_integration.pc_to_ckv_id_mapping, {'6960be11-e3a6-46cc-bf66-933c57c2af5d': 'CKV_AWS_212', '3dc2478c-bf25-4383-aaa1-30feb5cda586': '806079891421835264_AZR_1685557908904', 'c11ce08c-b93e-4e11-8d1c-e5a1339139d1': 'CKV_AWS_40', '0e4c576e-c934-4af3-8592-a53920e71ffb': 'CKV_AWS_53', '1234': 'CKV3_SAST_123'}) self.assertListEqual(metadata_integration.filtered_policy_ids, ['CKV_AWS_212', '806079891421835264_AZR_1685557908904', 'CKV_AWS_40', 'CKV_AWS_53', 'CKV_AZURE_122']) self.assertListEqual(metadata_integration.filtered_exception_policy_ids, ['CKV_AWS_212']) + self.assertSetEqual(set(metadata_integration.sast_check_metadata.keys()), {'CKV3_SAST_123'}) def mock_customer_run_config(): @@ -89,6 +90,22 @@ def mock_customer_run_config(): "3dc2478c-bf25-4383-aaa1-30feb5cda586" ], "benchmarks": {} + }, + "CKV3_SAST_123": { + "id": "BC_SAST_123", + "title": "sast", + "guideline": "https://docs.bridgecrew.io/docs/abc", + "severity": "LOW", + "pcSeverity": "LOW", + "category": "Networking", + "checkovId": "CKV3_SAST_123", + "constructiveTitle": "sast", + "descriptiveTitle": "sast", + "pcPolicyId": "1234", + "additionalPcPolicyIds": [ + "1234" + ], + "benchmarks": {} } }, "customPolicies": [ diff --git a/tests/common/integration_features/test_suppressions_integration.py b/tests/common/integration_features/test_suppressions_integration.py index c5c00cb994d..2bae612f052 100644 --- a/tests/common/integration_features/test_suppressions_integration.py +++ b/tests/common/integration_features/test_suppressions_integration.py @@ -226,6 +226,37 @@ def test_policy_suppression(self): self.assertTrue(suppressions_integration._check_suppression(record1, suppression)) self.assertFalse(suppressions_integration._check_suppression(record2, suppression)) + def test_policy_v2_suppression(self): + instance = BcPlatformIntegration() + + suppressions_integration = SuppressionsIntegration(instance) + suppressions_integration._init_repo_regex() + + suppression = { + "ruleType": "policy", + "checkovPolicyIds": ["CKV_AWS_79", "CKV_AWS_80"], + } + + record1 = Record(check_id='CKV_AWS_79', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record2 = Record(check_id='CKV_AWS_80', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record3 = Record(check_id='CKV_AWS_1', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + + self.assertTrue(suppressions_integration._check_suppression_v2(record1, suppression)) + self.assertTrue(suppressions_integration._check_suppression_v2(record2, suppression)) + self.assertFalse(suppressions_integration._check_suppression_v2(record3, suppression)) + def test_suppress_by_policy_BC_VUL_2(self): instance = BcPlatformIntegration() @@ -818,6 +849,45 @@ def test_account_suppression(self): self.assertTrue(suppressions_integration._check_suppression(record1, suppression)) self.assertFalse(suppressions_integration._check_suppression(record2, suppression)) + def test_repo_v2_suppression(self): + instance = BcPlatformIntegration() + instance.repo_id = 'org/repo' + suppressions_integration = SuppressionsIntegration(instance) + suppressions_integration._init_repo_regex() + suppression = { + "ruleType": "repository", + "repositories": [ + {"repositoryName": "org/repo"}, + {"repositoryName": "not/valid"} + ], + "checkovPolicyIds": ["CKV_AWS_18", "CKV_AWS_19"], + } + + # this is actually almost the same as a policy check, except we care about the repo name in the integration + # record details do not matter, except policy ID + record1 = Record(check_id='CKV_AWS_18', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record2 = Record(check_id='CKV_AWS_19', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record3 = Record(check_id='CKV_AWS_1', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + + self.assertTrue(suppressions_integration._check_suppression_v2(record1, suppression)) + self.assertTrue(suppressions_integration._check_suppression_v2(record2, suppression)) + self.assertFalse(suppressions_integration._check_suppression_v2(record3, suppression)) + + instance.repo_id = 'another/repo' + self.assertFalse(suppressions_integration._check_suppression_v2(record1, suppression)) + def test_account_suppression_cli_repo(self): instance = BcPlatformIntegration() instance.repo_id = 'org/repo' @@ -845,6 +915,45 @@ def test_account_suppression_cli_repo(self): self.assertTrue(suppressions_integration._check_suppression(record1, suppression)) self.assertFalse(suppressions_integration._check_suppression(record2, suppression)) + def test_repo_v2_suppression_cli_repo(self): + instance = BcPlatformIntegration() + instance.repo_id = 'org/repo' + suppressions_integration = SuppressionsIntegration(instance) + suppressions_integration._init_repo_regex() + suppression = { + "ruleType": "repository", + "repositories": [ + {"repositoryName": "1234_org/repo"}, + {"repositoryName": "1234_not/valid"} + ], + "checkovPolicyIds": ["CKV_AWS_18", "CKV_AWS_19"], + } + + # this is actually almost the same as a policy check, except we care about the repo name in the integration + # record details do not matter, except policy ID + record1 = Record(check_id='CKV_AWS_18', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record2 = Record(check_id='CKV_AWS_19', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record3 = Record(check_id='CKV_AWS_1', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + + self.assertTrue(suppressions_integration._check_suppression_v2(record1, suppression)) + self.assertTrue(suppressions_integration._check_suppression_v2(record2, suppression)) + self.assertFalse(suppressions_integration._check_suppression_v2(record3, suppression)) + + instance.repo_id = 'another/repo' + self.assertFalse(suppressions_integration._check_suppression_v2(record1, suppression)) + def test_resource_suppression(self): instance = BcPlatformIntegration() instance.repo_id = 'org/repo' @@ -1013,6 +1122,71 @@ def test_tag_suppression(self): self.assertFalse(suppressions_integration._check_suppression(record4, suppression)) self.assertFalse(suppressions_integration._check_suppression(record5, suppression)) + def test_file_v2_suppression_cli_repo(self): + instance = BcPlatformIntegration() + instance.repo_id = 'org/repo' + suppressions_integration = SuppressionsIntegration(instance) + suppressions_integration._init_repo_regex() + suppression = { + "ruleType": "file", + "files": [ + { + "repositoryName": "1234_org/repo", + "filePath": "test/file.txt" + }, + { + "repositoryName": "1234_org/repo2", + "filePath": "/test/file2.txt" + }, + { + "repositoryName": "1234_not/valid", + "filePath": "/test/file3.txt" + } + ], + "checkovPolicyIds": ["CKV_AWS_18", "CKV_AWS_19"], + } + + # this is actually almost the same as a policy check, except we care about the repo name in the integration + # record details do not matter, except policy ID + record1 = Record(check_id='CKV_AWS_18', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record1.repo_file_path = '/test/file.txt' + record2 = Record(check_id='CKV_AWS_19', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record2.repo_file_path = 'test/file.txt' # should still match despite missing slash + record3 = Record(check_id='CKV_AWS_18', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record3.repo_file_path = '/test/file2.txt' + record4 = Record(check_id='CKV_AWS_1', check_name=None, check_result=None, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record4.repo_file_path = 'test/file.txt' + + self.assertTrue(suppressions_integration._check_suppression_v2(record1, suppression)) + self.assertTrue(suppressions_integration._check_suppression_v2(record2, suppression)) + self.assertFalse(suppressions_integration._check_suppression_v2(record3, suppression)) # right file, wrong repo + self.assertFalse(suppressions_integration._check_suppression_v2(record4, suppression)) + + record1.repo_file_path = '/test/file2.txt' + record2.repo_file_path = 'test/file2.txt' + instance.repo_id = 'org/repo2' # now check the same thing but with a leading slash in the suppression file + self.assertTrue(suppressions_integration._check_suppression_v2(record1, suppression)) + self.assertTrue(suppressions_integration._check_suppression_v2(record2, suppression)) + + instance.repo_id = 'another/repo' + self.assertFalse(suppressions_integration._check_suppression_v2(record1, suppression)) + def test_apply_suppressions_to_report(self): instance = BcPlatformIntegration() @@ -1024,6 +1198,7 @@ def test_apply_suppressions_to_report(self): "policyId": "BC_AWS_GENERAL_31", "comment": "No justification comment provided.", "checkovPolicyId": "CKV_AWS_79", + "isV1": True } suppressions_integration.suppressions = {suppression['checkovPolicyId']: [suppression]} @@ -1065,44 +1240,126 @@ def test_apply_suppressions_to_report(self): self.assertEqual(len(report.passed_checks), 1) self.assertEqual(report.passed_checks[0].check_id, 'CKV_AWS_2') self.assertEqual(len(report.skipped_checks), 2) + self.assertEqual(report.skipped_checks[0].check_result['suppress_comment'], "No justification comment provided.") + + def test_apply_suppressions_to_report_with_v2(self): + instance = BcPlatformIntegration() + + suppressions_integration = SuppressionsIntegration(instance) + + suppression = { + "ruleType": "policy", + "checkovPolicyIds": ["CKV_AWS_79", "CKV_AWS_80"], + "isV1": False, + "justificationComment": "comment" + } + + suppressions_integration.suppressions_v2 = {id: [suppression] for id in suppression['checkovPolicyIds']} + + record1 = Record(check_id='CKV_AWS_79', check_name=None, + check_result={'result': CheckResult.FAILED, 'evaluated_keys': ['multi_az']}, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record2 = Record(check_id='CKV_AWS_1', check_name=None, + check_result={'result': CheckResult.FAILED, 'evaluated_keys': ['multi_az']}, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record3 = Record(check_id='CKV_AWS_80', check_name=None, + check_result={'result': CheckResult.PASSED, 'evaluated_keys': ['multi_az']}, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + record4 = Record(check_id='CKV_AWS_2', check_name=None, + check_result={'result': CheckResult.PASSED, 'evaluated_keys': ['multi_az']}, + code_block=None, file_path=None, + file_line_range=None, + resource=None, evaluations=None, + check_class=None, file_abs_path='.', entity_tags=None) + + report = Report('terraform') + report.add_record(record1) + report.add_record(record2) + report.add_record(record3) + report.add_record(record4) + + suppressions_integration._apply_suppressions_to_report(report) + self.assertEqual(len(report.failed_checks), 1) + self.assertEqual(report.failed_checks[0].check_id, 'CKV_AWS_1') + self.assertEqual(len(report.passed_checks), 1) + self.assertEqual(report.passed_checks[0].check_id, 'CKV_AWS_2') + self.assertEqual(len(report.skipped_checks), 2) + self.assertEqual(report.skipped_checks[0].check_result['suppress_comment'], "comment") def test_get_policy_level_suppressions(self): instance = BcPlatformIntegration() suppressions_integration = SuppressionsIntegration(instance) suppressions_integration.suppressions = { - 'CKV_AWS_252': [{'suppressionType': 'Policy', 'id': '404088ed-4251-41ac-8dc1-45264af0c461', + 'CKV_AWS_252': [{'suppressionType': 'Policy', "isV1": True, 'id': '404088ed-4251-41ac-8dc1-45264af0c461', 'policyId': 'BC_AWS_GENERAL_175', 'creationDate': '2022-11-09T16:27:36.413Z', 'comment': 'Test2', 'checkovPolicyId': 'CKV_AWS_252'}], 'CKV_AWS_36': [ - {'suppressionType': 'Policy', 'id': 'b68013bc-2908-4c9a-969d-f1640d4aca11', + {'suppressionType': 'Policy', "isV1": True, 'id': 'b68013bc-2908-4c9a-969d-f1640d4aca11', 'policyId': 'BC_AWS_LOGGING_2', 'creationDate': '2022-11-09T16:11:58.435Z', 'comment': 'Testing', 'checkovPolicyId': 'CKV_AWS_36'}], 'CKV_K8S_27': [ - {'suppressionType': 'Policy', 'id': '271c1a79-2333-4a12-bf7d-55ec78468b94', 'policyId': 'BC_K8S_26', + {'suppressionType': 'Policy', "isV1": True, 'id': '271c1a79-2333-4a12-bf7d-55ec78468b94', 'policyId': 'BC_K8S_26', 'creationDate': '2022-12-08T08:00:04.561Z', 'comment': 'test checkov suppressions', 'checkovPolicyId': 'CKV_K8S_27'}], 'acme_AWS_1668010000289': [ - {'suppressionType': 'Resources', 'id': '5565e523-58da-4bc7-970e-c3fceef93ac1', + {'suppressionType': 'Resources', "isV1": True, 'id': '5565e523-58da-4bc7-970e-c3fceef93ac1', 'policyId': 'acme_AWS_1668010000289', 'creationDate': '2022-11-09T16:28:50.887Z', 'comment': 'Testing', 'resources': [{'accountId': 'acme_cli_repo/testing-resources', 'resourceId': '/src/BC_AWS_LOGGING_7.tf:aws_cloudtrail.cloudtrail9'}], 'checkovPolicyId': 'acme_AWS_1668010000289'}, - {'suppressionType': 'Resources', 'id': 'adf6f831-4393-4dcb-b345-2a14bf944267', + {'suppressionType': 'Resources', "isV1": True, 'id': 'adf6f831-4393-4dcb-b345-2a14bf944267', 'policyId': 'acme_AWS_1668010000289', 'creationDate': '2022-11-09T16:28:50.951Z', 'comment': 'Testing', 'resources': [{'accountId': 'acme_cli_repo/testing-resources', 'resourceId': '/src/BC_AWS_LOGGING_7.tf:aws_cloudtrail.cloudtrail10'}], 'checkovPolicyId': 'acme_AWS_1668010000289'}, - {'suppressionType': 'Resources', 'id': '86d88e69-5755-4e69-965b-f97fc26e784b', + {'suppressionType': 'Resources', "isV1": True, 'id': '86d88e69-5755-4e69-965b-f97fc26e784b', 'policyId': 'acme_AWS_1668010000289', 'creationDate': '2022-11-09T16:28:50.838Z', 'comment': 'Testing', 'resources': [{'accountId': 'acme_cli_repo/testing-resources', 'resourceId': '/src/BC_AWS_LOGGING_7.tf:aws_cloudtrail.cloudtrail8'}], 'checkovPolicyId': 'acme_AWS_1668010000289'}]} + suppressions_integration.suppressions_v2 = { + "CKV3_SAST_1": [{ + "ruleType": "policy", + "isV1": False, + "id": "1111", + "policyIds": ["BC_SAST_1", "BC_SAST_2"] + }], + "CKV3_SAST_2": [ + { + "ruleType": "policy", + "isV1": False, + "id": "2222", + "policyIds": ["BC_SAST_3", "BC_SAST_2"] + }, + { + "ruleType": "repository", + "isV1": False, + "id": "3333", + "policyIds": ["BC_SAST_1", "BC_SAST_3"] + } + ] + } + expected_suppressions = ['404088ed-4251-41ac-8dc1-45264af0c461', 'b68013bc-2908-4c9a-969d-f1640d4aca11', - '271c1a79-2333-4a12-bf7d-55ec78468b94'] + '271c1a79-2333-4a12-bf7d-55ec78468b94', '1111', '2222'] policy_level_suppressions = suppressions_integration.get_policy_level_suppressions() self.assertEqual(expected_suppressions, list(policy_level_suppressions.keys())) + self.assertEqual(policy_level_suppressions['404088ed-4251-41ac-8dc1-45264af0c461'], ['BC_AWS_GENERAL_175']) + self.assertEqual(policy_level_suppressions['b68013bc-2908-4c9a-969d-f1640d4aca11'], ['BC_AWS_LOGGING_2']) + self.assertEqual(policy_level_suppressions['271c1a79-2333-4a12-bf7d-55ec78468b94'], ['BC_K8S_26']) + self.assertEqual(policy_level_suppressions['1111'], ["BC_SAST_1", "BC_SAST_2"]) + self.assertEqual(policy_level_suppressions['2222'], ["BC_SAST_3", "BC_SAST_2"]) if __name__ == '__main__':