diff --git a/checkov/terraform/checks/resource/aws/AWSCodeGuruHasCMK.py b/checkov/terraform/checks/resource/aws/AWSCodeGuruHasCMK.py new file mode 100644 index 00000000000..ed8ae3471aa --- /dev/null +++ b/checkov/terraform/checks/resource/aws/AWSCodeGuruHasCMK.py @@ -0,0 +1,33 @@ +from typing import Dict, List, Any + +from checkov.common.models.enums import CheckResult, CheckCategories +from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck + + +class AWSCodeGuruHasCMK(BaseResourceCheck): + def __init__(self): + # This is the full description of your check + description = "Make sure that aws_codegurureviewer_repository_association has a CMK" + + # This is the Unique ID for your check + id = "CKV_AWS_381" + + # These are the terraform objects supported by this check (ex: aws_iam_policy_document) + supported_resources = ['aws_codegurureviewer_repository_association'] + + # Valid CheckCategories are defined in checkov/common/models/enums.py + categories = [CheckCategories.ENCRYPTION] + super().__init__(name=description, id=id, categories=categories, supported_resources=supported_resources) + + def scan_resource_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: + if 'kms_key_details' in conf: + kms_key_details = conf['kms_key_details'][0] + if 'encryption_option' in kms_key_details: + encryption_option = kms_key_details['encryption_option'][0] + if encryption_option == 'CUSTOMER_MANAGED_CMK': + return CheckResult.PASSED + + return CheckResult.FAILED + + +check = AWSCodeGuruHasCMK() diff --git a/tests/terraform/checks/resource/aws/example_AWSCodeGuruHasCMK/AWSCodeGuruHasCMK.tf b/tests/terraform/checks/resource/aws/example_AWSCodeGuruHasCMK/AWSCodeGuruHasCMK.tf new file mode 100644 index 00000000000..1d660707fbc --- /dev/null +++ b/tests/terraform/checks/resource/aws/example_AWSCodeGuruHasCMK/AWSCodeGuruHasCMK.tf @@ -0,0 +1,44 @@ + +resource "aws_codegurureviewer_repository_association" "pass" { + repository { + codecommit { + name = "repository_name" + } + } + kms_key_details { + encryption_option = "CUSTOMER_MANAGED_CMK" + kms_key_id = "aws_kms_key.example.key_id" + } +} + +resource "aws_codegurureviewer_repository_association" "ckv_unittest_fail_no_encryption_option" { + repository { + codecommit { + name = "repository_name" + } + } + kms_key_details { + kms_key_id = "aws_kms_key.example.key_id" + } +} + + +resource "aws_codegurureviewer_repository_association" "ckv_unittest_fail_no_kms_key_details" { + repository { + codecommit { + name = "repository_name" + } + } +} + +resource "aws_codegurureviewer_repository_association" "ckv_unittest_fail_encryption_option_OWNED" { + repository { + codecommit { + name = "repository_name" + } + } + kms_key_details { + encryption_option = "AWS_OWNED_CMK" + kms_key_id = "aws_kms_key.example.key_id" + } +} diff --git a/tests/terraform/checks/resource/aws/test_AWSCodeGuruHasCMK.py b/tests/terraform/checks/resource/aws/test_AWSCodeGuruHasCMK.py new file mode 100644 index 00000000000..b5625620185 --- /dev/null +++ b/tests/terraform/checks/resource/aws/test_AWSCodeGuruHasCMK.py @@ -0,0 +1,43 @@ +import os +import unittest + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.runner import Runner +from checkov.terraform.checks.resource.aws.AWSCodeGuruHasCMK import check + + +class TestAWSCodeGuruHasCMK(unittest.TestCase): + + def test(self): + runner = Runner() + current_dir = os.path.dirname(os.path.realpath(__file__)) + + test_files_dir = os.path.join(current_dir, "example_AWSCodeGuruHasCMK") + report = runner.run(root_folder=test_files_dir, + runner_filter=RunnerFilter(checks=[check.id])) + summary = report.get_summary() + + passing_resources = { + 'aws_codegurureviewer_repository_association.pass' + } + failing_resources = { + 'aws_codegurureviewer_repository_association.ckv_unittest_fail_no_encryption_option', + 'aws_codegurureviewer_repository_association.ckv_unittest_fail_no_kms_key_details', + 'aws_codegurureviewer_repository_association.ckv_unittest_fail_encryption_option_OWNED', + } + skipped_resources = {} + + passed_check_resources = set([c.resource for c in report.passed_checks]) + failed_check_resources = set([c.resource for c in report.failed_checks]) + + self.assertEqual(summary['passed'], len(passing_resources)) + self.assertEqual(summary['failed'], len(failing_resources)) + self.assertEqual(summary['skipped'], len(skipped_resources)) + self.assertEqual(summary['parsing_errors'], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file