diff --git a/checkov/common/bridgecrew/integration_features/features/custom_policies_integration.py b/checkov/common/bridgecrew/integration_features/features/custom_policies_integration.py
index e0f7a67dcba..173c258cde8 100644
--- a/checkov/common/bridgecrew/integration_features/features/custom_policies_integration.py
+++ b/checkov/common/bridgecrew/integration_features/features/custom_policies_integration.py
@@ -4,6 +4,7 @@
 import logging
 import re
 from collections import defaultdict
+import tempfile
 from typing import TYPE_CHECKING, Any, List
 
 from checkov.common.bridgecrew.integration_features.base_integration_feature import BaseIntegrationFeature
@@ -21,6 +22,7 @@
 # service-provider::service-name::data-type-name
 CFN_RESOURCE_TYPE_IDENTIFIER = re.compile(r"^[a-zA-Z0-9]+::[a-zA-Z0-9]+::[a-zA-Z0-9]+$")
 
+SAST_CATEGORY = 'Sast'
 
 
 class CustomPoliciesIntegration(BaseIntegrationFeature):
@@ -49,9 +51,16 @@ def pre_scan(self) -> None:
             return
 
         policies = self.bc_integration.customer_run_config_response.get('customPolicies')
+        sast_policies_dir = tempfile.mkdtemp()
+        self.bc_integration.sast_custom_policies = sast_policies_dir
         for policy in policies:
             try:
                 logging.debug(f"Loading policy id: {policy.get('id')}")
+                if policy.get('category') == SAST_CATEGORY:
+                    with open(f"{sast_policies_dir}/{policy.get('id')}.yaml", 'a') as f:
+                        f.write(policy.get('code'))
+                    continue
+
                 converted_check = self._convert_raw_check(policy)
                 source_incident_id = policy.get('sourceIncidentId')
                 if source_incident_id:
diff --git a/checkov/common/bridgecrew/platform_integration.py b/checkov/common/bridgecrew/platform_integration.py
index 853d5bf1b5b..dac76c5d63f 100644
--- a/checkov/common/bridgecrew/platform_integration.py
+++ b/checkov/common/bridgecrew/platform_integration.py
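Review bot: The policy id is interpolated straight into the path at `open(f"{sast_policies_dir}/{policy.get('id')}.yaml", 'a')`, so an id from the platform response containing `/` or `..` writes — and, in append mode, appends attacker-shaped YAML to — a file outside the fresh temp directory; validate the id as a plain file name before using it (`re` is already imported in this file). The `tempfile.mkdtemp()` directory is also never removed, unlike the git-fetched checks directory in main.py which gets `atexit.register(shutil.rmtree, ...)`, so every platform run leaves one behind; the same registration here would match that pattern, provided `atexit`/`shutil` are imported in this file, which is not visible in the diff. Append mode additionally means two policies that share an id are concatenated into one YAML file, which looks unintended; `'w'` gives one file per id. pre_scan continues past the end of this hunk and the `except` for this `try` is not visible.
```python
                if policy.get('category') == SAST_CATEGORY:
                    policy_id = str(policy.get('id'))
                    # the id comes from the platform response and becomes a file name;
                    # refuse anything that could escape sast_policies_dir
                    if not re.fullmatch(r"[\w.-]+", policy_id):
                        logging.warning(f"Skipping SAST policy with unsafe id: {policy_id}")
                        continue
                    with open(f"{sast_policies_dir}/{policy_id}.yaml", 'w') as f:
                        f.write(policy.get('code'))
                    continue
                # … unchanged: non-SAST policy conversion …
```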
@@ -147,6 +147,7 @@ def clean(self) -> None:
         self.daemon_process = False # set to 'True' when running in multiprocessing 'spawn' mode
         self.scan_dir: List[str] = []
         self.scan_file: List[str] = []
+        self.sast_custom_policies: str = ''
 
     def init_instance(self, platform_integration_data: dict[str, Any]) -> None:
         """This is mainly used for recreating the instance without interacting with the platform again"""
diff --git a/checkov/main.py b/checkov/main.py
index ffba46f04b2..b535770ef7c 100755
--- a/checkov/main.py
+++ b/checkov/main.py
@@ -737,6 +737,10 @@ def get_external_checks_dir(self) -> list[str]:
             git_getter = GitGetter(url=self.config.external_checks_git[0])
             external_checks_dir = [git_getter.get()]
             atexit.register(shutil.rmtree, str(Path(external_checks_dir[0]).parent))
+        if bc_integration.sast_custom_policies:
+            if not external_checks_dir:
+                external_checks_dir = []
+            external_checks_dir.append(bc_integration.sast_custom_policies)
         return external_checks_dir
 
     def upload_results(
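Review bot: The added branch appends the SAST directory whenever the attribute is non-empty, but pre_scan (first file in this diff) creates the mkdtemp path and stores it before it knows whether any policy is a SAST one, so every platform run adds an empty temp directory to the external checks list and every other check loader walks it; guarding on content, `if bc_integration.sast_custom_policies and any(Path(bc_integration.sast_custom_policies).iterdir()):`, or storing the path in pre_scan only after a SAST policy was written, avoids that. The `.append` also mutates the list in place, so if `external_checks_dir` is the list object held by `self.config.external_checks_dir` (its initialisation is above the hunk and not visible) the parsed config is silently extended as well; `external_checks_dir = [*external_checks_dir, bc_integration.sast_custom_policies]` keeps the config untouched and makes the `if not external_checks_dir` reset unnecessary. This relies on pre_scan having run before get_external_checks_dir is called, which this hunk does not show. get_external_checks_dir begins before this hunk.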