Skip to content

Commit

Permalink
feat(terraform_plan): Add PY graph checks for tf plan (#5875)
Browse files Browse the repository at this point in the history
* add graph checks for tf plan

* func

* mypy

* UT
  • Loading branch information
ChanochShayner authored Dec 19, 2023
1 parent 85a69e7 commit 06d1bd1
Show file tree
Hide file tree
Showing 5 changed files with 1,851 additions and 1 deletion.
10 changes: 10 additions & 0 deletions checkov/terraform/plan_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from typing_extensions import TypeAlias # noqa[TC002]

from checkov.common.checks.base_check_registry import BaseCheckRegistry
from checkov.common.graph.checks_infra.registry import BaseRegistry
from checkov.common.typing import LibraryGraphConnector, TFDefinitionKeyType
from checkov.common.graph.graph_builder.consts import GraphSource
Expand Down Expand Up @@ -239,6 +240,7 @@ def run_block(
entity_address = entity_context['address']
_, _, entity_config = registry.extract_entity_details(entity)

self._assign_graph_to_registry(registry)
results = registry.scan(scanned_file, entity, [], runner_filter, report_type=CheckType.TERRAFORM_PLAN)
for check, check_result in results.items():
if check.id in TF_LIFECYCLE_CHECK_IDS:
Expand Down Expand Up @@ -270,6 +272,14 @@ def run_block(
record.set_guideline(check.guideline)
report.add_record(record=record)

def _assign_graph_to_registry(self, registry: BaseCheckRegistry) -> None:
try:
registry.graph = self.graph_manager.db_connector.graph # type: ignore
except Exception as e:
logging.debug(f'fail to assign graph to the registry, err: {e}')
registry.graph = None
return

def get_entity_context_and_evaluations(self, entity: dict[str, Any]) -> dict[str, Any] | None:
if not self.context:
return None
Expand Down
Empty file.
30 changes: 30 additions & 0 deletions tests/terraform/runner/py_check_tf_plan/check_tf_plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations
from typing import Any
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
from checkov.common.models.enums import CheckCategories, CheckResult


class JustForTest(BaseResourceValueCheck):
def __init__(self):
name = "Just for test (Like CKV2_GCP_18)"
id = "CKV_AWS_99999"
supported_resources = ['google_compute_network']
categories = [CheckCategories.ENCRYPTION]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self):
return "storage_encrypted"

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
result = super().scan_resource_conf(conf=conf)
# For IGraph framework -
resources = self.graph.vs.select(block_type__eq="resource")["attr"]
# For RustworkX Framework - [g[1] for g in self.graph.nodes() if g[1].get('block_type_') == 'resource']

# Do something here.
if resources:
return CheckResult.PASSED
return result


check = JustForTest()
Loading

0 comments on commit 06d1bd1

Please sign in to comment.