Skip to content

Commit

Permalink
feat(terraform): Add new checks to match run checks (#6868)
Browse files Browse the repository at this point in the history
* Add check for 424a5e77-8997-47d9-b0e0-daaca8b81b01

* Add check for d480c1d2-06b3-4e53-81c9-a21ed83cb5fc

* Add egress check

* Fix test

* Adjust tests

* Add new NSG check

* Revert "Add new NSG check"

This reverts commit 1be3da1.

* New check

* Fix ID
  • Loading branch information
tsmithv11 authored Nov 27, 2024
1 parent 5c9ec9e commit a3c3a28
Show file tree
Hide file tree
Showing 17 changed files with 596 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
metadata:
id: "CKV2_AZURE_55"
name: "Ensure Azure Spring Cloud app end-to-end TLS is enabled"
category: "NETWORKING"
definition:
or:
- cond_type: attribute
resource_types:
- azurerm_spring_cloud_service
attribute: sku_tier
operator: not_exists
- cond_type: attribute
resource_types:
- azurerm_spring_cloud_service
attribute: sku_tier
operator: equals
value: "Basic"
- and:
- cond_type: filter
attribute: resource_type
value:
- azurerm_spring_cloud_service
operator: within
- or:
- resource_types:
- azurerm_spring_cloud_service
connected_resource_types:
- azurerm_spring_cloud_app
operator: not_exists
cond_type: connection
- and:
- resource_types:
- azurerm_spring_cloud_service
connected_resource_types:
- azurerm_spring_cloud_app
operator: exists
cond_type: connection
- cond_type: attribute
resource_types:
- azurerm_spring_cloud_app
attribute: tls_enabled
operator: exists
- cond_type: attribute
resource_types:
- azurerm_spring_cloud_app
attribute: tls_enabled
operator: is_true
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.util.type_forcers import force_list
from checkov.common.util.type_forcers import force_int


class AbsSecurityGroupUnrestrictedEgress(BaseResourceCheck):
def __init__(self, check_id: str, port: int) -> None:
name = f"Ensure no security groups allow egress from 0.0.0.0:0 to port {port}"
supported_resources = ('aws_security_group', 'aws_security_group_rule', 'aws_vpc_security_group_egress_rule')
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=check_id, categories=categories, supported_resources=supported_resources)
self.port = port

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
"""
Looks for configuration at security group egress rules :
https://www.terraform.io/docs/providers/aws/r/security_group.html
https://www.terraform.io/docs/providers/aws/r/security_group_rule.html
Return PASS if:
- The resource is an aws_security_group that contains no violating egress rules (including if there are no
egress rules at all), OR
- The resource is an aws_security_group_rule of type 'egress' that does not violate the check.
Return FAIL if:
- The resource is an aws_security_group that contains a violating egress rule, OR
- The resource is an aws_security_group_rule of type 'egress' that violates the check.
Return UNKNOWN if:
- the resource is an aws_security_group_rule of type 'egress', OR
:param conf: aws_security_group configuration
:return: <CheckResult>
"""

if 'egress' in conf: # This means it's an SG resource with egress block(s)
egress_conf = conf['egress']
for egress_rule in egress_conf:
for rule in force_list(egress_rule):
if isinstance(rule, dict):
if self.check_self(rule):
return CheckResult.PASSED
if self.contains_violation(rule):
self.evaluated_keys = [
f'egress/[{egress_conf.index(egress_rule)}]/from_port',
f'egress/[{egress_conf.index(egress_rule)}]/to_port',
f'egress/[{egress_conf.index(egress_rule)}]/cidr_blocks',
f'egress/[{egress_conf.index(egress_rule)}]/ipv6_cidr_blocks',
]
return CheckResult.FAILED

return CheckResult.PASSED

if 'type' in conf: # This means it's an SG_rule resource.
type = force_list(conf['type'])[0]
if type == 'egress':
if self.check_self(conf):
return CheckResult.PASSED
self.evaluated_keys = ['from_port', 'to_port', 'cidr_blocks', 'ipv6_cidr_blocks']
if self.contains_violation(conf):
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.UNKNOWN
else:
self.evaluated_keys = ['from_port', 'to_port', 'cidr_ipv4', 'cidr_ipv6']
if 'from_port' in conf or 'to_port' in conf:
if self.contains_violation(conf):
return CheckResult.FAILED
return CheckResult.PASSED

return CheckResult.PASSED

def contains_violation(self, conf: dict[str, list[Any]]) -> bool:
from_port = force_int(force_list(conf.get('from_port', [{-1}]))[0])
to_port = force_int(force_list(conf.get('to_port', [{-1}]))[0])
protocol = force_list(conf.get('protocol', [None]))[0]
if from_port == 0 and to_port == 0:
to_port = 65535

prefix_list_ids = conf.get('prefix_list_ids')
if prefix_list_ids and prefix_list_ids != [[]]:
return False

if from_port is not None and to_port is not None and (from_port <= self.port <= to_port) or (
protocol == '-1' and from_port == 0 and to_port == 65535):
if conf.get('cidr_blocks'):
conf_cidr_blocks = conf.get('cidr_blocks', [[]])
else:
conf_cidr_blocks = conf.get('cidr_ipv4', [[]])
if conf_cidr_blocks and len(conf_cidr_blocks) > 0:
conf_cidr_blocks = conf_cidr_blocks[0]
cidr_blocks = force_list(conf_cidr_blocks)
if "0.0.0.0/0" in cidr_blocks:
return True
if conf.get('ipv6_cidr_blocks'):
ipv6_cidr_blocks = conf.get('ipv6_cidr_blocks', [])
else:
ipv6_cidr_blocks = conf.get('cidr_ipv6', [])
if ipv6_cidr_blocks and ipv6_cidr_blocks[0] is not None and \
any(ip in ['::/0', '0000:0000:0000:0000:0000:0000:0000:0000/0'] for ip in ipv6_cidr_blocks[0]):
return True
if not ipv6_cidr_blocks and not cidr_blocks \
and conf.get('security_groups') is None \
and conf.get('source_security_group_id') is None:
return True
return False

def check_self(self, conf: dict[str, list[Any]]) -> bool:
if conf.get('self'):
limit = force_list(conf['self'])[0]
if limit:
return True
return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from checkov.terraform.checks.resource.aws.AbsSecurityGroupUnrestrictedEgress import\
AbsSecurityGroupUnrestrictedEgress


class SecurityGroupUnrestrictedEgressAll(AbsSecurityGroupUnrestrictedEgress):
def __init__(self):
super().__init__(check_id="CKV_AWS_382", port=-1)


check = SecurityGroupUnrestrictedEgressAll()
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import List

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck


class AzureContainerInstancePublicIPAddressType(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure that Azure Container group is deployed into virtual network"
id = "CKV_AZURE_245"
supported_resources = ('azurerm_container_group',)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return 'ip_address_type'

def get_expected_values(self) -> List[str]:
return ['Private', 'None']


check = AzureContainerInstancePublicIPAddressType()
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import List, Any

from checkov.terraform.checks.resource.base_resource_negative_value_check import BaseResourceNegativeValueCheck
from checkov.common.models.enums import CheckCategories


class KubernetesClusterHTTPApplicationRouting(BaseResourceNegativeValueCheck):
def __init__(self) -> None:
name = "Ensure Azure AKS cluster HTTP application routing is disabled"
id = "CKV_AZURE_246"
supported_resources = ('azurerm_kubernetes_cluster',)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return "http_application_routing_enabled"

def get_forbidden_values(self) -> List[Any]:
return [True]


check = KubernetesClusterHTTPApplicationRouting()
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
resource "aws_security_group" "pass" {
name = "example"
vpc_id = "aws_vpc.example.id"

ingress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 80
to_port = 80
protocol = "tcp"
}
ingress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 443
to_port = 443
protocol = "tcp"
}
egress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 20
to_port = 200
protocol = "-1"
}
}

resource "aws_security_group" "fail2" {
name = "example"
vpc_id = "aws_vpc.example.id"

ingress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 80
to_port = 80
protocol = "tcp"
}
ingress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 443
to_port = 443
protocol = "tcp"
}
egress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 0
to_port = 0
protocol = "-1"
}
}

resource "aws_security_group_rule" "pass" {
cidr_blocks = ["0.0.0.0/0"]
from_port = 80
to_port = 80
protocol = "tcp"
security_group_id = "sg-12345"
type = "egress"
}

resource "aws_vpc_security_group_egress_rule" "pass" {
security_group_id = aws_security_group.example.id

cidr_ipv4 = "0.0.0.0/0"
from_port = 80
ip_protocol = "tcp"
to_port = 80
}

# fail
resource "aws_security_group" "fail" {
name = "allow-all-ingress"
description = "unfettered access"
vpc_id = "test_vpc"

egress {
from_port = -1
to_port = -1
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
description = "Test unfettered access"
}
}


resource "aws_security_group_rule" "fail" {
cidr_blocks = ["0.0.0.0/0"]
from_port = -1
to_port = -1
protocol = "tcp"
security_group_id = "sg-12345"
description = "Test unfettered access"
type = "egress"
}

resource "aws_security_group_rule" "fail2" {
cidr_blocks = ["0.0.0.0/0"]
from_port = 0
to_port = 0
protocol = "-1"
security_group_id = "sg-123456"
description = "Test unfettered access"
type = "egress"
}

resource "aws_vpc_security_group_egress_rule" "fail" {
security_group_id = aws_security_group.example.id

cidr_ipv4 = "0.0.0.0/0"
from_port = -1
ip_protocol = "tcp"
to_port = -1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import unittest
from pathlib import Path

from checkov.runner_filter import RunnerFilter
from checkov.terraform.checks.resource.aws.SecurityGroupUnrestrictedEgressAny import check
from checkov.terraform.runner import Runner


class TestSecurityGroupUnrestrictedEgressAny(unittest.TestCase):
def test(self):
# given
test_files_dir = Path(__file__).parent / "example_SecurityGroupUnrestrictedEgressAny"

# when
report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id]))

# then
summary = report.get_summary()

passing_resources = {
"aws_security_group.pass",
"aws_security_group_rule.pass",
"aws_vpc_security_group_egress_rule.pass"
}

failing_resources = {
"aws_security_group.fail2",
"aws_security_group.fail",
"aws_security_group_rule.fail",
"aws_vpc_security_group_egress_rule.fail",
"aws_security_group_rule.fail2"
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], len(passing_resources))
self.assertEqual(summary["failed"], len(failing_resources))
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Loading

0 comments on commit a3c3a28

Please sign in to comment.