diff --git a/checkov/terraform/module_loading/loaders/local_path_loader.py b/checkov/terraform/module_loading/loaders/local_path_loader.py index 392b02d3739..3943adf29bd 100644 --- a/checkov/terraform/module_loading/loaders/local_path_loader.py +++ b/checkov/terraform/module_loading/loaders/local_path_loader.py @@ -24,6 +24,10 @@ def discover(self, module_params: ModuleParams) -> None: pass def _is_matching_loader(self, module_params: ModuleParams) -> bool: + if module_params.tf_managed: + # Terraform managed modules are already downloaded and can be handled as local modules + return True + if module_params.module_source.startswith(("./", "../", module_params.current_dir, "/")): return True diff --git a/checkov/terraform/module_loading/module_finder.py b/checkov/terraform/module_loading/module_finder.py index 55c7b259f8e..8eaac51c82d 100644 --- a/checkov/terraform/module_loading/module_finder.py +++ b/checkov/terraform/module_loading/module_finder.py @@ -1,31 +1,34 @@ from __future__ import annotations +import json import logging import os import re +from pathlib import Path from typing import List, Callable from checkov.common.parallelizer.parallel_runner import parallel_runner from checkov.common.util.file_utils import read_file_with_any_encoding +from checkov.common.util.type_forcers import convert_str_to_bool from checkov.terraform.module_loading.registry import module_loader_registry +MODULE_NAME_PATTERN = re.compile(r'[^#]*\bmodule\s*"(?P<name>.*)"') MODULE_SOURCE_PATTERN = re.compile(r'[^#]*\bsource\s*=\s*"(?P<link>.*)"') -MODULE_VERSION_PATTERN = re.compile(r'[^#]*\bversion\s*=\s*"(?P<operator>=|!=|>=|>|<=|<|~>)?\s*(?P<version>[\d.]+-?\w*)"') +MODULE_VERSION_PATTERN = re.compile(r'[^#]*\bversion\s*=\s*"(?P<operator>=|!=|>=|>|<=|<|~>\s*)?(?P<version>[\d.]+-?\w*)"') class ModuleDownload: def __init__(self, source_dir: str) -> None: self.source_dir = source_dir + self.address: str | None = None + self.module_name: str | None = None self.module_link: str | None = None + self.tf_managed = False self.version: 
str | None = None def __str__(self) -> str: return f"{self.source_dir} -> {self.module_link} ({self.version})" - @property - def address(self) -> str: - return f'{self.module_link}:{self.version}' - def find_modules(path: str) -> List[ModuleDownload]: modules_found: list[ModuleDownload] = [] @@ -34,6 +37,9 @@ def find_modules(path: str) -> List[ModuleDownload]: for file_name in full_file_names: if not file_name.endswith('.tf'): continue + if root.startswith(os.path.join(path, ".terraform", "modules")): + # don't scan the modules folder used by Terraform + continue try: content = read_file_with_any_encoding(file_path=os.path.join(path, root, file_name)) @@ -46,12 +52,19 @@ def find_modules(path: str) -> List[ModuleDownload]: if not curr_md: if line.startswith('module'): curr_md = ModuleDownload(os.path.dirname(os.path.join(root, file_name))) + + # also extract the name for easier mapping against the TF modules.json file + match = re.match(MODULE_NAME_PATTERN, line) + if match: + curr_md.module_name= match.group("name") + continue else: if line.startswith('}'): if curr_md.module_link is None: logging.warning(f'A module at {curr_md.source_dir} had no source, skipping') else: + curr_md.address = f"{curr_md.module_link}:{curr_md.version}" modules_found.append(curr_md) curr_md = None continue @@ -93,8 +106,13 @@ def _download_module(m: ModuleDownload) -> bool: if should_download_module(m.module_link): logging.info(f'Downloading module {m.address}') try: - content = module_loader_registry.load(m.source_dir, m.module_link, - "latest" if not m.version else m.version) + content = module_loader_registry.load( + current_dir=m.source_dir, + source=m.module_link, + source_version="latest" if not m.version else m.version, + module_address=m.address, + tf_managed=m.tf_managed, + ) if content is None or not content.loaded(): log_message = f'Failed to download module {m.address}' if not module_loader_registry.download_external_modules: @@ -109,12 +127,59 @@ def _download_module(m: 
ModuleDownload) -> bool: # To avoid duplicate work, we need to get the distinct module sources distinct_modules = list({m.address: m for m in modules_to_load}.values()) + replaced_modules = replace_terraform_managed_modules(path=path, found_modules=distinct_modules) + if run_parallel: - list(parallel_runner.run_function(_download_module, distinct_modules)) + list(parallel_runner.run_function(_download_module, replaced_modules)) else: - logging.info(f"Starting download of modules of length {len(distinct_modules)}") - for m in distinct_modules: + logging.info(f"Starting download of modules of length {len(replaced_modules)}") + for m in replaced_modules: success = _download_module(m) if not success and stop_on_failure: logging.info(f"Stopping downloading of modules due to failed attempt on {m.address}") break + + +def replace_terraform_managed_modules(path: str, found_modules: list[ModuleDownload]) -> list[ModuleDownload]: + """Replaces modules by Terraform managed ones to prevent additional downloading + + It can't handle nested modules yet, e.g. 
+ { + "Key": "parent_module.child_module", + "Source": "./child_module", + "Dir": "parent_module/child_module" + } + """ + + if not convert_str_to_bool(os.getenv("CHECKOV_EXPERIMENTAL_TERRAFORM_MANAGED_MODULES", False)): + return found_modules + + # file used by Terraform internally to map modules to the downloaded path + tf_modules_file = Path(path) / ".terraform/modules/modules.json" + if not tf_modules_file.exists(): + return found_modules + + # create Key (module name) to module detail map for faster querying + tf_modules = { + module["Key"]: module + for module in json.loads(tf_modules_file.read_bytes())["Modules"] + } + + replaced_modules: list[ModuleDownload] = [] + for module in found_modules: + if module.module_name in tf_modules: + tf_module = tf_modules[module.module_name] + + module_new = ModuleDownload(source_dir=path) + # if version is 'None' then set it to latest in the address, so it can be mapped properly later on + module_new.address = f"{module.module_link}:latest" if module.version is None else module.address + module_new.module_link = tf_module["Dir"] + module_new.module_name = module.module_name + module_new.tf_managed = True + module_new.version = module.version + + replaced_modules.append(module_new) + else: + replaced_modules.append(module) + + return replaced_modules diff --git a/checkov/terraform/module_loading/module_params.py b/checkov/terraform/module_loading/module_params.py index 1b5976f6d33..30abd6ed718 100644 --- a/checkov/terraform/module_loading/module_params.py +++ b/checkov/terraform/module_loading/module_params.py @@ -4,8 +4,17 @@ @dataclass class ModuleParams: - def __init__(self, root_dir: str, current_dir: str, source: str, source_version: Optional[str], dest_dir: str, - external_modules_folder_name: str, inner_module: Optional[str] = None): + def __init__( + self, + root_dir: str, + current_dir: str, + source: str, + source_version: Optional[str], + dest_dir: str, + external_modules_folder_name: str, + inner_module: 
Optional[str] = None, + tf_managed: bool = False, + ): self.root_dir: str = root_dir self.current_dir: str = current_dir self.module_source: str = source @@ -13,6 +22,7 @@ def __init__(self, root_dir: str, current_dir: str, source: str, source_version: self.dest_dir: str = dest_dir self.external_modules_folder_name: str = external_modules_folder_name self.inner_module: Optional[str] = inner_module + self.tf_managed = tf_managed self.token: Optional[str] = None self.username: Optional[str] = None diff --git a/checkov/terraform/module_loading/registry.py b/checkov/terraform/module_loading/registry.py index f6733d44b3b..5b6eca9a706 100644 --- a/checkov/terraform/module_loading/registry.py +++ b/checkov/terraform/module_loading/registry.py @@ -28,7 +28,14 @@ def __init__( self.failed_urls_cache: Set[str] = set() self.root_dir = "" # root dir for storing external modules - def load(self, current_dir: str, source: str | None, source_version: Optional[str]) -> ModuleContent | None: + def load( + self, + current_dir: str, + source: str | None, + source_version: str | None, + module_address: str | None = None, + tf_managed: bool = False, + ) -> ModuleContent | None: """ Search all registered loaders for the first one which is able to load the module source type. For more information, see `loader.ModuleLoader.load`. 
@@ -36,7 +43,8 @@ def load(self, current_dir: str, source: str | None, source_version: Optional[st if source is None: return None - module_address = f'{source}:{source_version}' + if module_address is None: + module_address = f'{source}:{source_version}' if module_address in self.module_content_cache: logging.debug(f'Used the cache for module {module_address}') return self.module_content_cache[module_address] @@ -64,13 +72,16 @@ def load(self, current_dir: str, source: str | None, source_version: Optional[st if not self.download_external_modules and loader.is_external: continue try: - module_params = ModuleParams(root_dir=self.root_dir, - current_dir=current_dir, - source=source, - source_version=source_version, - dest_dir=local_dir, - external_modules_folder_name=self.external_modules_folder_name, - inner_module=inner_module) + module_params = ModuleParams( + root_dir=self.root_dir, + current_dir=current_dir, + source=source, + source_version=source_version, + dest_dir=local_dir, + external_modules_folder_name=self.external_modules_folder_name, + inner_module=inner_module, + tf_managed=tf_managed, + ) logging.info(f"Attempting loading via {loader.__class__} loader") content = loader.load(module_params) except Exception as e: diff --git a/tests/terraform/module_loading/data/tf_managed_modules/.terraform/modules/modules.json b/tests/terraform/module_loading/data/tf_managed_modules/.terraform/modules/modules.json new file mode 100644 index 00000000000..cdea57f4a46 --- /dev/null +++ b/tests/terraform/module_loading/data/tf_managed_modules/.terraform/modules/modules.json @@ -0,0 +1,10 @@ +{ + "Modules": [ + { + "Key": "log_group", + "Source": "registry.terraform.io/terraform-aws-modules/cloudwatch/aws//modules/log-group", + "Version": "4.1.0", + "Dir": ".terraform/modules/log_group/modules/log-group" + } + ] +} \ No newline at end of file diff --git a/tests/terraform/module_loading/data/tf_managed_modules/main.tf 
b/tests/terraform/module_loading/data/tf_managed_modules/main.tf new file mode 100644 index 00000000000..7f2f3164886 --- /dev/null +++ b/tests/terraform/module_loading/data/tf_managed_modules/main.tf @@ -0,0 +1,14 @@ +module "log_group" { + source = "terraform-aws-modules/cloudwatch/aws//modules/log-group" + + name_prefix = "my-log-group-" + retention_in_days = 7 +} + +module "log_group_v4" { + source = "terraform-aws-modules/cloudwatch/aws//modules/log-group" + version = "~> 4.0" + + name_prefix = "my-log-group-" + retention_in_days = 7 +} diff --git a/tests/terraform/module_loading/test_runner.py b/tests/terraform/module_loading/test_runner.py new file mode 100644 index 00000000000..e93bde67023 --- /dev/null +++ b/tests/terraform/module_loading/test_runner.py @@ -0,0 +1,52 @@ +import os +from pathlib import Path +from unittest import mock + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.runner import Runner + + +@mock.patch.dict(os.environ, {"CHECKOV_EXPERIMENTAL_TERRAFORM_MANAGED_MODULES": "True"}) +def test_runner_with_tf_managed_modules(): + # given + root_dir = Path(__file__).parent / "data/tf_managed_modules" + + # when + result = Runner().run( + root_folder=str(root_dir), + runner_filter=RunnerFilter(checks=["CKV_AWS_338"], framework=["terraform"], download_external_modules=False), + ) + + # then + summary = result.get_summary() + + assert summary["passed"] == 0 + assert summary["failed"] == 1 + assert summary["skipped"] == 0 + assert summary["parsing_errors"] == 0 + + failed_resources = [check.resource for check in result.failed_checks] + expected_failed_resources = ["module.log_group.aws_cloudwatch_log_group.this[0]"] + + assert failed_resources == expected_failed_resources + + +# test can be removed after setting this flow as default +@mock.patch.dict(os.environ, {"CHECKOV_EXPERIMENTAL_TERRAFORM_MANAGED_MODULES": "False"}) +def test_runner_without_tf_managed_modules(): + # given + root_dir = Path(__file__).parent / 
"data/tf_managed_modules" + + # when + result = Runner().run( + root_folder=str(root_dir), + runner_filter=RunnerFilter(checks=["CKV_AWS_338"], framework=["terraform"], download_external_modules=False), + ) + + # then + summary = result.get_summary() + + assert summary["passed"] == 0 + assert summary["failed"] == 0 + assert summary["skipped"] == 0 + assert summary["parsing_errors"] == 0 diff --git a/tests/terraform/module_loading/test_tf_module_finder.py b/tests/terraform/module_loading/test_tf_module_finder.py index 7351bad0da3..867fcc947c0 100644 --- a/tests/terraform/module_loading/test_tf_module_finder.py +++ b/tests/terraform/module_loading/test_tf_module_finder.py @@ -1,9 +1,16 @@ import os import shutil import unittest +from pathlib import Path +from unittest import mock from checkov.common.util.consts import DEFAULT_EXTERNAL_MODULES_DIR -from checkov.terraform.module_loading.module_finder import find_modules, should_download, load_tf_modules +from checkov.terraform.module_loading.module_finder import ( + find_modules, + should_download, + load_tf_modules, + replace_terraform_managed_modules, +) from checkov.terraform.module_loading.registry import module_loader_registry @@ -20,7 +27,7 @@ def test_module_finder(self): self.assertEqual(1, len(remote_modules)) for m in remote_modules: if 'terraform-aws-modules' in m.module_link: - self.assertEqual('~>2.1.0', m.version) + self.assertEqual('~> 2.1.0', m.version) else: self.assertIsNone(m.version) @@ -42,3 +49,23 @@ def test_downloader(self): shutil.rmtree(os.path.join(self.get_src_dir(), DEFAULT_EXTERNAL_MODULES_DIR)) self.assertEqual(len(downloaded_modules), 1) self.assertEqual(len(distinct_roots), 1) + + +@mock.patch.dict(os.environ, {"CHECKOV_EXPERIMENTAL_TERRAFORM_MANAGED_MODULES": "True"}) +def test_tf_managed_modules(): + # this test leverages the modules, which Terraform downloads on its own + + # given + src_path = Path(__file__).parent / "data/tf_managed_modules" + modules = find_modules(str(src_path)) 
+ + # when + replaced_modules = replace_terraform_managed_modules(path=str(src_path), found_modules=modules) + + tf_managed_modules = [module for module in replaced_modules if module.tf_managed] + assert len(replaced_modules) == 2 + assert len(tf_managed_modules) == 1 + + assert tf_managed_modules[0].tf_managed is True + assert tf_managed_modules[0].address == "terraform-aws-modules/cloudwatch/aws//modules/log-group:latest" + assert tf_managed_modules[0].module_link == ".terraform/modules/log_group/modules/log-group"