Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

platform(general): Handle SAST suppressions (suppressions V2) #6109

Merged
merged 17 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class PolicyMetadataIntegration(BaseIntegrationFeature):
def __init__(self, bc_integration: BcPlatformIntegration) -> None:
super().__init__(bc_integration=bc_integration, order=0)
self.check_metadata: dict[str, Any] = {}
self.sast_check_metadata: dict[str, Any] = {}
self.bc_to_ckv_id_mapping: dict[str, str] = {}
self.pc_to_ckv_id_mapping: dict[str, str] = {}
self.ckv_id_to_source_incident_id_mapping: dict[str, str] = {}
Expand Down Expand Up @@ -145,6 +146,8 @@ def _handle_public_metadata(self, check_metadata: dict[str, Any]) -> None:
def _handle_customer_run_config(self, run_config: dict[str, Any]) -> None:
self.check_metadata = run_config['policyMetadata']
for ckv_id, pol in self.check_metadata.items():
if 'SAST' in ckv_id:
self.sast_check_metadata[ckv_id] = pol
self.bc_to_ckv_id_mapping[pol['id']] = ckv_id
if self.bc_integration.is_prisma_integration() and pol.get('pcPolicyId'):
self.pc_to_ckv_id_mapping[pol['pcPolicyId']] = ckv_id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import itertools
import logging
import re
from itertools import groupby
from typing import TYPE_CHECKING, Pattern, Any
from typing import TYPE_CHECKING, Pattern, Any, Optional

from checkov.common.bridgecrew.check_type import CheckType

Expand All @@ -26,6 +27,7 @@
class SuppressionsIntegration(BaseIntegrationFeature):
def __init__(self, bc_integration: BcPlatformIntegration) -> None:
super().__init__(bc_integration=bc_integration, order=2) # must be after the custom policies integration
self.suppressions_v2: dict[str, list[dict[str, Any]]] = {}
self.suppressions: dict[str, list[dict[str, Any]]] = {}

# bcorgname_provider_timestamp (ex: companyxyz_aws_1234567891011)
Expand All @@ -52,26 +54,56 @@ def pre_scan(self) -> None:
return

suppressions = self.bc_integration.customer_run_config_response.get('suppressions')
suppressions_v2 = self.bc_integration.customer_run_config_response.get('suppressionsV2') # currently just SAST

for suppression in suppressions:
suppression['isV1'] = True
if suppression['policyId'] in metadata_integration.bc_to_ckv_id_mapping:
suppression['checkovPolicyId'] = metadata_integration.get_ckv_id_from_bc_id(suppression['policyId'])
else:
suppression['checkovPolicyId'] = suppression['policyId'] # custom policy

for suppression in suppressions_v2:
suppression['isV1'] = False
checkov_ids = []
for policy_id in suppression['policyIds']:
if policy_id in metadata_integration.bc_to_ckv_id_mapping:
checkov_ids.append(metadata_integration.bc_to_ckv_id_mapping[policy_id])
else:
checkov_ids.append(policy_id) # custom policy - not supported yet
suppression['checkovPolicyIds'] = checkov_ids

self._init_repo_regex()
suppressions = sorted(suppressions, key=lambda s: s['checkovPolicyId'])

# group and map by policy ID
self.suppressions = {policy_id: list(sup) for policy_id, sup in
groupby(suppressions, key=lambda s: s['checkovPolicyId'])}
logging.debug(f'Found {len(self.suppressions)} valid suppressions from the platform.')
logging.debug('The found suppression rules are:')

# map suppressions v2 by checkov ID - because the policy IDs are arrays, we need to map each unique ID in each
# suppression's policy ID array to its suppressions
self.suppressions_v2 = SuppressionsIntegration.create_suppression_v2_policy_id_map(suppressions_v2)

logging.debug('The found suppression v1 rules are:')
logging.debug(self.suppressions)
logging.debug('The found suppression v2 rules are:')
logging.debug(self.suppressions_v2)

except Exception:
self.integration_feature_failures = True
logging.debug("Scanning without applying suppressions configured in the platform.", exc_info=True)

@staticmethod
def create_suppression_v2_policy_id_map(suppressions_v2: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
checkov_id_map: dict[str, list[dict[str, Any]]] = {}
for suppression in suppressions_v2:
for checkov_id in suppression['checkovPolicyIds']:
if checkov_id in checkov_id_map:
checkov_id_map[checkov_id].append(suppression)
else:
checkov_id_map[checkov_id] = [suppression]
return checkov_id_map

def post_runner(self, scan_report: Report) -> None:
self._apply_suppressions_to_report(scan_report)

Expand All @@ -89,10 +121,13 @@ def _apply_suppressions_to_report(self, scan_report: Report) -> None:
check.check_id = 'BC_VUL_1'

relevant_suppressions = self.suppressions.get(check.check_id)
relevant_suppressions_v2 = self.suppressions_v2.get(check.check_id)

has_suppression = relevant_suppressions or relevant_suppressions_v2

applied_suppression = self._check_suppressions(check, relevant_suppressions) if relevant_suppressions else None
applied_suppression = self._check_suppressions(check, relevant_suppressions, relevant_suppressions_v2) if has_suppression else None
if applied_suppression:
suppress_comment = applied_suppression['comment']
suppress_comment = applied_suppression['comment'] if applied_suppression['isV1'] else applied_suppression['justificationComment']
logging.debug(f'Applying suppression to the check {check.check_id} with the comment: {suppress_comment}')
check.check_result = {
'result': CheckResult.SKIPPED,
Expand All @@ -107,17 +142,19 @@ def _apply_suppressions_to_report(self, scan_report: Report) -> None:
scan_report.failed_checks = still_failed_checks
scan_report.passed_checks = still_passed_checks

def _check_suppressions(self, record: Record, suppressions: list[dict[str, Any]]) -> dict[str, Any] | None:
def _check_suppressions(self, record: Record, suppressions: Optional[list[dict[str, Any]]], suppressions_v2: Optional[list[dict[str, Any]]]) -> dict[str, Any] | None:
"""
Checks the specified suppressions against the specified record, returning the first applicable suppression,
or None of no suppression is applicable.
:param record:
:param suppressions:
Checks the specified suppressions against the specified record, returning the applied suppression, if any, else None
:return:
"""
for suppression in suppressions:
if self._check_suppression(record, suppression):
return suppression
if suppressions:
for suppression in suppressions:
if self._check_suppression(record, suppression):
return suppression
if suppressions_v2:
for suppression in suppressions_v2:
if self._check_suppression_v2(record, suppression):
return suppression
return None

def _check_suppression(self, record: Record, suppression: dict[str, Any]) -> bool:
Expand Down Expand Up @@ -189,6 +226,38 @@ def _check_suppression(self, record: Record, suppression: dict[str, Any]) -> boo

return False

@staticmethod
def normalize_file_path(file_path: str) -> str:
"""
Returns the file path with a leading slash, if not already present
"""
return file_path if file_path.startswith('/') else f'/{file_path}'

def _check_suppression_v2_file(self, record_file_path: str, suppression_file_path: str, suppression_repo_name: str) -> bool:
return self.bc_integration.repo_matches(suppression_repo_name)\
and (suppression_file_path == record_file_path or suppression_file_path == convert_to_unix_path(record_file_path))

def _check_suppression_v2(self, record: Record, suppression: dict[str, Any]) -> bool:
if record.check_id not in suppression['checkovPolicyIds']:
return False

type = suppression['ruleType']

if type == 'policy':
# We just checked the policy ID above
return True
elif type == 'finding':
pass # TODO how to map them?
elif type == 'file':
record_file_path = SuppressionsIntegration.normalize_file_path(record.repo_file_path)
for file_suppression in suppression['files']:
suppression_file_path = SuppressionsIntegration.normalize_file_path(file_suppression['filePath'])
if self._check_suppression_v2_file(record_file_path, suppression_file_path, file_suppression.get('repositoryName', '')):
return True
elif type == 'repository':
return any(self.bc_integration.repo_matches(repo.get('repositoryName', '')) for repo in suppression['repositories'])
return False

def _get_cve_suppression_path(self, suppression: dict[str, Any]) -> str:
suppression_path: str = align_path(suppression['cves'][0]['id'])
# for handling cases of IR/docker (e.g: '/Dockerfile:/DockerFile.FROM)
Expand Down Expand Up @@ -231,12 +300,12 @@ def pre_runner(self, runner: _BaseRunner) -> None:
# not used
pass

def get_policy_level_suppressions(self) -> dict[str, str]:
def get_policy_level_suppressions(self) -> dict[str, list[str]]:
policy_level_suppressions = {}
for check_suppressions in self.suppressions.values():
for check_suppressions in itertools.chain(self.suppressions.values(), self.suppressions_v2.values()):
for suppression in check_suppressions:
if suppression.get("suppressionType") == "Policy":
policy_level_suppressions[suppression['id']] = suppression['policyId']
if (suppression['isV1'] and suppression.get("suppressionType") == "Policy") or (not suppression['isV1'] and suppression.get("ruleType") == "policy"):
policy_level_suppressions[suppression['id']] = [suppression['policyId']] if suppression['isV1'] else suppression['policyIds']
break
return policy_level_suppressions

Expand Down
5 changes: 3 additions & 2 deletions checkov/runner_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,10 @@ def from_dict(obj: Dict[str, Any]) -> RunnerFilter:
show_progress_bar, run_image_referencer, enable_secret_scan_all_files, block_list_secret_scan)
return runner_filter

def set_suppressed_policies(self, policy_level_suppressions: List[str]) -> None:
def set_suppressed_policies(self, policy_level_suppressions: List[List[str]]) -> None:
logging.debug(f"Received the following policy-level suppressions, that will be skipped from running: {policy_level_suppressions}")
self.suppressed_policies = policy_level_suppressions
# flatten
self.suppressed_policies = [suppression for suppression_list in policy_level_suppressions for suppression in suppression_list]

@staticmethod
def get_sast_languages(frameworks: Optional[List[str]], skip_framework: Optional[List[str]]) -> Set[SastLanguages]:
Expand Down
5 changes: 5 additions & 0 deletions checkov/sast/engines/prisma_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from checkov.common.bridgecrew.check_type import CheckType
from checkov.common.bridgecrew.platform_integration import bc_integration
from checkov.common.bridgecrew.integration_features.features.policy_metadata_integration import integration as policy_metadata_integration
from checkov.common.bridgecrew.platform_key import bridgecrew_dir
from checkov.common.bridgecrew.severities import get_severity, Severity, Severities, BcSeverities
from checkov.common.models.enums import CheckResult
Expand Down Expand Up @@ -93,6 +94,7 @@ def get_reports(self, targets: List[str], registry: Registry, languages: Set[Sas
'check_threshold': check_threshold,
'skip_check_threshold': skip_check_threshold,
'skip_path': registry.runner_filter.excluded_paths if registry.runner_filter else [],
'platform_check_metadata': policy_metadata_integration.sast_check_metadata or {},
'report_imports': registry.runner_filter.report_sast_imports if registry.runner_filter else False,
'remove_default_policies': registry.runner_filter.remove_default_sast_policies if registry.runner_filter else False,
'report_reachability': registry.runner_filter.report_sast_reachability if registry.runner_filter else False,
Expand Down Expand Up @@ -195,6 +197,7 @@ def run_go_library(self, languages: Set[SastLanguages],
skip_path: List[str],
check_threshold: Severity,
skip_check_threshold: Severity,
platform_check_metadata: Dict[str, Any],
cdk_languages: List[CDKLanguages],
list_policies: bool = False,
report_imports: bool = True,
Expand Down Expand Up @@ -224,6 +227,7 @@ def run_go_library(self, languages: Set[SastLanguages],
"skip_path": skip_path,
"check_threshold": str(check_threshold),
"skip_check_threshold": str(skip_check_threshold),
"platform_check_metadata": platform_check_metadata,
"list_policies": list_policies,
"report_imports": report_imports,
"remove_default_policies": remove_default_policies,
Expand Down Expand Up @@ -468,6 +472,7 @@ def get_policies(self, languages: Set[SastLanguages]) -> SastPolicies:
'skip_checks': [],
'check_threshold': Severities[BcSeverities.NONE],
'skip_check_threshold': Severities[BcSeverities.NONE],
'platform_check_metadata': policy_metadata_integration.sast_check_metadata,
'skip_path': [],
'report_imports': False,
'report_reachability': False,
Expand Down
3 changes: 2 additions & 1 deletion checkov/sast/prisma_models/library_input.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Set, List
from typing import Set, List, Dict, Any

from checkov.common.bridgecrew.severities import Severity
from checkov.common.sast.consts import SastLanguages, CDKLanguages
Expand All @@ -18,6 +18,7 @@ class LibraryInput(TypedDict):
skip_path: List[str]
check_threshold: Severity
skip_check_threshold: Severity
platform_check_metadata: Dict[str, Any]
list_policies: NotRequired[bool]
report_imports: bool
remove_default_policies: NotRequired[bool]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ def test_filtered_policy_ids(self):
metadata_integration.bc_integration = instance
metadata_integration.pre_scan()
metadata_integration.pc_to_ckv_id_mapping
self.assertDictEqual(metadata_integration.pc_to_ckv_id_mapping, {'6960be11-e3a6-46cc-bf66-933c57c2af5d': 'CKV_AWS_212', '3dc2478c-bf25-4383-aaa1-30feb5cda586': '806079891421835264_AZR_1685557908904', 'c11ce08c-b93e-4e11-8d1c-e5a1339139d1': 'CKV_AWS_40', '0e4c576e-c934-4af3-8592-a53920e71ffb': 'CKV_AWS_53'})
self.assertDictEqual(metadata_integration.pc_to_ckv_id_mapping, {'6960be11-e3a6-46cc-bf66-933c57c2af5d': 'CKV_AWS_212', '3dc2478c-bf25-4383-aaa1-30feb5cda586': '806079891421835264_AZR_1685557908904', 'c11ce08c-b93e-4e11-8d1c-e5a1339139d1': 'CKV_AWS_40', '0e4c576e-c934-4af3-8592-a53920e71ffb': 'CKV_AWS_53', '1234': 'CKV3_SAST_123'})
self.assertListEqual(metadata_integration.filtered_policy_ids, ['CKV_AWS_212', '806079891421835264_AZR_1685557908904', 'CKV_AWS_40', 'CKV_AWS_53', 'CKV_AZURE_122'])
self.assertListEqual(metadata_integration.filtered_exception_policy_ids, ['CKV_AWS_212'])
self.assertSetEqual(set(metadata_integration.sast_check_metadata.keys()), {'CKV3_SAST_123'})


def mock_customer_run_config():
Expand Down Expand Up @@ -89,6 +90,22 @@ def mock_customer_run_config():
"3dc2478c-bf25-4383-aaa1-30feb5cda586"
],
"benchmarks": {}
},
"CKV3_SAST_123": {
"id": "BC_SAST_123",
"title": "sast",
"guideline": "https://docs.bridgecrew.io/docs/abc",
"severity": "LOW",
"pcSeverity": "LOW",
"category": "Networking",
"checkovId": "CKV3_SAST_123",
"constructiveTitle": "sast",
"descriptiveTitle": "sast",
"pcPolicyId": "1234",
"additionalPcPolicyIds": [
"1234"
],
"benchmarks": {}
}
},
"customPolicies": [
Expand Down
Loading
Loading