Skip to content

Commit

Permalink
Merge branch 'main' into feat/bigtabledeletion
Browse files Browse the repository at this point in the history
  • Loading branch information
gruebel authored Nov 2, 2023
2 parents c705c5d + 09c060a commit 5df842a
Show file tree
Hide file tree
Showing 27 changed files with 1,858 additions and 1,842 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.arm.base_parameter_check import BaseParameterCheck


class SecureStringParameterNoHardcodedValue(BaseParameterCheck):
def __init__(self):
def __init__(self) -> None:
name = "SecureString parameter should not have hardcoded default values"
id = "CKV_AZURE_131"
supported_resources = ['secureString']
categories = [CheckCategories.SECRETS]
supported_resources = ('secureString',)
categories = (CheckCategories.SECRETS,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
# https://docs.microsoft.com/en-us/azure/azure-resource-manager/templates/test-cases#secure-parameters-cant-have-hardcoded-default
default_value = conf.get('defaultValue')
if default_value: # should be missing, or an empty string
Expand Down
21 changes: 14 additions & 7 deletions checkov/arm/checks/resource/APIServicesUseVirtualNetwork.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
from typing import Any

from checkov.common.models.consts import ANY_VALUE
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.arm.base_resource_value_check import BaseResourceValueCheck


class APIServicesUseVirtualNetwork(BaseResourceValueCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure that API management services use virtual networks"
id = "CKV_AZURE_107"
supported_resources = ['Microsoft.ApiManagement/service']
categories = [CheckCategories.NETWORKING]
super().__init__(name=name, id=id, categories=categories,
supported_resources=supported_resources, missing_block_result=CheckResult.FAILED)
supported_resources = ("Microsoft.ApiManagement/service",)
categories = (CheckCategories.NETWORKING,)
super().__init__(
name=name,
id=id,
categories=categories,
supported_resources=supported_resources,
missing_block_result=CheckResult.FAILED,
)

def get_inspected_key(self):
def get_inspected_key(self) -> str:
return "properties/virtualNetworkConfiguration"

def get_expected_value(self):
def get_expected_value(self) -> Any:
return ANY_VALUE


Expand Down
27 changes: 13 additions & 14 deletions checkov/arm/checks/resource/AppServiceHttps20Enabled.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.arm.base_resource_check import BaseResourceCheck
from typing import Any

# https://docs.microsoft.com/en-us/azure/templates/microsoft.web/2019-08-01/sites
from checkov.arm.base_resource_value_check import BaseResourceValueCheck
from checkov.common.models.enums import CheckCategories


class AppServiceHttps20Enabled(BaseResourceCheck):
def __init__(self):
class AppServiceHttps20Enabled(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure that 'HTTP Version' is the latest if used to run the web app"
id = "CKV_AZURE_18"
supported_resources = ["Microsoft.Web/sites"]
categories = [CheckCategories.NETWORKING]
supported_resources = ("Microsoft.Web/sites",)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
properties = conf.get("properties")
if isinstance(properties, dict):
site_config = properties.get("siteConfig")
if isinstance(site_config, dict) and site_config.get("http20Enabled"):
return CheckResult.PASSED
return CheckResult.FAILED
def get_inspected_key(self) -> str:
# https://docs.microsoft.com/en-us/azure/templates/microsoft.web/2019-08-01/sites
return "properties/siteConfig/http20Enabled"

def get_expected_value(self) -> Any:
return "true"


check = AppServiceHttps20Enabled()
27 changes: 13 additions & 14 deletions checkov/arm/checks/resource/AppServiceMinTLSVersion.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.arm.base_resource_check import BaseResourceCheck
from typing import Any

# https://docs.microsoft.com/en-us/azure/templates/microsoft.web/2019-08-01/sites
from checkov.arm.base_resource_value_check import BaseResourceValueCheck
from checkov.common.models.enums import CheckCategories


class AppServiceMinTLSVersion(BaseResourceCheck):
def __init__(self):
class AppServiceMinTLSVersion(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure web app is using the latest version of TLS encryption"
id = "CKV_AZURE_15"
supported_resources = ["Microsoft.Web/sites"]
categories = [CheckCategories.NETWORKING]
supported_resources = ("Microsoft.Web/sites",)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
properties = conf.get("properties")
if isinstance(properties, dict):
site_config = properties.get("siteConfig")
if isinstance(site_config, dict) and site_config.get("minTlsVersion") == "1.2":
return CheckResult.PASSED
return CheckResult.FAILED
def get_inspected_key(self) -> str:
# https://docs.microsoft.com/en-us/azure/templates/microsoft.web/2019-08-01/sites
return "properties/siteConfig/minTlsVersion"

def get_expected_value(self) -> Any:
return "1.2"


check = AppServiceMinTLSVersion()
20 changes: 12 additions & 8 deletions checkov/arm/checks/resource/AzureManagedDiskEncryptionSet.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
from typing import Any

from checkov.common.models.consts import ANY_VALUE
from checkov.common.models.enums import CheckCategories
from checkov.arm.base_resource_value_check import BaseResourceValueCheck


class AzureManagedDiskEncryptionSet(BaseResourceValueCheck):
def __init__(self):
name = "Ensure that managed disks use a specific set of disk encryption sets for the " \
"customer-managed key encryption"
def __init__(self) -> None:
name = (
"Ensure that managed disks use a specific set of disk encryption sets for the "
"customer-managed key encryption"
)
id = "CKV_AZURE_93"
supported_resources = ['Microsoft.Compute/disks']
categories = [CheckCategories.ENCRYPTION]
supported_resources = ("Microsoft.Compute/disks",)
categories = (CheckCategories.ENCRYPTION,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self):
return 'properties/encryption/diskEncryptionSetId'
def get_inspected_key(self) -> str:
return "properties/encryption/diskEncryptionSetId"

def get_expected_value(self):
def get_expected_value(self) -> Any:
return ANY_VALUE


Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.arm.base_resource_value_check import BaseResourceValueCheck


class CognitiveServicesDisablesPublicNetwork(BaseResourceValueCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure that Cognitive Services accounts disable public network access"
id = "CKV_AZURE_134"
supported_resources = ['Microsoft.CognitiveServices/accounts']
categories = [CheckCategories.NETWORKING]
supported_resources = ("Microsoft.CognitiveServices/accounts",)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self):
return 'properties/publicNetworkAccess'
def get_inspected_key(self) -> str:
return "properties/publicNetworkAccess"

def get_expected_value(self):
def get_expected_value(self) -> Any:
return "Disabled"


Expand Down
14 changes: 8 additions & 6 deletions checkov/arm/checks/resource/CosmosDBHaveCMK.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
from typing import Any

from checkov.common.models.consts import ANY_VALUE
from checkov.common.models.enums import CheckCategories
from checkov.arm.base_resource_value_check import BaseResourceValueCheck


class CosmosDBHaveCMK(BaseResourceValueCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure that Cosmos DB accounts have customer-managed keys to encrypt data at rest"
id = "CKV_AZURE_100"
supported_resources = ['Microsoft.DocumentDb/databaseAccounts']
categories = [CheckCategories.NETWORKING]
supported_resources = ("Microsoft.DocumentDb/databaseAccounts",)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self):
return 'properties/keyVaultKeyUri'
def get_inspected_key(self) -> str:
return "properties/keyVaultKeyUri"

def get_expected_value(self):
def get_expected_value(self) -> Any:
return ANY_VALUE


Expand Down
22 changes: 13 additions & 9 deletions checkov/arm/checks/resource/KeyBackedByHSM.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.arm.base_resource_value_check import BaseResourceValueCheck


class KeyBackedByHSM(BaseResourceValueCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure that key vault key is backed by HSM"
id = "CKV_AZURE_112"
supported_resources = ['Microsoft.KeyVault/vaults/keys']
categories = [CheckCategories.BACKUP_AND_RECOVERY]
supported_resources = ("Microsoft.KeyVault/vaults/keys",)
categories = (CheckCategories.BACKUP_AND_RECOVERY,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self):
return 'properties/kty'
def get_inspected_key(self) -> str:
return "properties/kty"

def get_expected_value(self):
return 'RSA-HSM'
def get_expected_value(self) -> Any:
return "RSA-HSM"

def get_expected_values(self):
return [self.get_expected_value(), 'EC-HSM']
def get_expected_values(self) -> list[Any]:
return [self.get_expected_value(), "EC-HSM"]


check = KeyBackedByHSM()
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.arm.base_resource_value_check import BaseResourceValueCheck


class KeyVaultEnablesFirewallRulesSettings(BaseResourceValueCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure that key vault allows firewall rules settings"
id = "CKV_AZURE_109"
supported_resources = ['Microsoft.KeyVault/vaults']
categories = [CheckCategories.NETWORKING]
supported_resources = ("Microsoft.KeyVault/vaults",)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self):
def get_inspected_key(self) -> str:
return "properties/networkAcls/defaultAction"

def get_expected_value(self):
def get_expected_value(self) -> Any:
return "Deny"


Expand Down
10 changes: 5 additions & 5 deletions checkov/arm/checks/resource/MySQLGeoBackupEnabled.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@


class MySQLGeoBackupEnabled(BaseResourceValueCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure that My SQL server enables geo-redundant backups"
id = "CKV_AZURE_94"
supported_resources = ['Microsoft.DBforMySQL/flexibleServers']
categories = [CheckCategories.BACKUP_AND_RECOVERY]
supported_resources = ("Microsoft.DBforMySQL/flexibleServers",)
categories = (CheckCategories.BACKUP_AND_RECOVERY,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self):
return 'properties/Backup/geoRedundantBackup'
def get_inspected_key(self) -> str:
return "properties/Backup/geoRedundantBackup"


check = MySQLGeoBackupEnabled()
5 changes: 3 additions & 2 deletions checkov/arm/checks/resource/NSGRulePortAccessRestricted.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ def __init__(self, name: str, check_id: str, port: int) -> None:
self.port = port

def is_port_in_range(self, port_range: Union[int, str]) -> bool:
if re.match(PORT_RANGE, str(port_range)):
start, end = int(port_range.split("-")[0]), int(port_range.split("-")[1])
port_range_str = str(port_range)
if re.match(PORT_RANGE, port_range_str):
start, end = int(port_range_str.split("-")[0]), int(port_range_str.split("-")[1])
if start <= self.port <= end:
return True
if port_range in (str(self.port), "*"):
Expand Down
6 changes: 4 additions & 2 deletions checkov/arm/checks/resource/NSGRuleSSHAccessRestricted.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@


class NSGRuleSSHAccessRestricted(NSGRulePortAccessRestricted):
def __init__(self):
super().__init__(name="Ensure that SSH access is restricted from the internet", check_id="CKV_AZURE_10", port=22)
def __init__(self) -> None:
super().__init__(
name="Ensure that SSH access is restricted from the internet", check_id="CKV_AZURE_10", port=22
)


check = NSGRuleSSHAccessRestricted()
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
conf["properties"]["value"].lower() == "on":
return CheckResult.PASSED
return CheckResult.FAILED
# If name not connection_throttling - don't report (neither pass nor fail)
elif conf["type"] == "configurations":
if "name" in conf and conf["name"] == "connection_throttling":
if "parent_type" in conf:
Expand All @@ -35,9 +34,11 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
conf["properties"]["value"].lower() == "on":
return CheckResult.PASSED
return CheckResult.FAILED
# If name not connection_throttling - don't report (neither pass nor fail)
else:
return CheckResult.FAILED

# If name not connection_throttling - don't report (neither pass nor fail)
return CheckResult.UNKNOWN


check = PostgreSQLServerConnectionThrottlingEnabled()
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
conf["properties"]["value"].lower() == "on":
return CheckResult.PASSED
return CheckResult.FAILED
# If name not connection_throttling - don't report (neither pass nor fail)
elif conf["type"] == "configurations":
if "name" in conf and conf["name"] == "log_checkpoints":
if "parent_type" in conf:
Expand All @@ -36,9 +35,11 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
conf["properties"]["value"].lower() == "on":
return CheckResult.PASSED
return CheckResult.FAILED
# If name not connection_throttling - don't report (neither pass nor fail)
else:
return CheckResult.FAILED

# If name not connection_throttling - don't report (neither pass nor fail)
return CheckResult.UNKNOWN


check = PostgreSQLServerLogCheckpointsEnabled()
Loading

0 comments on commit 5df842a

Please sign in to comment.