From 55bb55e220c56197c764acf023cb3c3b027b30b4 Mon Sep 17 00:00:00 2001
From: gruebel
Date: Tue, 17 Oct 2023 22:38:39 +0200
Subject: [PATCH 1/2] improve various Terraform chceks

---
 .../LambdaEnvironmentEncryptionSettings.py | 22 +++++++-------
 .../resource/aws/EKSPublicAccessCIDR.py | 14 +++++----
 .../LambdaEnvironmentEncryptionSettings.py | 10 ++-----
 .../checks/resource/aws/SNSTopicEncryption.py | 6 ++--
 .../azure/MariaDBPublicAccessDisabled.py | 23 +++++++-------
 .../checks/resource/gcp/GKEClusterLogging.py | 30 +++++++------------
 6 files changed, 47 insertions(+), 58 deletions(-)

diff --git a/checkov/cloudformation/checks/resource/aws/LambdaEnvironmentEncryptionSettings.py b/checkov/cloudformation/checks/resource/aws/LambdaEnvironmentEncryptionSettings.py
index fa5bb7c9491..aa542fd030c 100644
--- a/checkov/cloudformation/checks/resource/aws/LambdaEnvironmentEncryptionSettings.py
+++ b/checkov/cloudformation/checks/resource/aws/LambdaEnvironmentEncryptionSettings.py
@@ -1,30 +1,32 @@
-from typing import List
+from __future__ import annotations
+
+from typing import Any
 from checkov.common.models.enums import CheckResult, CheckCategories
 from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck
 class LambdaEnvironmentEncryptionSettings(BaseResourceCheck):
- def __init__(self):
+ def __init__(self) -> None:
 name = "Check encryption settings for Lambda environmental variable"
 id = "CKV_AWS_173"
- supported_resources = ['AWS::Lambda::Function', "AWS::Serverless::Function"]
- categories = [CheckCategories.ENCRYPTION]
+ supported_resources = ("AWS::Lambda::Function", "AWS::Serverless::Function")
+ categories = (CheckCategories.ENCRYPTION,)
 super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
- def scan_resource_conf(self, conf):
- properties = conf.get('Properties')
+ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
+ properties = conf.get("Properties")
 if properties is not None:
- env = properties.get('Environment')
+ env = properties.get("Environment")
 if env is not None:
 if not isinstance(env, dict):
 return CheckResult.UNKNOWN
- elif env.get('Variables') and not properties.get('KmsKeyArn'):
+ elif env.get("Variables") and not properties.get("KmsKeyArn"):
 return CheckResult.FAILED
 return CheckResult.PASSED
- def get_evaluated_keys(self) -> List[str]:
- return ['Properties/Environment/Variables', 'Properties/KmsKeyArn']
+ def get_evaluated_keys(self) -> list[str]:
+ return ["Properties/KmsKeyArn"]
 check = LambdaEnvironmentEncryptionSettings()
diff --git a/checkov/terraform/checks/resource/aws/EKSPublicAccessCIDR.py b/checkov/terraform/checks/resource/aws/EKSPublicAccessCIDR.py
index 9671c89feea..08305f66c69 100644
--- a/checkov/terraform/checks/resource/aws/EKSPublicAccessCIDR.py
+++ b/checkov/terraform/checks/resource/aws/EKSPublicAccessCIDR.py
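Review bot: `get_evaluated_keys` now reports only `Properties/KmsKeyArn`, yet the FAILED branch in `scan_resource_conf` fires exactly when that key is absent and `Properties/Environment/Variables` is present, so a failing finding is attributed solely to a key the template does not contain. If evaluated keys are used to locate findings in the report, keeping `return ["Properties/Environment/Variables", "Properties/KmsKeyArn"]` is the safer choice; the terraform LambdaEnvironmentEncryptionSettings hunk makes the same change by dropping `environment/[0]/variables`, so the remark applies there too. `scan_resource_conf` also has no docstring; a short one stating that the check fails when environment Variables are defined without a KmsKeyArn, returns UNKNOWN when Environment is not a mapping, and passes otherwise would document the three outcomes the body encodes.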
@@ -1,26 +1,28 @@
+from __future__ import annotations
+
+from typing import Any
+
 from checkov.common.models.enums import CheckResult, CheckCategories
 from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
 class EKSPublicAccessCIDR(BaseResourceCheck):
- def __init__(self):
+ def __init__(self) -> None:
 name = "Ensure Amazon EKS public endpoint not accessible to 0.0.0.0/0"
 id = "CKV_AWS_38"
- supported_resources = ['aws_eks_cluster']
- categories = [CheckCategories.KUBERNETES]
+ supported_resources = ('aws_eks_cluster',)
+ categories = (CheckCategories.KUBERNETES,)
 super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
- def scan_resource_conf(self, conf):
+ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
 """
 Looks for public_access_cidrs at aws_eks_cluster:
 https://www.terraform.io/docs/providers/aws/r/eks_cluster.html
 :param conf: aws_eks_cluster configuration
 :return:
 """
- self.evaluated_keys = ['vpc_config']
 if "vpc_config" in conf.keys():
 if "endpoint_public_access" in conf["vpc_config"][0] and not conf["vpc_config"][0]["endpoint_public_access"][0]:
- self.evaluated_keys = ['vpc_config/[0]/endpoint_public_access']
 return CheckResult.PASSED
 elif "public_access_cidrs" in conf["vpc_config"][0]:
 self.evaluated_keys = ['vpc_config/[0]/public_access_cidrs']
diff --git a/checkov/terraform/checks/resource/aws/LambdaEnvironmentEncryptionSettings.py b/checkov/terraform/checks/resource/aws/LambdaEnvironmentEncryptionSettings.py
index 838cf2c76ea..8e4d21178a6 100644
--- a/checkov/terraform/checks/resource/aws/LambdaEnvironmentEncryptionSettings.py
+++ b/checkov/terraform/checks/resource/aws/LambdaEnvironmentEncryptionSettings.py
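Review bot: Removing `self.evaluated_keys = ['vpc_config']` at the top of `scan_resource_conf` leaves the surviving `self.evaluated_keys = ['vpc_config/[0]/public_access_cidrs']` as the only writer of that attribute, and the check classes in this patch are instantiated once at module level and reused across resources; so a resource that returns PASSED through the `endpoint_public_access` branch, or one without `vpc_config`, would report whatever keys the previous resource set, unless the base class resets `evaluated_keys` before each scan — worth confirming before merging. Either remove the remaining assignment too and implement `get_evaluated_keys` with a fixed list, or keep the reset line. The docstring's `:return:` is still empty; `:return: PASSED when the public endpoint is disabled, otherwise the result of inspecting public_access_cidrs` describes the visible branches (the rest of the method lies below the hunk). `if "vpc_config" in conf.keys():` can be the plain `if "vpc_config" in conf:`.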
@@ -18,23 +18,19 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
 # check that if I have env vars I have a KMS key
 if len(conf.get("environment", [])):
 if "kms_key_arn" in conf:
- if conf["kms_key_arn"] == [""]:
- self.evaluated_keys = ["environment/kms_key_arn"]
+ if conf.get("kms_key_arn") == [""]:
 return CheckResult.FAILED
 return CheckResult.PASSED
- self.evaluated_keys = ["environment"]
 return CheckResult.FAILED
 # no env vars so should be no key as that causes state mismatch
- if "kms_key_arn" in conf:
- if not len(conf["kms_key_arn"]):
- return CheckResult.PASSED
+ if "kms_key_arn" in conf and len(conf["kms_key_arn"]):
 return CheckResult.FAILED
 # neither env vars nor kms key
 return CheckResult.UNKNOWN
 def get_evaluated_keys(self) -> list[str]:
- return ["environment/[0]/variables"]
+ return ["kms_key_arn"]
 check = LambdaEnvironmentEncryptionSettings()
diff --git a/checkov/terraform/checks/resource/aws/SNSTopicEncryption.py b/checkov/terraform/checks/resource/aws/SNSTopicEncryption.py
index 6da116a3ff4..0be8aad6801 100644
--- a/checkov/terraform/checks/resource/aws/SNSTopicEncryption.py
+++ b/checkov/terraform/checks/resource/aws/SNSTopicEncryption.py
@@ -1,4 +1,4 @@
-from typing import List, Any
+from typing import Any
 from checkov.common.models.enums import CheckCategories
 from checkov.common.models.consts import ANY_VALUE
@@ -16,8 +16,8 @@ def __init__(self) -> None:
 def get_inspected_key(self) -> str:
 return "kms_master_key_id"
- def get_expected_values(self) -> List[Any]:
- return [ANY_VALUE]
+ def get_expected_value(self) -> Any:
+ return ANY_VALUE
 check = SNSTopicEncryption()
diff --git a/checkov/terraform/checks/resource/azure/MariaDBPublicAccessDisabled.py b/checkov/terraform/checks/resource/azure/MariaDBPublicAccessDisabled.py
index fb9d48f1bf4..ba9c329c770 100644
--- a/checkov/terraform/checks/resource/azure/MariaDBPublicAccessDisabled.py
+++ b/checkov/terraform/checks/resource/azure/MariaDBPublicAccessDisabled.py
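Review bot: The collapsed condition `if "kms_key_arn" in conf and len(conf["kms_key_arn"]):` changes the outcome for a resource with no environment block whose `kms_key_arn` parses to an empty list — the removed code returned PASSED for it, the new code falls through to UNKNOWN — and the commit message does not mention it; if that shape cannot occur in parsed Terraform this is harmless, otherwise keep the PASSED branch. On the first added line, `conf.get("kms_key_arn") == [""]` sits inside `if "kms_key_arn" in conf:`, so the `.get` is redundant and obscures that the key is known to exist; `if conf["kms_key_arn"] == [""]:` reads as intended. The `len(...)` tests are better written as truthiness, `if conf.get("environment"):`, which is how the rest of the file's style reads. `scan_resource_conf` begins above the hunk; the evaluated-keys remark is raised on the CloudFormation LambdaEnvironmentEncryptionSettings hunk.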
@@ -1,24 +1,21 @@
 from checkov.common.models.enums import CheckResult, CheckCategories
-from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceCheck
-from typing import List
+from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
+from typing import List, Any
-class MariaDBPublicAccessDisabled(BaseResourceCheck):
- def __init__(self):
+class MariaDBPublicAccessDisabled(BaseResourceValueCheck):
+ def __init__(self) -> None:
 name = "Ensure 'public network access enabled' is set to 'False' for MariaDB servers"
 id = "CKV_AZURE_48"
- supported_resources = ['azurerm_mariadb_server']
- categories = [CheckCategories.NETWORKING]
+ supported_resources = ("azurerm_mariadb_server",)
+ categories = (CheckCategories.NETWORKING,)
 super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
- def scan_resource_conf(self, conf):
- # Whether or not public network access is allowed for this server. Defaults to true. Which is not optimal
- if 'public_network_access_enabled' not in conf or conf['public_network_access_enabled'][0]:
- return CheckResult.FAILED
- return CheckResult.PASSED
+ def get_inspected_key(self) -> str:
+ return "public_network_access_enabled"
- def get_evaluated_keys(self) -> List[str]:
- return ['public_network_access_enabled']
+ def get_expected_value(self) -> Any:
+ return False
 check = MariaDBPublicAccessDisabled()
diff --git a/checkov/terraform/checks/resource/gcp/GKEClusterLogging.py b/checkov/terraform/checks/resource/gcp/GKEClusterLogging.py
index b616c95d935..bf34c7cd9b7 100644
--- a/checkov/terraform/checks/resource/gcp/GKEClusterLogging.py
+++ b/checkov/terraform/checks/resource/gcp/GKEClusterLogging.py
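Review bot: The removed `scan_resource_conf` explicitly failed a resource with no `public_network_access_enabled` attribute, and its comment recorded why (the provider defaults it to true); the replacement delegates the missing-attribute case to `BaseResourceValueCheck`, whose behaviour is not visible here, so confirm that its missing-attribute result is FAILED for this check and set it explicitly if the base class lets the subclass choose, rather than inheriting whatever the default happens to be. The rationale should survive the rewrite: add `# azurerm_mariadb_server defaults public_network_access_enabled to true, so an absent attribute must fail` above `get_inspected_key`. `get_expected_value` returns `False` while the check's name speaks of the string `'False'`; a comment noting that the parsed value is compared as a boolean would save the next reader a lookup.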
@@ -1,30 +1,22 @@
-from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
-from checkov.common.models.enums import CheckResult, CheckCategories
 from typing import List
+from checkov.common.models.enums import CheckResult, CheckCategories
+from checkov.terraform.checks.resource.base_resource_negative_value_check import BaseResourceNegativeValueCheck
+
-class GKEClusterLogging(BaseResourceCheck):
- def __init__(self):
+class GKEClusterLogging(BaseResourceNegativeValueCheck):
+ def __init__(self) -> None:
 name = "Ensure Stackdriver Logging is set to Enabled on Kubernetes Engine Clusters"
 id = "CKV_GCP_1"
- supported_resources = ['google_container_cluster']
- categories = [CheckCategories.KUBERNETES]
+ supported_resources = ("google_container_cluster",)
+ categories = (CheckCategories.KUBERNETES,)
 super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
- def scan_resource_conf(self, conf):
- """
- Looks for password configuration at azure_instance:
- https://www.terraform.io/docs/providers/google/r/compute_ssl_policy.html
- :param conf: google_compute_ssl_policy configuration
- :return:
- """
- if 'logging_service' in conf.keys():
- if conf['logging_service'][0] == "none":
- return CheckResult.FAILED
- return CheckResult.PASSED
+ def get_inspected_key(self):
+ return "logging_service"
- def get_evaluated_keys(self) -> List[str]:
- return ['logging_service']
+ def get_forbidden_values(self):
+ return "none"
 check = GKEClusterLogging()

From 1e901f21f7ced3e8fc303b88b925d16d340e3a40 Mon Sep 17 00:00:00 2001
From: gruebel
Date: Tue, 17 Oct 2023 22:57:02 +0200
Subject: [PATCH 2/2] fix linting

---
 .../checks/resource/azure/MariaDBPublicAccessDisabled.py | 5 +++--
 .../terraform/checks/resource/gcp/GKEClusterLogging.py | 8 ++++----
 2 files changed, 7 insertions(+), 6 deletions(-)
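Review bot: `get_forbidden_values` returns the bare string `"none"` rather than a list; if `BaseResourceNegativeValueCheck` tests membership with `value in self.get_forbidden_values()`, as the plural name suggests, that becomes a substring test — `logging_service = "no"` or `""` would be flagged and a non-string value would raise `TypeError` — so return `["none"]` and annotate it `-> list[str]` (the patch-2 hunk on this file annotates it `-> Any` instead, which hides the mismatch rather than fixing it). The removed `scan_resource_conf` returned PASSED when `logging_service` was absent; confirm the base class's missing-attribute result matches, since the rewrite no longer states that case. The stale docstring about `azure_instance`/`compute_ssl_policy` is gone but nothing replaced it; a class docstring `"""Fails when logging_service is explicitly set to "none", i.e. cluster logging is disabled."""` would keep the intent discoverable.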
diff --git a/checkov/terraform/checks/resource/azure/MariaDBPublicAccessDisabled.py b/checkov/terraform/checks/resource/azure/MariaDBPublicAccessDisabled.py
index ba9c329c770..551147fe5d8 100644
--- a/checkov/terraform/checks/resource/azure/MariaDBPublicAccessDisabled.py
+++ b/checkov/terraform/checks/resource/azure/MariaDBPublicAccessDisabled.py
@@ -1,6 +1,7 @@
-from checkov.common.models.enums import CheckResult, CheckCategories
+from typing import Any
+
+from checkov.common.models.enums import CheckCategories
 from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
-from typing import List, Any
 class MariaDBPublicAccessDisabled(BaseResourceValueCheck):
diff --git a/checkov/terraform/checks/resource/gcp/GKEClusterLogging.py b/checkov/terraform/checks/resource/gcp/GKEClusterLogging.py
index bf34c7cd9b7..09c2f16484b 100644
--- a/checkov/terraform/checks/resource/gcp/GKEClusterLogging.py
+++ b/checkov/terraform/checks/resource/gcp/GKEClusterLogging.py
@@ -1,6 +1,6 @@
-from typing import List
+from typing import Any
-from checkov.common.models.enums import CheckResult, CheckCategories
+from checkov.common.models.enums import CheckCategories
 from checkov.terraform.checks.resource.base_resource_negative_value_check import BaseResourceNegativeValueCheck
@@ -12,10 +12,10 @@ def __init__(self) -> None:
 categories = (CheckCategories.KUBERNETES,)
 super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
- def get_inspected_key(self):
+ def get_inspected_key(self) -> str:
 return "logging_service"
- def get_forbidden_values(self):
+ def get_forbidden_values(self) -> Any:
 return "none"