Skip to content

Commit

Permalink
platform(general): Handle SAST suppressions (suppressions V2) (bridge…
Browse files Browse the repository at this point in the history
…crewio#6109)

* add logic to handle the new suppression format

* log suppressions v2

* handle policy, repo, and file suppressions v2

* add tests and handle cloned policies

* fix typing

* add helper methods

* fix line indentation

* remove unnecessary null check

* pass prisma severity to sast core

* use get to handle old suppressions

* remove extra iteration of values
  • Loading branch information
mikeurbanski1 authored May 9, 2024
1 parent 3e985c5 commit 4cc9f25
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class PolicyMetadataIntegration(BaseIntegrationFeature):
def __init__(self, bc_integration: BcPlatformIntegration) -> None:
super().__init__(bc_integration=bc_integration, order=0)
self.check_metadata: dict[str, Any] = {}
self.sast_check_metadata: dict[str, Any] = {}
self.bc_to_ckv_id_mapping: dict[str, str] = {}
self.pc_to_ckv_id_mapping: dict[str, str] = {}
self.ckv_id_to_source_incident_id_mapping: dict[str, str] = {}
Expand Down Expand Up @@ -145,6 +146,8 @@ def _handle_public_metadata(self, check_metadata: dict[str, Any]) -> None:
def _handle_customer_run_config(self, run_config: dict[str, Any]) -> None:
self.check_metadata = run_config['policyMetadata']
for ckv_id, pol in self.check_metadata.items():
if 'SAST' in ckv_id:
self.sast_check_metadata[ckv_id] = pol
self.bc_to_ckv_id_mapping[pol['id']] = ckv_id
if self.bc_integration.is_prisma_integration() and pol.get('pcPolicyId'):
self.pc_to_ckv_id_mapping[pol['pcPolicyId']] = ckv_id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import itertools
import logging
import re
from itertools import groupby
from typing import TYPE_CHECKING, Pattern, Any
from typing import TYPE_CHECKING, Pattern, Any, Optional

from checkov.common.bridgecrew.check_type import CheckType

Expand All @@ -26,6 +27,7 @@
class SuppressionsIntegration(BaseIntegrationFeature):
def __init__(self, bc_integration: BcPlatformIntegration) -> None:
super().__init__(bc_integration=bc_integration, order=2) # must be after the custom policies integration
self.suppressions_v2: dict[str, list[dict[str, Any]]] = {}
self.suppressions: dict[str, list[dict[str, Any]]] = {}

# bcorgname_provider_timestamp (ex: companyxyz_aws_1234567891011)
Expand All @@ -52,26 +54,56 @@ def pre_scan(self) -> None:
return

suppressions = self.bc_integration.customer_run_config_response.get('suppressions')
suppressions_v2 = self.bc_integration.customer_run_config_response.get('suppressionsV2') # currently just SAST

for suppression in suppressions:
suppression['isV1'] = True
if suppression['policyId'] in metadata_integration.bc_to_ckv_id_mapping:
suppression['checkovPolicyId'] = metadata_integration.get_ckv_id_from_bc_id(suppression['policyId'])
else:
suppression['checkovPolicyId'] = suppression['policyId'] # custom policy

for suppression in suppressions_v2:
suppression['isV1'] = False
checkov_ids = []
for policy_id in suppression['policyIds']:
if policy_id in metadata_integration.bc_to_ckv_id_mapping:
checkov_ids.append(metadata_integration.bc_to_ckv_id_mapping[policy_id])
else:
checkov_ids.append(policy_id) # custom policy - not supported yet
suppression['checkovPolicyIds'] = checkov_ids

self._init_repo_regex()
suppressions = sorted(suppressions, key=lambda s: s['checkovPolicyId'])

# group and map by policy ID
self.suppressions = {policy_id: list(sup) for policy_id, sup in
groupby(suppressions, key=lambda s: s['checkovPolicyId'])}
logging.debug(f'Found {len(self.suppressions)} valid suppressions from the platform.')
logging.debug('The found suppression rules are:')

# map suppressions v2 by checkov ID - because the policy IDs are arrays, we need to map each unique ID in each
# suppression's policy ID array to its suppressions
self.suppressions_v2 = SuppressionsIntegration.create_suppression_v2_policy_id_map(suppressions_v2)

logging.debug('The found suppression v1 rules are:')
logging.debug(self.suppressions)
logging.debug('The found suppression v2 rules are:')
logging.debug(self.suppressions_v2)

except Exception:
self.integration_feature_failures = True
logging.debug("Scanning without applying suppressions configured in the platform.", exc_info=True)

@staticmethod
def create_suppression_v2_policy_id_map(suppressions_v2: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
checkov_id_map: dict[str, list[dict[str, Any]]] = {}
for suppression in suppressions_v2:
for checkov_id in suppression['checkovPolicyIds']:
if checkov_id in checkov_id_map:
checkov_id_map[checkov_id].append(suppression)
else:
checkov_id_map[checkov_id] = [suppression]
return checkov_id_map

def post_runner(self, scan_report: Report) -> None:
self._apply_suppressions_to_report(scan_report)

Expand All @@ -89,10 +121,13 @@ def _apply_suppressions_to_report(self, scan_report: Report) -> None:
check.check_id = 'BC_VUL_1'

relevant_suppressions = self.suppressions.get(check.check_id)
relevant_suppressions_v2 = self.suppressions_v2.get(check.check_id)

has_suppression = relevant_suppressions or relevant_suppressions_v2

applied_suppression = self._check_suppressions(check, relevant_suppressions) if relevant_suppressions else None
applied_suppression = self._check_suppressions(check, relevant_suppressions, relevant_suppressions_v2) if has_suppression else None
if applied_suppression:
suppress_comment = applied_suppression['comment']
suppress_comment = applied_suppression['comment'] if applied_suppression['isV1'] else applied_suppression['justificationComment']
logging.debug(f'Applying suppression to the check {check.check_id} with the comment: {suppress_comment}')
check.check_result = {
'result': CheckResult.SKIPPED,
Expand All @@ -107,17 +142,19 @@ def _apply_suppressions_to_report(self, scan_report: Report) -> None:
scan_report.failed_checks = still_failed_checks
scan_report.passed_checks = still_passed_checks

def _check_suppressions(self, record: Record, suppressions: list[dict[str, Any]]) -> dict[str, Any] | None:
def _check_suppressions(self, record: Record, suppressions: Optional[list[dict[str, Any]]], suppressions_v2: Optional[list[dict[str, Any]]]) -> dict[str, Any] | None:
"""
Checks the specified suppressions against the specified record, returning the first applicable suppression,
or None of no suppression is applicable.
:param record:
:param suppressions:
Checks the specified suppressions against the specified record, returning the applied suppression, if any, else None
:return:
"""
for suppression in suppressions:
if self._check_suppression(record, suppression):
return suppression
if suppressions:
for suppression in suppressions:
if self._check_suppression(record, suppression):
return suppression
if suppressions_v2:
for suppression in suppressions_v2:
if self._check_suppression_v2(record, suppression):
return suppression
return None

def _check_suppression(self, record: Record, suppression: dict[str, Any]) -> bool:
Expand Down Expand Up @@ -189,6 +226,38 @@ def _check_suppression(self, record: Record, suppression: dict[str, Any]) -> boo

return False

@staticmethod
def normalize_file_path(file_path: str) -> str:
"""
Returns the file path with a leading slash, if not already present
"""
return file_path if file_path.startswith('/') else f'/{file_path}'

def _check_suppression_v2_file(self, record_file_path: str, suppression_file_path: str, suppression_repo_name: str) -> bool:
return self.bc_integration.repo_matches(suppression_repo_name)\
and (suppression_file_path == record_file_path or suppression_file_path == convert_to_unix_path(record_file_path))

def _check_suppression_v2(self, record: Record, suppression: dict[str, Any]) -> bool:
if record.check_id not in suppression['checkovPolicyIds']:
return False

type = suppression['ruleType']

if type == 'policy':
# We just checked the policy ID above
return True
elif type == 'finding':
pass # TODO how to map them?
elif type == 'file':
record_file_path = SuppressionsIntegration.normalize_file_path(record.repo_file_path)
for file_suppression in suppression['files']:
suppression_file_path = SuppressionsIntegration.normalize_file_path(file_suppression['filePath'])
if self._check_suppression_v2_file(record_file_path, suppression_file_path, file_suppression.get('repositoryName', '')):
return True
elif type == 'repository':
return any(self.bc_integration.repo_matches(repo.get('repositoryName', '')) for repo in suppression['repositories'])
return False

def _get_cve_suppression_path(self, suppression: dict[str, Any]) -> str:
suppression_path: str = align_path(suppression['cves'][0]['id'])
# for handling cases of IR/docker (e.g: '/Dockerfile:/DockerFile.FROM)
Expand Down Expand Up @@ -231,12 +300,12 @@ def pre_runner(self, runner: _BaseRunner) -> None:
# not used
pass

def get_policy_level_suppressions(self) -> dict[str, str]:
def get_policy_level_suppressions(self) -> dict[str, list[str]]:
policy_level_suppressions = {}
for check_suppressions in self.suppressions.values():
for check_suppressions in itertools.chain(self.suppressions.values(), self.suppressions_v2.values()):
for suppression in check_suppressions:
if suppression.get("suppressionType") == "Policy":
policy_level_suppressions[suppression['id']] = suppression['policyId']
if (suppression['isV1'] and suppression.get("suppressionType") == "Policy") or (not suppression['isV1'] and suppression.get("ruleType") == "policy"):
policy_level_suppressions[suppression['id']] = [suppression['policyId']] if suppression['isV1'] else suppression['policyIds']
break
return policy_level_suppressions

Expand Down
5 changes: 3 additions & 2 deletions checkov/runner_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,10 @@ def from_dict(obj: Dict[str, Any]) -> RunnerFilter:
show_progress_bar, run_image_referencer, enable_secret_scan_all_files, block_list_secret_scan)
return runner_filter

def set_suppressed_policies(self, policy_level_suppressions: List[str]) -> None:
def set_suppressed_policies(self, policy_level_suppressions: List[List[str]]) -> None:
logging.debug(f"Received the following policy-level suppressions, that will be skipped from running: {policy_level_suppressions}")
self.suppressed_policies = policy_level_suppressions
# flatten
self.suppressed_policies = [suppression for suppression_list in policy_level_suppressions for suppression in suppression_list]

@staticmethod
def get_sast_languages(frameworks: Optional[List[str]], skip_framework: Optional[List[str]]) -> Set[SastLanguages]:
Expand Down
5 changes: 5 additions & 0 deletions checkov/sast/engines/prisma_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from checkov.common.bridgecrew.check_type import CheckType
from checkov.common.bridgecrew.platform_integration import bc_integration
from checkov.common.bridgecrew.integration_features.features.policy_metadata_integration import integration as policy_metadata_integration
from checkov.common.bridgecrew.platform_key import bridgecrew_dir
from checkov.common.bridgecrew.severities import get_severity, Severity, Severities, BcSeverities
from checkov.common.models.enums import CheckResult
Expand Down Expand Up @@ -98,6 +99,7 @@ def get_reports(self, targets: List[str], registry: Registry, languages: Set[Sas
'skip_checks': registry.runner_filter.skip_checks if registry.runner_filter else [],
'check_threshold': check_threshold,
'skip_check_threshold': skip_check_threshold,
'platform_check_metadata': policy_metadata_integration.sast_check_metadata or {},
'skip_path': skip_paths,
'report_imports': registry.runner_filter.report_sast_imports if registry.runner_filter else False,
'remove_default_policies': registry.runner_filter.remove_default_sast_policies if registry.runner_filter else False,
Expand Down Expand Up @@ -201,6 +203,7 @@ def run_go_library(self, languages: Set[SastLanguages],
skip_path: List[str],
check_threshold: Severity,
skip_check_threshold: Severity,
platform_check_metadata: Dict[str, Any],
cdk_languages: List[CDKLanguages],
list_policies: bool = False,
report_imports: bool = True,
Expand Down Expand Up @@ -230,6 +233,7 @@ def run_go_library(self, languages: Set[SastLanguages],
"skip_path": skip_path,
"check_threshold": str(check_threshold),
"skip_check_threshold": str(skip_check_threshold),
"platform_check_metadata": platform_check_metadata,
"list_policies": list_policies,
"report_imports": report_imports,
"remove_default_policies": remove_default_policies,
Expand Down Expand Up @@ -474,6 +478,7 @@ def get_policies(self, languages: Set[SastLanguages]) -> SastPolicies:
'skip_checks': [],
'check_threshold': Severities[BcSeverities.NONE],
'skip_check_threshold': Severities[BcSeverities.NONE],
'platform_check_metadata': policy_metadata_integration.sast_check_metadata,
'skip_path': [],
'report_imports': False,
'report_reachability': False,
Expand Down
3 changes: 2 additions & 1 deletion checkov/sast/prisma_models/library_input.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Set, List
from typing import Set, List, Dict, Any

from checkov.common.bridgecrew.severities import Severity
from checkov.common.sast.consts import SastLanguages, CDKLanguages
Expand All @@ -18,6 +18,7 @@ class LibraryInput(TypedDict):
skip_path: List[str]
check_threshold: Severity
skip_check_threshold: Severity
platform_check_metadata: Dict[str, Any]
list_policies: NotRequired[bool]
report_imports: bool
remove_default_policies: NotRequired[bool]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ def test_filtered_policy_ids(self):
metadata_integration.bc_integration = instance
metadata_integration.pre_scan()
metadata_integration.pc_to_ckv_id_mapping
self.assertDictEqual(metadata_integration.pc_to_ckv_id_mapping, {'6960be11-e3a6-46cc-bf66-933c57c2af5d': 'CKV_AWS_212', '3dc2478c-bf25-4383-aaa1-30feb5cda586': '806079891421835264_AZR_1685557908904', 'c11ce08c-b93e-4e11-8d1c-e5a1339139d1': 'CKV_AWS_40', '0e4c576e-c934-4af3-8592-a53920e71ffb': 'CKV_AWS_53'})
self.assertDictEqual(metadata_integration.pc_to_ckv_id_mapping, {'6960be11-e3a6-46cc-bf66-933c57c2af5d': 'CKV_AWS_212', '3dc2478c-bf25-4383-aaa1-30feb5cda586': '806079891421835264_AZR_1685557908904', 'c11ce08c-b93e-4e11-8d1c-e5a1339139d1': 'CKV_AWS_40', '0e4c576e-c934-4af3-8592-a53920e71ffb': 'CKV_AWS_53', '1234': 'CKV3_SAST_123'})
self.assertListEqual(metadata_integration.filtered_policy_ids, ['CKV_AWS_212', '806079891421835264_AZR_1685557908904', 'CKV_AWS_40', 'CKV_AWS_53', 'CKV_AZURE_122'])
self.assertListEqual(metadata_integration.filtered_exception_policy_ids, ['CKV_AWS_212'])
self.assertSetEqual(set(metadata_integration.sast_check_metadata.keys()), {'CKV3_SAST_123'})


def mock_customer_run_config():
Expand Down Expand Up @@ -89,6 +90,22 @@ def mock_customer_run_config():
"3dc2478c-bf25-4383-aaa1-30feb5cda586"
],
"benchmarks": {}
},
"CKV3_SAST_123": {
"id": "BC_SAST_123",
"title": "sast",
"guideline": "https://docs.bridgecrew.io/docs/abc",
"severity": "LOW",
"pcSeverity": "LOW",
"category": "Networking",
"checkovId": "CKV3_SAST_123",
"constructiveTitle": "sast",
"descriptiveTitle": "sast",
"pcPolicyId": "1234",
"additionalPcPolicyIds": [
"1234"
],
"benchmarks": {}
}
},
"customPolicies": [
Expand Down
Loading

0 comments on commit 4cc9f25

Please sign in to comment.