Skip to content

Commit

Permalink
feat(terraform): support provider in tf_plan graph (#6195)
Browse files Browse the repository at this point in the history
* feat(terraform): support provider in tf_plan graph

* fix pr comments

* fix mypy issues

---------

Co-authored-by: Steve Vaknin <[email protected]>
  • Loading branch information
SteveVaknin and SteveVaknin authored Apr 30, 2024
1 parent 0068d76 commit f2f9ece
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
35 changes: 33 additions & 2 deletions checkov/terraform/plan_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import itertools
import json
import logging
from typing import Optional, Tuple, Dict, List, Any, cast
from typing import Any, Dict, List, Optional, Tuple, cast

from checkov.common.graph.graph_builder import CustomAttributes
from checkov.common.parsers.node import ListNode
Expand Down Expand Up @@ -280,6 +280,33 @@ def _get_module_call_resources(module_address: str, root_module_conf: dict[str,
return cast("list[dict[str, Any]]", root_module_conf.get("resources", []))


def _is_provider_key(key: str) -> bool:
"""key is a valid provider"""
return (key.startswith('module.') or key.startswith('__') or key in {'start_line', 'end_line'})


def _get_provider(template: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
"""Returns the provider dict"""

provider_map: dict[str, dict[str, Any]] = {}
provider_config = template.get("configuration", {}).get("provider_config")

if provider_config and isinstance(provider_config, dict):
for provider_key, provider_data in provider_config.items():
if _is_provider_key(key=provider_key):
# Not a provider, skip
continue
provider_map[provider_key] = {}
for field, value in provider_data.get('expressions', {}).items():
if field in LINE_FIELD_NAMES or not isinstance(value, dict):
continue # don't care about line #s or non dicts
expression_value = value.get('constant_value', None)
if expression_value:
provider_map[provider_key][field] = expression_value

return provider_map


def _get_resource_changes(template: dict[str, Any]) -> dict[str, dict[str, Any]]:
"""Returns a resource address to resource changes dict"""

Expand Down Expand Up @@ -331,11 +358,15 @@ def parse_tf_plan(tf_plan_file: str, out_parsing_errors: Dict[str, str]) -> Tupl
:type tf_plan_file: str - path to plan file
:rtype: tf_definition dictionary and template_lines of the plan file
"""
tf_definition: Dict[str, Any] = {"resource": [], "data": []}
tf_definition: Dict[str, Any] = {"provider": [], "resource": [], "data": []}
template, template_lines = parse(tf_plan_file, out_parsing_errors)
if not template:
return None, None

provider = _get_provider(template=template)
if bool(provider):
tf_definition["provider"].append(provider)

resource_changes = _get_resource_changes(template=template)

for resource in template.get("planned_values", {}).get("root_module", {}).get("resources", []):
Expand Down
8 changes: 8 additions & 0 deletions tests/terraform/parser/test_plan_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ def test_tags_values_are_flattened(self):
if tag_key not in ['__startline__', '__endline__', 'start_line', 'end_line']:
self.assertIsInstance(tag_value, StrNode)

def test_provider_is_included(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_plan_path = current_dir + "/resources/plan_tags/tfplan.json"
tf_definition, _ = parse_tf_plan(valid_plan_path, {})
file_provider_definition = tf_definition['provider']
self.assertTrue(file_provider_definition) # assert a provider exists
assert file_provider_definition[0].get('aws',{}).get('region', None) == 'us-west-2'

def test_more_tags_values_are_flattened(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_plan_path = current_dir + "/resources/plan_tags_variety/tfplan.json"
Expand Down

0 comments on commit f2f9ece

Please sign in to comment.