Skip to content

Commit

Permalink
feat(terraform): 2 new checks (#6764)
Browse files Browse the repository at this point in the history
* Policy1

* Check2

* Fix flake8

* operator fix

* Update TransferServerLatestPolicy.py
  • Loading branch information
tsmithv11 authored Oct 14, 2024
1 parent 4e36cce commit 361da72
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 0 deletions.
17 changes: 17 additions & 0 deletions checkov/terraform/checks/graph_checks/aws/SQSEncryptionCMK.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
metadata:
id: "CKV2_AWS_73"
name: "Ensure AWS SQS uses CMK not AWS default keys for encryption"
category: "ENCRYPTION"
definition:
or:
- cond_type: "attribute"
resource_types:
- "aws_sqs_queue"
attribute: "kms_master_key_id"
operator: "not_exists"
- cond_type: "attribute"
resource_types:
- "aws_sqs_queue"
attribute: "kms_master_key_id"
operator: "not_equals"
value: "alias/aws/sqs"
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from datetime import datetime

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class TransferServerLatestPolicy(BaseResourceCheck):
def __init__(self) -> None:
name = "Ensure AWS Transfer Server uses latest Security Policy"
id = "CKV_AWS_380"
supported_resources = ('aws_transfer_server',)
categories = [CheckCategories.NETWORKING]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def __check_policy_date(self, policy_string: str) -> bool:
# Extract the year and month from the policy string
# We assume that the year and month appear in the format 'YYYY-MM'

# Split the string based on the '-' separator
parts = policy_string.split('-')

# Loop through the parts and check for the first valid year-month pair
for i in range(len(parts) - 1):
# Try to form a year-month date from consecutive parts
year = parts[i]
month = parts[i + 1]

try:
# If both year and month are integers and valid, create the date
policy_date = datetime(int(year), int(month), 1)
break
except ValueError:
continue
else:
# If no valid year-month combination is found, raise an error
raise ValueError("No valid date found in the policy string.")

# Get the current date
current_date = datetime.now()

# Calculate the time difference in months
years_diff = current_date.year - policy_date.year
months_diff = current_date.month - policy_date.month

total_months_diff = years_diff * 12 + months_diff

# If the difference is more than or equal to 24 months, return False
return total_months_diff < 24

def scan_resource_conf(self, conf: any) -> CheckResult:
"""
Makes sure the Security Policy is no older than 2 years
"""
security_policy = conf.get('security_policy_name')
if security_policy:
if self.__check_policy_date(security_policy[0]):
return CheckResult.PASSED
return CheckResult.FAILED # default is TransferSecurityPolicy-2018-11 which is old: https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/transfer_server

def get_evaluated_key(self) -> str:
return "security_policy_name"


check = TransferServerLatestPolicy()
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
resource "aws_transfer_server" "fail_old" {
endpoint_type = "PUBLIC"
identity_provider_type = "SERVICE_MANAGED"

# Using an outdated security policy (not the latest)
security_policy_name = "TransferSecurityPolicy-2018-11"

tags = {
Name = "OldTransferServer"
}
}

resource "aws_transfer_server" "pass_new" {
endpoint_type = "PUBLIC"
identity_provider_type = "SERVICE_MANAGED"

# Using the latest security policy (as of this example)
security_policy_name = "TransferSecurityPolicy-2024-01"

tags = {
Name = "LatestTransferServer"
}
}

resource "aws_transfer_server" "fail_old_fips" {
endpoint_type = "PUBLIC"
identity_provider_type = "SERVICE_MANAGED"

# Using the latest security policy (as of this example)
security_policy_name = "TransferSecurityPolicy-FIPS-2020-06"

tags = {
Name = "LatestTransferServer"
}
}

resource "aws_transfer_server" "pass_fips" {
endpoint_type = "PUBLIC"
identity_provider_type = "SERVICE_MANAGED"

# Using the latest security policy (as of this example)
security_policy_name = "TransferSecurityPolicy-FIPS-2024-01"

tags = {
Name = "LatestTransferServer"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.checks.resource.aws.TransferServerLatestPolicy import check
from checkov.terraform.runner import Runner


class TestTransferServerLatestPolicy(unittest.TestCase):
def test(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

test_files_dir = current_dir + "/example_TransferServerLatestPolicy"
report = runner.run(root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id]))
summary = report.get_summary()

passing_resources = {
"aws_transfer_server.pass_new",
"aws_transfer_server.pass_fips",
}
failing_resources = {
"aws_transfer_server.fail_old",
"aws_transfer_server.fail_old_fips",
}

passed_check_resources = set([c.resource for c in report.passed_checks])
failed_check_resources = set([c.resource for c in report.failed_checks])

self.assertEqual(summary["passed"], 2)
self.assertEqual(summary["failed"], 2)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pass:
- "aws_sqs_queue.pass_notexists"
- "aws_sqs_queue.pass_different_start"

fail:
- "aws_sqs_queue.fail"
36 changes: 36 additions & 0 deletions tests/terraform/graph/checks/resources/SQSEncryptionCMK/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
resource "aws_sqs_queue" "fail" {
name = "example-queue"
kms_master_key_id = "alias/aws/sqs" # Violates the RQL by using the AWS-managed key instead of a customer-managed key.

# Other SQS queue attributes
delay_seconds = 0
max_message_size = 262144
message_retention_seconds = 345600
receive_wait_time_seconds = 0
visibility_timeout_seconds = 30
}


resource "aws_sqs_queue" "pass_notexists" {
name = "example-queue"

# Other SQS queue attributes
delay_seconds = 0
max_message_size = 262144
message_retention_seconds = 345600
receive_wait_time_seconds = 0
visibility_timeout_seconds = 30
}

resource "aws_sqs_queue" "pass_different_start" {
name = "example-queue"
kms_master_key_id = "foo"


# Other SQS queue attributes
delay_seconds = 0
max_message_size = 262144
message_retention_seconds = 345600
receive_wait_time_seconds = 0
visibility_timeout_seconds = 30
}
3 changes: 3 additions & 0 deletions tests/terraform/graph/checks/test_yaml_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,9 @@ def test_ACMWildcardDomainName(self):
def test_CloudfrontOriginNotHTTPSOnly(self):
self.go("CloudfrontOriginNotHTTPSOnly")

def test_SQSEncryptionCMK(self):
self.go("SQSEncryptionCMK")

def test_registry_load(self):
registry = Registry(parser=GraphCheckParser(), checks_dir=str(
Path(__file__).parent.parent.parent.parent.parent / "checkov" / "terraform" / "checks" / "graph_checks"))
Expand Down

0 comments on commit 361da72

Please sign in to comment.