Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

platform(general): Handle SAST suppressions (suppressions V2) #6109

Merged
merged 17 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import itertools
import logging
import re
from itertools import groupby
from typing import TYPE_CHECKING, Pattern, Any
from typing import TYPE_CHECKING, Pattern, Any, Optional

from checkov.common.bridgecrew.check_type import CheckType

Expand All @@ -25,6 +26,7 @@
class SuppressionsIntegration(BaseIntegrationFeature):
def __init__(self, bc_integration: BcPlatformIntegration) -> None:
super().__init__(bc_integration=bc_integration, order=2) # must be after the custom policies integration
self.suppressions_v2: dict[str, list[dict[str, Any]]] = {}
self.suppressions: dict[str, list[dict[str, Any]]] = {}

# bcorgname_provider_timestamp (ex: companyxyz_aws_1234567891011)
Expand All @@ -51,26 +53,56 @@ def pre_scan(self) -> None:
return

suppressions = self.bc_integration.customer_run_config_response.get('suppressions')
suppressions_v2 = self.bc_integration.customer_run_config_response.get('suppressionsV2') # currently just SAST

for suppression in suppressions:
suppression['isV1'] = True
if suppression['policyId'] in metadata_integration.bc_to_ckv_id_mapping:
suppression['checkovPolicyId'] = metadata_integration.get_ckv_id_from_bc_id(suppression['policyId'])
else:
suppression['checkovPolicyId'] = suppression['policyId'] # custom policy

for suppression in suppressions_v2:
suppression['isV1'] = False
checkov_ids = []
for policy_id in suppression['policyIds']:
if policy_id in metadata_integration.bc_to_ckv_id_mapping:
checkov_ids.append(metadata_integration.bc_to_ckv_id_mapping[policy_id])
else:
checkov_ids.append(policy_id) # custom policy - not supported yet
suppression['checkovPolicyIds'] = checkov_ids

self._init_repo_regex()
suppressions = sorted(suppressions, key=lambda s: s['checkovPolicyId'])

# group and map by policy ID
self.suppressions = {policy_id: list(sup) for policy_id, sup in
groupby(suppressions, key=lambda s: s['checkovPolicyId'])}
logging.debug(f'Found {len(self.suppressions)} valid suppressions from the platform.')
logging.debug('The found suppression rules are:')

# map suppressions v2 by checkov ID - because the policy IDs are arrays, we need to map each unique ID in each
# suppression's policy ID array to its suppressions
self.suppressions_v2 = SuppressionsIntegration.create_suppression_v2_policy_id_map(suppressions_v2)

logging.debug('The found suppression v1 rules are:')
logging.debug(self.suppressions)
logging.debug('The found suppression v2 rules are:')
logging.debug(self.suppressions_v2)

except Exception:
self.integration_feature_failures = True
logging.debug("Scanning without applying suppressions configured in the platform.", exc_info=True)

@staticmethod
def create_suppression_v2_policy_id_map(suppressions_v2: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
checkov_id_map: dict[str, list[dict[str, Any]]] = {}
for suppression in suppressions_v2:
for checkov_id in suppression['checkovPolicyIds']:
if checkov_id in checkov_id_map:
checkov_id_map[checkov_id].append(suppression)
else:
checkov_id_map[checkov_id] = [suppression]
return checkov_id_map

def post_runner(self, scan_report: Report) -> None:
self._apply_suppressions_to_report(scan_report)

Expand All @@ -88,10 +120,13 @@ def _apply_suppressions_to_report(self, scan_report: Report) -> None:
check.check_id = 'BC_VUL_1'

relevant_suppressions = self.suppressions.get(check.check_id)
relevant_suppressions_v2 = self.suppressions_v2.get(check.check_id)

has_suppression = relevant_suppressions or relevant_suppressions_v2

applied_suppression = self._check_suppressions(check, relevant_suppressions) if relevant_suppressions else None
applied_suppression = self._check_suppressions(check, relevant_suppressions, relevant_suppressions_v2) if has_suppression else None
if applied_suppression:
suppress_comment = applied_suppression['comment']
suppress_comment = applied_suppression['comment'] if applied_suppression['isV1'] else applied_suppression['justificationComment']
logging.debug(f'Applying suppression to the check {check.check_id} with the comment: {suppress_comment}')
check.check_result = {
'result': CheckResult.SKIPPED,
Expand All @@ -106,17 +141,22 @@ def _apply_suppressions_to_report(self, scan_report: Report) -> None:
scan_report.failed_checks = still_failed_checks
scan_report.passed_checks = still_passed_checks

def _check_suppressions(self, record: Record, suppressions: list[dict[str, Any]]) -> dict[str, Any] | None:
def _check_suppressions(self, record: Record, suppressions: Optional[list[dict[str, Any]]], suppressions_v2: Optional[list[dict[str, Any]]]) -> dict[str, Any] | None:
"""
Checks the specified suppressions against the specified record, returning the first applicable suppression,
or None of no suppression is applicable.
Checks the specified suppressions against the specified record, returning a tuple of the applied suppression and whether
the suppression was a v1 suppression or not. If no suppression is found, then returns a tuple of None, None
mikeurbanski1 marked this conversation as resolved.
Show resolved Hide resolved
:param record:
:param suppressions:
:return:
"""
for suppression in suppressions:
if self._check_suppression(record, suppression):
return suppression
if suppressions:
for suppression in suppressions or []:
mikeurbanski1 marked this conversation as resolved.
Show resolved Hide resolved
if self._check_suppression(record, suppression):
return suppression
if suppressions_v2:
for suppression in suppressions_v2 or []:
if self._check_suppression_v2(record, suppression):
return suppression
return None

def _check_suppression(self, record: Record, suppression: dict[str, Any]) -> bool:
Expand Down Expand Up @@ -186,6 +226,30 @@ def _check_suppression(self, record: Record, suppression: dict[str, Any]) -> boo

return False

def _check_suppression_v2(self, record: Record, suppression: dict[str, Any]) -> bool:
if record.check_id not in suppression['checkovPolicyIds']:
return False

type = suppression['ruleType']

if type == 'policy': # TODO policy suppression not supported via UI yet but we just need to set the correct type value here
# We just checked the policy ID above
return True
elif type == 'finding':
pass # TODO how to map them?
elif type == 'file':
record_file_path = record.repo_file_path if record.repo_file_path.startswith('/') else f'/{record.repo_file_path}'
for file_suppression in suppression['files']:
suppression_file_path = file_suppression['filePath']
suppression_file_path = suppression_file_path if suppression_file_path.startswith('/') else f'/{suppression_file_path}'
mikeurbanski1 marked this conversation as resolved.
Show resolved Hide resolved
if self.bc_integration.repo_matches(file_suppression['repositoryName']) \
and (suppression_file_path == record_file_path
or suppression_file_path == convert_to_unix_path(record_file_path)):
mikeurbanski1 marked this conversation as resolved.
Show resolved Hide resolved
return True
elif type == 'repository':
return any(self.bc_integration.repo_matches(repo['repositoryName']) for repo in suppression['repositories'])
return False

def _suppression_valid_for_run(self, suppression: dict[str, Any]) -> bool:
"""
Returns whether this suppression is valid. A suppression is NOT valid if:
Expand Down Expand Up @@ -220,12 +284,12 @@ def pre_runner(self, runner: _BaseRunner) -> None:
# not used
pass

def get_policy_level_suppressions(self) -> dict[str, str]:
def get_policy_level_suppressions(self) -> dict[str, list[str]]:
policy_level_suppressions = {}
for check_suppressions in self.suppressions.values():
for check_suppressions in itertools.chain(self.suppressions.values(), self.suppressions_v2.values()):
for suppression in check_suppressions:
if suppression.get("suppressionType") == "Policy":
policy_level_suppressions[suppression['id']] = suppression['policyId']
if (suppression['isV1'] and suppression.get("suppressionType") == "Policy") or (not suppression['isV1'] and suppression.get("ruleType") == "policy"):
policy_level_suppressions[suppression['id']] = [suppression['policyId']] if suppression['isV1'] else suppression['policyIds']
break
return policy_level_suppressions

Expand Down
5 changes: 3 additions & 2 deletions checkov/runner_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,10 @@ def from_dict(obj: Dict[str, Any]) -> RunnerFilter:
run_image_referencer, enable_secret_scan_all_files, block_list_secret_scan)
return runner_filter

def set_suppressed_policies(self, policy_level_suppressions: List[str]) -> None:
def set_suppressed_policies(self, policy_level_suppressions: List[List[str]]) -> None:
logging.debug(f"Received the following policy-level suppressions, that will be skipped from running: {policy_level_suppressions}")
self.suppressed_policies = policy_level_suppressions
# flatten
self.suppressed_policies = [suppression for suppression_list in policy_level_suppressions for suppression in suppression_list]

@staticmethod
def get_sast_languages(frameworks: Optional[List[str]], skip_framework: Optional[List[str]]) -> Set[SastLanguages]:
Expand Down
Loading
Loading