Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(terraform): Nested source_module_objects with missing foreach key #5580

Merged
merged 11 commits into from
Sep 20, 2023
Merged
11 changes: 0 additions & 11 deletions checkov/terraform/graph_builder/foreach/abstract_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import Any

from checkov.common.util.data_structures_utils import pickle_deepcopy
from checkov.terraform import TFModule
from checkov.terraform.graph_builder.foreach.consts import COUNT_STRING, FOREACH_STRING, COUNT_KEY, EACH_VALUE, \
EACH_KEY, REFERENCES_VALUES
from checkov.terraform.graph_builder.graph_components.block_types import BlockType
Expand Down Expand Up @@ -67,16 +66,6 @@ def _build_sub_graph(self, blocks_to_render: list[int]) -> TerraformLocalGraph:
sub_graph.out_edges = self.local_graph.out_edges
return sub_graph

@staticmethod
def _update_nested_tf_module_foreach_idx(original_foreach_or_count_key: int | str, original_module_key: TFModule,
tf_moudle: TFModule | None) -> None:
original_module_key.foreach_idx = None # Make sure it is always None even if we didn't override it previously
while tf_moudle is not None:
if tf_moudle == original_module_key:
tf_moudle.foreach_idx = original_foreach_or_count_key
break
tf_moudle = tf_moudle.nested_tf_module

@staticmethod
def _pop_foreach_attrs(attrs: dict[str, Any]) -> None:
attrs.pop(COUNT_STRING, None)
Expand Down
70 changes: 50 additions & 20 deletions checkov/terraform/graph_builder/foreach/module_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from checkov.common.util.consts import RESOLVED_MODULE_ENTRY_NAME
from checkov.common.util.data_structures_utils import pickle_deepcopy
from checkov.terraform import TFModule
from checkov.terraform import TFModule, TFDefinitionKey
from checkov.terraform.graph_builder.foreach.abstract_handler import ForeachAbstractHandler
from checkov.terraform.graph_builder.foreach.consts import FOREACH_STRING, COUNT_STRING
from checkov.terraform.graph_builder.graph_components.block_types import BlockType
Expand Down Expand Up @@ -88,7 +88,7 @@ def _get_current_tf_module_object(self, m_idx: int) -> TFModule:
return TFModule(m.path, m_name, m.source_module_object, m.for_each_index)

def _create_new_resources_foreach(self, statement: list[str] | dict[str, Any], block_idx: int) -> None:
# Important it will be before the super call to avoid changes occuring from super
# Important it will be before the super call to avoid changes occurring from super
main_resource = self.local_graph.vertices[block_idx]
super()._create_new_resources_foreach(statement, block_idx)

Expand All @@ -109,12 +109,9 @@ def _create_new_foreach_resource(self, block_idx: int, foreach_idx: int, main_re
def _update_module_children(self, main_resource: TerraformBlock,
original_foreach_or_count_key: int | str,
should_override_foreach_key: bool = True) -> None:
foreach_idx = original_foreach_or_count_key if not should_override_foreach_key else None
original_module_key = TFModule(path=main_resource.path, name=main_resource.name,
nested_tf_module=main_resource.source_module_object)

if not should_override_foreach_key:
original_module_key.foreach_idx = original_foreach_or_count_key

nested_tf_module=main_resource.source_module_object, foreach_idx=foreach_idx)
self._update_children_foreach_index(original_foreach_or_count_key, original_module_key,
should_override_foreach_key=should_override_foreach_key)

Expand All @@ -137,23 +134,27 @@ def _update_children_foreach_index(self, original_foreach_or_count_key: int | st
if current_module_key is None:
current_module_key = original_module_key
if current_module_key not in self.local_graph.vertices_by_module_dependency:
return
# Make sure we check both the intended key (with foreach key) and the one without the foreach key.
# This is important as we have some iterations in which we try to access with the intended key before
# we actually updated the dict itself
nullified_key = self._get_tf_module_with_no_foreach(current_module_key)
if nullified_key not in self.local_graph.vertices_by_module_dependency:
return
current_module_key = nullified_key
values = self.local_graph.vertices_by_module_dependency[current_module_key].values()
for child_indexes in values:
for child_index in child_indexes:
child = self.local_graph.vertices[child_index]

self._update_nested_tf_module_foreach_idx(original_foreach_or_count_key, original_module_key,
child.source_module_object)
child.source_module_object = self._get_module_with_only_relevant_foreach_idx(
original_foreach_or_count_key, original_module_key, child.source_module_object)
self._update_resolved_entry_for_tf_definition(child, original_foreach_or_count_key, original_module_key)

# Important to copy to avoid changing the object by reference
child_source_module_object_copy = pickle_deepcopy(child.source_module_object)
if should_override_foreach_key and child_source_module_object_copy is not None:
tf_module: TFModule | None = child_source_module_object_copy
while tf_module is not None:
tf_module.foreach_idx = None
tf_module = tf_module.nested_tf_module
child_source_module_object_copy = self._get_tf_module_with_no_foreach(
child_source_module_object_copy)

child_module_key = TFModule(path=child.path, name=child.name,
nested_tf_module=child_source_module_object_copy,
Expand All @@ -162,6 +163,14 @@ def _update_children_foreach_index(self, original_foreach_or_count_key: int | st
self._update_children_foreach_index(original_foreach_or_count_key, original_module_key,
child_module_key)

@staticmethod
def _get_tf_module_with_no_foreach(original_module: TFModule | None) -> TFModule | None:
if original_module is None:
return original_module
return TFModule(name=original_module.name, path=original_module.path, foreach_idx=None,
nested_tf_module=ForeachModuleHandler._get_tf_module_with_no_foreach(
original_module.nested_tf_module))

def _create_new_module(
self,
main_resource: TerraformBlock,
Expand All @@ -180,7 +189,7 @@ def _create_new_module(
main_resource_module_key = TFModule(
path=new_resource.path,
name=main_resource.name,
nested_tf_module=new_resource.source_module_object,
nested_tf_module=self._get_tf_module_with_no_foreach(new_resource.source_module_object)
)

# Without making this copy the test don't pass, as we might access the data structure in the middle of an update
Expand All @@ -198,8 +207,10 @@ def _create_new_module(
else:
self.local_graph.vertices[resource_idx] = new_resource

key_with_foreach_index = main_resource_module_key
key_with_foreach_index.foreach_idx = idx_to_change
key_with_foreach_index = TFModule(name=main_resource_module_key.name,
path=main_resource_module_key.path,
nested_tf_module=main_resource_module_key.nested_tf_module,
foreach_idx=idx_to_change)
self.local_graph.vertices_by_module_dependency[key_with_foreach_index] = main_resource_module_value
self.local_graph.vertices_by_module_dependency_by_name[key_with_foreach_index][new_resource.name] = main_resource_module_value

Expand Down Expand Up @@ -269,9 +280,28 @@ def _update_resolved_entry_for_tf_definition(child: TerraformBlock, original_for
if isinstance(config, dict):
resolved_module_name = config.get(RESOLVED_MODULE_ENTRY_NAME)
if resolved_module_name is not None and len(resolved_module_name) > 0:
tf_moudle: TFModule = resolved_module_name[0].tf_source_modules
ForeachAbstractHandler._update_nested_tf_module_foreach_idx(
original_definition_key = config[RESOLVED_MODULE_ENTRY_NAME][0]
tf_source_modules = ForeachModuleHandler._get_module_with_only_relevant_foreach_idx(
original_foreach_or_count_key,
original_module_key,
tf_moudle,
resolved_module_name[0].tf_source_modules,
)
config[RESOLVED_MODULE_ENTRY_NAME][0] = TFDefinitionKey(file_path=original_definition_key.file_path,
tf_source_modules=tf_source_modules)

@staticmethod
def _get_module_with_only_relevant_foreach_idx(original_foreach_or_count_key: int | str,
original_module_key: TFModule,
tf_moudle: TFModule | None) -> TFModule | None:
if tf_moudle is None:
return None
if tf_moudle == original_module_key:
return TFModule(name=tf_moudle.name, path=tf_moudle.path,
nested_tf_module=tf_moudle.nested_tf_module,
foreach_idx=original_foreach_or_count_key)
nested_module = tf_moudle.nested_tf_module
updated_module = ForeachModuleHandler._get_module_with_only_relevant_foreach_idx(
original_foreach_or_count_key, original_module_key, nested_module)
return TFModule(name=tf_moudle.name, path=tf_moudle.path,
nested_tf_module=updated_module,
foreach_idx=tf_moudle.foreach_idx)
40 changes: 10 additions & 30 deletions checkov/terraform/modules/module_objects.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
from __future__ import annotations
import json
from collections.abc import Iterator
from typing import Optional, Any
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class TFModule:
__slots__ = ("path", "name", "foreach_idx", "nested_tf_module")

def __init__(self, path: str, name: str | None, nested_tf_module: Optional[TFModule] = None,
foreach_idx: Optional[int | str] = None) -> None:
self.path = path
self.name = name
self.foreach_idx = foreach_idx
self.nested_tf_module = nested_tf_module

def __eq__(self, other: Any) -> bool:
if not isinstance(other, TFModule):
return False
return self.path == other.path and self.name == other.name and self.nested_tf_module == other.nested_tf_module and self.foreach_idx == other.foreach_idx
path: str
name: str | None
nested_tf_module: TFModule | None = None
foreach_idx: int | str | None = None

def __lt__(self, other: Any) -> bool:
if not isinstance(other, TFModule):
Expand All @@ -28,9 +21,6 @@ def __lt__(self, other: Any) -> bool:
def __repr__(self) -> str:
return f'path:{self.path}, name:{self.name}, nested_tf_module:{self.nested_tf_module}, foreach_idx:{self.foreach_idx}'

def __hash__(self) -> int:
return hash((self.path, self.name, self.nested_tf_module, self.foreach_idx))

def __iter__(self) -> Iterator[tuple[str, Any]]:
yield from {
"path": self.path,
Expand All @@ -50,17 +40,10 @@ def from_json(json_dct: dict[str, Any]) -> TFModule | None:
'nested_tf_module') else None) if json_dct else None


@dataclass(frozen=True)
class TFDefinitionKey:
__slots__ = ("tf_source_modules", "file_path")

def __init__(self, file_path: str, tf_source_modules: Optional[TFModule] = None) -> None:
self.tf_source_modules = tf_source_modules
self.file_path = file_path

def __eq__(self, other: Any) -> bool:
if not isinstance(other, TFDefinitionKey):
return False
return self.tf_source_modules == other.tf_source_modules and self.file_path == other.file_path
file_path: str
tf_source_modules: TFModule | None = None

def __lt__(self, other: Any) -> bool:
if not isinstance(other, TFDefinitionKey):
Expand All @@ -70,9 +53,6 @@ def __lt__(self, other: Any) -> bool:
def __repr__(self) -> str:
return f'tf_source_modules:{self.tf_source_modules}, file_path:{self.file_path}'

def __hash__(self) -> int:
return hash((self.file_path, self.tf_source_modules))

def __iter__(self) -> Iterator[tuple[str, Any]]:
yield from {
"file_path": self.file_path,
Expand Down
Loading