feat(terraform): Support for merge func inside jsondecode #5655

62 changes: 57 additions & 5 deletions checkov/common/checks_infra/checks_parser.py
@@ -1,6 +1,7 @@
from __future__ import annotations

from typing import Dict, Any, List, Optional, Type, TYPE_CHECKING
import logging
from typing import Dict, Any, List, Optional, Type, TYPE_CHECKING, cast

from checkov.common.checks_infra.solvers import (
EqualsAttributeSolver,
@@ -53,6 +54,7 @@
NumberOfWordsGreaterThanOrEqualAttributeSolver,
NumberOfWordsLessThanAttributeSolver,
NumberOfWordsLessThanOrEqualAttributeSolver,
NotWithinAttributeSolver,
)
from checkov.common.checks_infra.solvers.connections_solvers.connection_one_exists_solver import \
ConnectionOneExistsSolver
@@ -80,6 +82,7 @@
"contains": ContainsAttributeSolver,
"not_exists": NotExistsAttributeSolver,
"within": WithinAttributeSolver,
"not_within": NotWithinAttributeSolver,
"not_contains": NotContainsAttributeSolver,
"starting_with": StartingWithAttributeSolver,
"not_starting_with": NotStartingWithAttributeSolver,
@@ -146,21 +149,56 @@
JSONPATH_PREFIX = "jsonpath_"


class NXGraphCheckParser(BaseGraphCheckParser):
class GraphCheckParser(BaseGraphCheckParser):
def validate_check_config(self, file_path: str, raw_check: dict[str, dict[str, Any]]) -> bool:
missing_fields = []

# check existence of metadata block
if "metadata" in raw_check:
metadata = raw_check["metadata"]
if "id" not in metadata:
missing_fields.append("metadata.id")
if "name" not in metadata:
missing_fields.append("metadata.name")
if "category" not in metadata:
missing_fields.append("metadata.category")
else:
missing_fields.extend(("metadata.id", "metadata.name", "metadata.category"))

# check existence of definition block
if "definition" not in raw_check:
missing_fields.append("definition")

if missing_fields:
logging.warning(f"Custom policy {file_path} is missing required fields {', '.join(missing_fields)}")
return False

# check if definition block is not obviously invalid
definition = raw_check["definition"]
if not isinstance(definition, (list, dict)):
logging.warning(
f"Custom policy {file_path} has an invalid 'definition' block type '{type(definition).__name__}', "
"needs to be either a 'list' or 'dict'"
)
return False

return True
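
For illustration, a minimal sketch of what the new validate_check_config accepts and rejects. The two policy dictionaries below are hypothetical (they are not part of this PR's fixtures), and it is assumed here that GraphCheckParser can be instantiated with no arguments:

# Hypothetical custom policies, shown only to illustrate the validation rules above.
valid_check = {
    "metadata": {"id": "CUSTOM_900", "name": "Example policy", "category": "GENERAL_SECURITY"},
    "definition": {"cond_type": "attribute", "operator": "exists"},
}
incomplete_check = {
    "metadata": {"id": "CUSTOM_901"},  # missing metadata.name and metadata.category
    "definition": "oops",              # also not a list or dict
}

parser = GraphCheckParser()
assert parser.validate_check_config("valid.yaml", valid_check) is True
assert parser.validate_check_config("incomplete.yaml", incomplete_check) is False
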

def parse_raw_check(self, raw_check: Dict[str, Dict[str, Any]], **kwargs: Any) -> BaseGraphCheck:
policy_definition = raw_check.get("definition", {})
check = self._parse_raw_check(policy_definition, kwargs.get("resources_types"))
check = self._parse_raw_check(policy_definition, kwargs.get("resources_types"), raw_check)
check.id = raw_check.get("metadata", {}).get("id", "")
check.name = raw_check.get("metadata", {}).get("name", "")
check.category = raw_check.get("metadata", {}).get("category", "")
check.frameworks = raw_check.get("metadata", {}).get("frameworks", [])
check.guideline = raw_check.get("metadata", {}).get("guideline")
check.check_path = kwargs.get("check_path", "")
solver = self.get_check_solver(check)
check.set_solver(solver)

return check

def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[List[str]]) -> BaseGraphCheck:
def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[List[str]], json_check: Dict[str, Any]) -> BaseGraphCheck:
check = BaseGraphCheck()
complex_operator = get_complex_operator(raw_check)
if complex_operator:
@@ -174,7 +212,7 @@ def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[
sub_solvers = [sub_solvers]

for sub_solver in sub_solvers:
check.sub_checks.append(self._parse_raw_check(sub_solver, resources_types))
check.sub_checks.append(self._parse_raw_check(sub_solver, resources_types, json_check))
resources_types_of_sub_solvers = [
force_list(q.resource_types) for q in check.sub_checks if q is not None and q.resource_types is not None
]
@@ -190,6 +228,15 @@ def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[
or (isinstance(resource_type, list) and resource_type[0].lower() == "all")
):
check.resource_types = resources_types or []
elif "provider" in resource_type:
provider = json_check.get("scope", {}).get("provider", "")
provider_type = ""
if isinstance(provider, str) and provider:
    provider_type = provider.lower()
elif isinstance(provider, list) and provider:
    provider_type = provider[0].lower()
check.resource_types.append(f"provider.{provider_type}")

else:
check.resource_types = resource_type
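
As a standalone sketch of the intent of the provider branch above (the helper name below is made up for illustration; it mirrors, rather than imports, the checkov code): a policy whose resource type refers to "provider" gets a synthetic resource type such as provider.aws, derived from the policy's scope block.

from typing import Any, Dict

def resolve_provider_resource_type(json_check: Dict[str, Any]) -> str:
    # scope.provider may be a plain string ("AWS") or a list (["AWS"]).
    provider = json_check.get("scope", {}).get("provider", "")
    provider_type = ""
    if isinstance(provider, str) and provider:
        provider_type = provider.lower()
    elif isinstance(provider, list) and provider:
        provider_type = provider[0].lower()
    return f"provider.{provider_type}"

assert resolve_provider_resource_type({"scope": {"provider": "AWS"}}) == "provider.aws"
assert resolve_provider_resource_type({"scope": {"provider": ["Azure"]}}) == "provider.azure"
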

@@ -251,6 +298,11 @@ def get_check_solver(self, check: BaseGraphCheck) -> BaseSolver:
return solver


class NXGraphCheckParser(GraphCheckParser):
# TODO: delete after downstream adjustments
pass


def get_complex_operator(raw_check: Dict[str, Any]) -> Optional[str]:
for operator in operators_to_complex_solver_classes.keys():
if raw_check.get(operator):
@@ -57,9 +57,20 @@ def evaluate_terraform(input_str: Any, keep_interpolations: bool = True) -> Any:
elif not keep_interpolations and second_evaluated_value == value_after_removing_interpolations:
return value_before_removing_interpolations
else:
second_evaluated_value = _eval_merge_as_list(second_evaluated_value)
return second_evaluated_value


def _eval_merge_as_list(eval_value: Any) -> Any:
    """
    Handle the edge case of an eval result that still needs another eval.
    Covered by the unit test test_jsonpath_equals_ecs_with_merge.
    """
    if eval_value and isinstance(eval_value, list) and isinstance(eval_value[0], str) and eval_value[0].startswith('merge'):
        return _try_evaluate(eval_value[0])
    return eval_value


def _try_evaluate(input_str: Union[str, bool]) -> Any:
try:
return evaluate(input_str)
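
A rough illustration of the edge case _eval_merge_as_list handles; the literal string below is an assumed intermediate value, not output captured from a real Checkov run. After the first evaluation pass over a jsonencode([merge(...)]) expression, the result can still be a one-element list whose only item is the raw merge(...) source text, so that item is handed to _try_evaluate for one more pass:

# Assumed shape of the value after the first evaluation pass (illustrative only):
second_evaluated_value = ["merge({'name': 'first', 'image': 'service-first'}, {'cpu': 10})"]

# _eval_merge_as_list looks for exactly this shape ...
needs_another_pass = (
    bool(second_evaluated_value)
    and isinstance(second_evaluated_value, list)
    and isinstance(second_evaluated_value[0], str)
    and second_evaluated_value[0].startswith("merge")
)
assert needs_another_pass

# ... and re-evaluates the inner string, which is expected to fold the merge()
# arguments into a single object, roughly:
# [{'name': 'first', 'image': 'service-first', 'cpu': 10}]
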
@@ -38,4 +38,31 @@ resource "aws_ecs_task_definition" "fail" {
])
execution_role_arn = "aws_iam_role.example.arn"
task_role_arn = "aws_iam_role.example.arn"
}

resource "aws_ecs_task_definition" "service01" {
family = "service"
container_definitions = jsonencode([
merge(
{
name = "first"
image = "service-first"
},
{
cpu = 10
memory = 512
essential = true
portMappings = [
{
containerPort = 80
hostPort = 80
}
]
}
)
])
volume {
name = "service-storage"
host_path = "/ecs/service-storage"
}
}
@@ -0,0 +1,11 @@
metadata:
  id: "CUSTOM_003"
scope:
  provider: "AWS"
definition:
  cond_type: "attribute"
  resource_types:
    - "aws_ecs_task_definition"
  attribute: "container_definitions.*.image"
  operator: "equals"
  value: "service-first"
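
To tie this policy to the merge support above: once the merge() call inside container_definitions is evaluated, the attribute roughly takes the shape below (an assumption about the rendered value, shown only to explain the match), so the wildcard path container_definitions.*.image resolves to "service-first" for aws_ecs_task_definition.service01 and to "service" for service02:

# Approximate evaluated container_definitions for aws_ecs_task_definition.service01:
container_definitions = [
    {
        "name": "first",
        "image": "service-first",  # the value matched by container_definitions.*.image
        "cpu": 10,
        "memory": 512,
        "essential": True,
        "portMappings": [{"containerPort": 80, "hostPort": 80}],
    }
]

assert [c["image"] for c in container_definitions] == ["service-first"]  # CUSTOM_003 passes
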
@@ -36,3 +36,12 @@ def test_jsonpath_equals_azure_rule(self):
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}}

self.run_test(root_folder=root_folder, expected_results=expected_results, check_id=check_id)

def test_jsonpath_equals_ecs_with_merge(self):
root_folder = '../../../resources/ecs_with_merge'
check_id = "CUSTOM_003"
should_pass = ['aws_ecs_task_definition.service01']
should_fail = ['aws_ecs_task_definition.service02']
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}}

self.run_test(root_folder=root_folder, expected_results=expected_results, check_id=check_id)
53 changes: 53 additions & 0 deletions tests/terraform/graph/resources/ecs_with_merge/main.tf
@@ -0,0 +1,53 @@
resource "aws_ecs_task_definition" "service01" {
family = "service"
container_definitions = jsonencode([
merge(
{
name = "first"
image = "service-first"
},
{
cpu = 10
memory = 512
essential = true
portMappings = [
{
containerPort = 80
hostPort = 80
}
]
}
)
])
volume {
name = "service-storage"
host_path = "/ecs/service-storage"
}
}

resource "aws_ecs_task_definition" "service02" {
family = "service"
container_definitions = jsonencode([
merge(
{
name = "first"
image = "service"
},
{
cpu = 10
memory = 512
essential = true
portMappings = [
{
containerPort = 80
hostPort = 80
}
]
}
)
])
volume {
name = "service-storage"
host_path = "/ecs/service-storage"
}
}