Skip to content

Commit

Permalink
support scanning of Terraform managed modules instead of downloading …
Browse files Browse the repository at this point in the history
…them
  • Loading branch information
gruebel committed Oct 11, 2023
1 parent c469868 commit 85ab7e0
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 23 deletions.
4 changes: 4 additions & 0 deletions checkov/terraform/module_loading/loaders/local_path_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ def discover(self, module_params: ModuleParams) -> None:
pass

def _is_matching_loader(self, module_params: ModuleParams) -> bool:
if module_params.tf_managed:
# Terraform managed modules are already downloaded and can be handled as local modules
return True

if module_params.module_source.startswith(("./", "../", module_params.current_dir, "/")):
return True

Expand Down
85 changes: 75 additions & 10 deletions checkov/terraform/module_loading/module_finder.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
from __future__ import annotations

import json
import logging
import os
import re
from pathlib import Path
from typing import List, Callable

from checkov.common.parallelizer.parallel_runner import parallel_runner
from checkov.common.util.file_utils import read_file_with_any_encoding
from checkov.common.util.type_forcers import convert_str_to_bool
from checkov.terraform.module_loading.registry import module_loader_registry

# Captures the quoted label of a `module "<name>"` line in the `name` group.
# When applied with `re.match`, the leading `[^#]*` means the keyword is only
# accepted if no `#` precedes it on the line. `.*` is greedy, so the capture
# runs up to the last double quote on the line.
MODULE_NAME_PATTERN = re.compile(r'[^#]*\bmodule\s*"(?P<name>.*)"')
# Captures the quoted value of a `source = "<link>"` line in the `link` group,
# using the same `[^#]*` prefix and greedy capture as above.
MODULE_SOURCE_PATTERN = re.compile(r'[^#]*\bsource\s*=\s*"(?P<link>.*)"')
MODULE_VERSION_PATTERN = re.compile(r'[^#]*\bversion\s*=\s*"(?P<operator>=|!=|>=|>|<=|<|~>)?\s*(?P<version>[\d.]+-?\w*)"')
# Matches a `version = "<constraint>"` line and splits the constraint into an
# optional comparison operator (`operator` group) and the version number
# (`version` group).
#
# The operators are wrapped in a non-capturing group so that the trailing `\s*`
# applies to every operator, not only to `~>`. Without the grouping, `\s*` binds
# to the last alternative only and a constraint such as ">= 1.2.0" does not
# match at all. Whitespace after the operator is captured as part of `operator`,
# so `operator + version` reproduces the constraint exactly as written
# (e.g. "~> 2.1.0").
#
# When applied with `re.match`, the leading `[^#]*` only accepts the keyword if
# no `#` precedes it on the line.
MODULE_VERSION_PATTERN = re.compile(
    r'[^#]*\bversion\s*=\s*"(?P<operator>(?:=|!=|>=|>|<=|<|~>)\s*)?(?P<version>[\d.]+-?\w*)"'
)


class ModuleDownload:
    """Describes one Terraform ``module`` reference that may need to be loaded.

    Only ``source_dir`` is known at construction time; the remaining attributes
    start unset and are assigned by the code that discovers or replaces modules.

    ``address`` is a plain attribute rather than a computed property: callers
    assign it explicitly, and a read-only property of the same name would make
    the assignment in ``__init__`` raise ``AttributeError``.
    """

    def __init__(self, source_dir: str) -> None:
        # directory the module is referenced from
        self.source_dir = source_dir
        # identifier of the form "<module_link>:<version>", assigned by callers
        self.address: str | None = None
        # label of the `module "<name>"` block, when it could be extracted
        self.module_name: str | None = None
        # where the module comes from; presumably the `source` value, or a
        # local directory for Terraform managed modules - verify against callers
        self.module_link: str | None = None
        # set to True by callers for modules Terraform has already downloaded
        self.tf_managed = False
        # version constraint string; None when the module declares none
        self.version: str | None = None

    def __str__(self) -> str:
        return f"{self.source_dir} -> {self.module_link} ({self.version})"


def find_modules(path: str) -> List[ModuleDownload]:
modules_found: list[ModuleDownload] = []
Expand All @@ -34,6 +37,9 @@ def find_modules(path: str) -> List[ModuleDownload]:
for file_name in full_file_names:
if not file_name.endswith('.tf'):
continue
if root.startswith(os.path.join(path, ".terraform", "modules")):
# don't scan the modules folder used by Terraform
continue

try:
content = read_file_with_any_encoding(file_path=os.path.join(path, root, file_name))
Expand All @@ -46,12 +52,19 @@ def find_modules(path: str) -> List[ModuleDownload]:
if not curr_md:
if line.startswith('module'):
curr_md = ModuleDownload(os.path.dirname(os.path.join(root, file_name)))

# also extract the name for easier mapping against the TF modules.json file
match = re.match(MODULE_NAME_PATTERN, line)
if match:
curr_md.module_name= match.group("name")

continue
else:
if line.startswith('}'):
if curr_md.module_link is None:
logging.warning(f'A module at {curr_md.source_dir} had no source, skipping')
else:
curr_md.address = f"{curr_md.module_link}:{curr_md.version}"
modules_found.append(curr_md)
curr_md = None
continue
Expand Down Expand Up @@ -93,8 +106,13 @@ def _download_module(m: ModuleDownload) -> bool:
if should_download_module(m.module_link):
logging.info(f'Downloading module {m.address}')
try:
content = module_loader_registry.load(m.source_dir, m.module_link,
"latest" if not m.version else m.version)
content = module_loader_registry.load(
current_dir=m.source_dir,
source=m.module_link,
source_version="latest" if not m.version else m.version,
module_address=m.address,
tf_managed=m.tf_managed,
)
if content is None or not content.loaded():
log_message = f'Failed to download module {m.address}'
if not module_loader_registry.download_external_modules:
Expand All @@ -109,12 +127,59 @@ def _download_module(m: ModuleDownload) -> bool:
# To avoid duplicate work, we need to get the distinct module sources
distinct_modules = list({m.address: m for m in modules_to_load}.values())

replaced_modules = replace_terraform_managed_modules(path=path, found_modules=distinct_modules)

if run_parallel:
list(parallel_runner.run_function(_download_module, distinct_modules))
list(parallel_runner.run_function(_download_module, replaced_modules))
else:
logging.info(f"Starting download of modules of length {len(distinct_modules)}")
for m in distinct_modules:
logging.info(f"Starting download of modules of length {len(replaced_modules)}")
for m in replaced_modules:
success = _download_module(m)
if not success and stop_on_failure:
logging.info(f"Stopping downloading of modules due to failed attempt on {m.address}")
break


def replace_terraform_managed_modules(path: str, found_modules: list[ModuleDownload]) -> list[ModuleDownload]:
    """Swap found modules for the copies Terraform manages, to prevent additional downloading.

    The replacement only happens when the environment variable
    ``CHECKOV_EXPERIMENTAL_TERRAFORM_MANAGED_MODULES`` evaluates to true and
    ``<path>/.terraform/modules/modules.json`` exists; otherwise ``found_modules``
    is returned as is. A module is replaced when its name equals the ``Key`` of
    an entry in that file; all other modules are passed through unchanged.

    Nested modules can't be handled yet, ex.
    {
      "Key": "parent_module.child_module",
      "Source": "./child_module",
      "Dir": "parent_module/child_module"
    }

    :param path: root directory that is searched for the ``.terraform`` folder
    :param found_modules: modules discovered in the Terraform files
    :return: list of the same length and order, with managed modules replaced
    """

    feature_enabled = convert_str_to_bool(os.getenv("CHECKOV_EXPERIMENTAL_TERRAFORM_MANAGED_MODULES", False))
    if not feature_enabled:
        return found_modules

    # Terraform keeps its own mapping of module names to download locations here
    manifest_path = Path(path) / ".terraform/modules/modules.json"
    if not manifest_path.exists():
        return found_modules

    # index the manifest entries by their Key (module name) for direct lookup
    manifest = json.loads(manifest_path.read_bytes())
    managed_by_key = {}
    for entry in manifest["Modules"]:
        managed_by_key[entry["Key"]] = entry

    result: list[ModuleDownload] = []
    for found in found_modules:
        if found.module_name not in managed_by_key:
            # not managed by Terraform, keep the module untouched
            result.append(found)
            continue

        managed = managed_by_key[found.module_name]

        replacement = ModuleDownload(source_dir=path)
        # an unset version is addressed as 'latest', so it can be mapped properly later on
        if found.version is None:
            replacement.address = f"{found.module_link}:latest"
        else:
            replacement.address = found.address
        replacement.module_link = managed["Dir"]
        replacement.module_name = found.module_name
        replacement.tf_managed = True
        replacement.version = found.version

        result.append(replacement)

    return result
14 changes: 12 additions & 2 deletions checkov/terraform/module_loading/module_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@

@dataclass
class ModuleParams:
def __init__(self, root_dir: str, current_dir: str, source: str, source_version: Optional[str], dest_dir: str,
external_modules_folder_name: str, inner_module: Optional[str] = None):
def __init__(
self,
root_dir: str,
current_dir: str,
source: str,
source_version: Optional[str],
dest_dir: str,
external_modules_folder_name: str,
inner_module: Optional[str] = None,
tf_managed: bool = False,
):
self.root_dir: str = root_dir
self.current_dir: str = current_dir
self.module_source: str = source
self.version: Optional[str] = source_version
self.dest_dir: str = dest_dir
self.external_modules_folder_name: str = external_modules_folder_name
self.inner_module: Optional[str] = inner_module
self.tf_managed = tf_managed

self.token: Optional[str] = None
self.username: Optional[str] = None
Expand Down
29 changes: 20 additions & 9 deletions checkov/terraform/module_loading/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,23 @@ def __init__(
self.failed_urls_cache: Set[str] = set()
self.root_dir = "" # root dir for storing external modules

def load(self, current_dir: str, source: str | None, source_version: Optional[str]) -> ModuleContent | None:
def load(
self,
current_dir: str,
source: str | None,
source_version: str | None,
module_address: str | None = None,
tf_managed: bool = False,
) -> ModuleContent | None:
"""
Search all registered loaders for the first one which is able to load the module source type. For more
information, see `loader.ModuleLoader.load`.
"""
if source is None:
return None

module_address = f'{source}:{source_version}'
if module_address is None:
module_address = f'{source}:{source_version}'
if module_address in self.module_content_cache:
logging.debug(f'Used the cache for module {module_address}')
return self.module_content_cache[module_address]
Expand Down Expand Up @@ -64,13 +72,16 @@ def load(self, current_dir: str, source: str | None, source_version: Optional[st
if not self.download_external_modules and loader.is_external:
continue
try:
module_params = ModuleParams(root_dir=self.root_dir,
current_dir=current_dir,
source=source,
source_version=source_version,
dest_dir=local_dir,
external_modules_folder_name=self.external_modules_folder_name,
inner_module=inner_module)
module_params = ModuleParams(
root_dir=self.root_dir,
current_dir=current_dir,
source=source,
source_version=source_version,
dest_dir=local_dir,
external_modules_folder_name=self.external_modules_folder_name,
inner_module=inner_module,
tf_managed=tf_managed,
)
logging.info(f"Attempting loading via {loader.__class__} loader")
content = loader.load(module_params)
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"Modules": [
{
"Key": "log_group",
"Source": "registry.terraform.io/terraform-aws-modules/cloudwatch/aws//modules/log-group",
"Version": "4.1.0",
"Dir": ".terraform/modules/log_group/modules/log-group"
}
]
}
14 changes: 14 additions & 0 deletions tests/terraform/module_loading/data/tf_managed_modules/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Fixture: module without a version constraint. Its name matches the "log_group"
# key of the modules.json fixture used by the tests.
module "log_group" {
source = "terraform-aws-modules/cloudwatch/aws//modules/log-group"

name_prefix = "my-log-group-"
retention_in_days = 7
}

# Fixture: same source as above, but with a version constraint. The modules.json
# fixture used by the tests has no "log_group_v4" key.
module "log_group_v4" {
source = "terraform-aws-modules/cloudwatch/aws//modules/log-group"
version = "~> 4.0"

name_prefix = "my-log-group-"
retention_in_days = 7
}
52 changes: 52 additions & 0 deletions tests/terraform/module_loading/test_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import os
from pathlib import Path
from unittest import mock

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner


@mock.patch.dict(os.environ, {"CHECKOV_EXPERIMENTAL_TERRAFORM_MANAGED_MODULES": "True"})
def test_runner_with_tf_managed_modules():
    """With the experimental flag on, one failed check is reported inside module.log_group.

    External module downloading is disabled for this run.
    """
    # given
    fixture_dir = Path(__file__).parent / "data/tf_managed_modules"
    runner = Runner()
    check_filter = RunnerFilter(checks=["CKV_AWS_338"], framework=["terraform"], download_external_modules=False)

    # when
    report = runner.run(root_folder=str(fixture_dir), runner_filter=check_filter)

    # then
    summary = report.get_summary()
    expected_counts = (("passed", 0), ("failed", 1), ("skipped", 0), ("parsing_errors", 0))
    for key, expected in expected_counts:
        assert summary[key] == expected

    actual_failed = [check.resource for check in report.failed_checks]
    assert actual_failed == ["module.log_group.aws_cloudwatch_log_group.this[0]"]


# test can be removed after setting this flow as default
@mock.patch.dict(os.environ, {"CHECKOV_EXPERIMENTAL_TERRAFORM_MANAGED_MODULES": "False"})
def test_runner_without_tf_managed_modules():
    """With the experimental flag off and downloads disabled, no check results are reported."""
    # given
    fixture_dir = Path(__file__).parent / "data/tf_managed_modules"
    runner = Runner()
    check_filter = RunnerFilter(checks=["CKV_AWS_338"], framework=["terraform"], download_external_modules=False)

    # when
    report = runner.run(root_folder=str(fixture_dir), runner_filter=check_filter)

    # then
    summary = report.get_summary()
    for key in ("passed", "failed", "skipped", "parsing_errors"):
        assert summary[key] == 0
31 changes: 29 additions & 2 deletions tests/terraform/module_loading/test_tf_module_finder.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import os
import shutil
import unittest
from pathlib import Path
from unittest import mock

from checkov.common.util.consts import DEFAULT_EXTERNAL_MODULES_DIR
from checkov.terraform.module_loading.module_finder import find_modules, should_download, load_tf_modules
from checkov.terraform.module_loading.module_finder import (
find_modules,
should_download,
load_tf_modules,
replace_terraform_managed_modules,
)
from checkov.terraform.module_loading.registry import module_loader_registry


Expand All @@ -20,7 +27,7 @@ def test_module_finder(self):
self.assertEqual(1, len(remote_modules))
for m in remote_modules:
if 'terraform-aws-modules' in m.module_link:
self.assertEqual('~>2.1.0', m.version)
self.assertEqual('~> 2.1.0', m.version)
else:
self.assertIsNone(m.version)

Expand All @@ -42,3 +49,23 @@ def test_downloader(self):
shutil.rmtree(os.path.join(self.get_src_dir(), DEFAULT_EXTERNAL_MODULES_DIR))
self.assertEqual(len(downloaded_modules), 1)
self.assertEqual(len(distinct_roots), 1)


@mock.patch.dict(os.environ, {"CHECKOV_EXPERIMENTAL_TERRAFORM_MANAGED_MODULES": "True"})
def test_tf_managed_modules():
    """Checks the replacement step against modules which Terraform downloads on its own."""
    # given
    fixture_dir = str(Path(__file__).parent / "data/tf_managed_modules")
    found = find_modules(fixture_dir)

    # when
    result = replace_terraform_managed_modules(path=fixture_dir, found_modules=found)

    # then
    managed = [candidate for candidate in result if candidate.tf_managed]
    assert len(result) == 2
    assert len(managed) == 1

    managed_module = managed[0]
    assert managed_module.tf_managed is True
    assert managed_module.address == "terraform-aws-modules/cloudwatch/aws//modules/log-group:latest"
    assert managed_module.module_link == ".terraform/modules/log_group/modules/log-group"

0 comments on commit 85ab7e0

Please sign in to comment.