Skip to content

Commit

Permalink
enable mypy for kubernetes.checks
Browse files Browse the repository at this point in the history
  • Loading branch information
gruebel committed Nov 4, 2023
1 parent 852d2d6 commit e4190e6
Show file tree
Hide file tree
Showing 31 changed files with 155 additions and 161 deletions.
6 changes: 3 additions & 3 deletions checkov/common/images/image_referencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from abc import abstractmethod
from collections.abc import Iterable
from pathlib import Path
from typing import cast, Any, TYPE_CHECKING, Generic, TypeVar
from typing import Any, TYPE_CHECKING, Generic, TypeVar

import aiohttp
import docker
Expand Down Expand Up @@ -103,8 +103,8 @@ def inspect(image_name: str) -> str:
image = client.images.get(image_name)
except Exception:
image = client.images.pull(image_name)
return cast(str, image.short_id)
return cast(str, image.short_id)
return image.short_id
return image.short_id
except Exception:
logging.debug(f"failed to pull docker image={image_name}", exc_info=True)
return ""
Expand Down
17 changes: 7 additions & 10 deletions checkov/kubernetes/checks/resource/base_root_container_check.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Dict, Any, Optional

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.util.data_structures_utils import find_in_dict
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check
from checkov.kubernetes.checks.resource.registry import registry

Expand Down Expand Up @@ -33,21 +36,15 @@ def extract_spec(self, conf: Dict[str, Any]) -> Dict[str, Any]:
if "spec" in conf:
spec = conf["spec"]
elif conf['kind'] == 'CronJob':
if "spec" in conf and \
isinstance(conf["spec"], dict) and \
"jobTemplate" in conf["spec"] and \
"spec" in conf["spec"]["jobTemplate"] and \
conf["spec"]["jobTemplate"]["spec"] and \
"template" in conf["spec"]["jobTemplate"]["spec"] and \
"spec" in conf["spec"]["jobTemplate"]["spec"]["template"]:
spec = conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]
inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec")
spec = inner_spec if inner_spec else spec
else:
inner_spec = self.get_inner_entry(conf, "spec")
inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec")
spec = inner_spec if inner_spec else spec
return spec

@staticmethod
def check_runAsNonRoot(spec):
def check_runAsNonRoot(spec: dict[str, Any]) -> str:
if not isinstance(spec, dict):
return "ABSENT"
security_context = spec.get("securityContext")
Expand Down
7 changes: 0 additions & 7 deletions checkov/kubernetes/checks/resource/base_spec_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,3 @@ def scan_entity_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResul
def scan_spec_conf(self, conf: Dict[str, Any]) -> CheckResult:
"""Return result of Kubernetes object check."""
raise NotImplementedError()

@staticmethod
def get_inner_entry(conf: Dict[str, Any], entry_name: str) -> Dict[str, Any]:
spec = {}
if conf.get("spec") and isinstance(conf["spec"], dict) and conf.get("spec").get("template"):
spec = conf.get("spec").get("template").get(entry_name, {})
return spec
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
else:
return CheckResult.FAILED

return CheckResult.UNKNOWN


check = AllowPrivilegeEscalationPSP()
15 changes: 4 additions & 11 deletions checkov/kubernetes/checks/resource/k8s/DockerSocketVolume.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.util.data_structures_utils import find_in_dict
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


Expand Down Expand Up @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if "spec" in conf:
spec = conf["spec"]
elif conf["kind"] == "CronJob":
spec = conf.get("spec")
if spec:
job_template = spec.get("jobTemplate")
if job_template:
job_template_spec = job_template.get("spec")
if job_template_spec:
template = job_template_spec.get("template")
if template:
if "spec" in template:
spec = template["spec"]
inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec")
spec = inner_spec if inner_spec else spec
else:
inner_spec = self.get_inner_entry(conf, "spec")
inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec")
spec = inner_spec if inner_spec else spec

# Evaluate volumes
Expand Down
2 changes: 2 additions & 0 deletions checkov/kubernetes/checks/resource/k8s/KubeletClientCa.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ def scan_container_conf(self, metadata: Dict[str, Any], conf: Dict[str, Any]) ->
return CheckResult.FAILED
return CheckResult.FAILED

return CheckResult.UNKNOWN


check = KubeletClientCa()
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


class MinimizeCapabilitiesPSP(BaseK8Check):

def __init__(self):
def __init__(self) -> None:
# CIS-1.5 5.2.9
name = "Minimize the admission of containers with capabilities assigned"
# Location: PodSecurityPolicy.spec.requiredDropCapabilities
id = "CKV_K8S_36"
supported_kind = ['PodSecurityPolicy']
categories = [CheckCategories.KUBERNETES]
supported_kind = ("PodSecurityPolicy",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind)

def scan_spec_conf(self, conf):
def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if "spec" in conf:
if "requiredDropCapabilities" in conf["spec"]:
if conf["spec"]["requiredDropCapabilities"]:
Expand Down
28 changes: 16 additions & 12 deletions checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


class PeerClientCertAuthTrue(BaseK8Check):

def __init__(self):
def __init__(self) -> None:
name = "Ensure that the --peer-client-cert-auth argument is set to true"
id = "CKV_K8S_121"
supported_kind = ['Pod']
categories = [CheckCategories.KUBERNETES]
supported_kind = ("Pod",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind)

def scan_spec_conf(self, conf, entity_type=None):
if conf.get("metadata", {}).get('name') == 'etcd':
containers = conf.get('spec')['containers']
for container in containers:
if container.get("args") is not None:
if '--peer-client-cert-auth=true' not in container['args']:
return CheckResult.FAILED
return CheckResult.PASSED
def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if conf.get("metadata", {}).get("name") == "etcd":
containers = conf.get("spec",{}).get("containers")
if containers:
for container in containers:
if container.get("args") is not None:
if "--peer-client-cert-auth=true" not in container["args"]:
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.UNKNOWN


Expand Down
15 changes: 4 additions & 11 deletions checkov/kubernetes/checks/resource/k8s/PodSecurityContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.util.data_structures_utils import find_in_dict
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


Expand Down Expand Up @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if "spec" in conf:
spec = conf["spec"]
elif conf["kind"] == "CronJob":
spec = conf.get("spec")
if spec:
job_template = spec.get("jobTemplate")
if job_template:
job_template_spec = job_template.get("spec")
if job_template_spec:
template = job_template_spec.get("template")
if template:
if "spec" in template:
spec = template["spec"]
inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec")
spec = inner_spec if inner_spec else spec
else:
inner_spec = self.get_inner_entry(conf, "spec")
inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec")
spec = inner_spec if inner_spec else spec

if spec and isinstance(spec, dict):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@


class PrivilegedContainersPSP(BaseSpecOmittedOrValueCheck):

def __init__(self):
def __init__(self) -> None:
# CIS-1.3 1.7.1
# CIS-1.5 5.2.1
name = "Do not admit privileged containers"
id = "CKV_K8S_2"
# Location: PodSecurityPolicy.spec.privileged
supported_kind = ['PodSecurityPolicy']
categories = [CheckCategories.KUBERNETES]
supported_kind = ("PodSecurityPolicy",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind)

def get_inspected_key(self):
def get_inspected_key(self) -> str:
return "spec/privileged"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@


class RbacApproveCertificateSigningRequests(BaseRbacK8sCheck):
def __init__(self):
def __init__(self) -> None:
name = "Minimize ClusterRoles that grant permissions to approve CertificateSigningRequests"
id = "CKV_K8S_156"
supported_entities = ["ClusterRole"]
supported_entities = ("ClusterRole",)
super().__init__(name=name, id=id, supported_entities=supported_entities)

# See https://kubernetes.io/docs/reference/access-authn-authz/certificate-signing-requests/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


class RbacBindRoleBindings(BaseRbacK8sCheck):
def __init__(self):
def __init__(self) -> None:
name = "Minimize Roles and ClusterRoles that grant permissions to bind RoleBindings or ClusterRoleBindings"
id = "CKV_K8S_157"
super().__init__(name=name, id=id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


class RbacControlWebhooks(BaseRbacK8sCheck):
def __init__(self):
def __init__(self) -> None:
name = "Minimize ClusterRoles that grant control over validating or mutating admission webhook configurations"
id = "CKV_K8S_155"
supported_entities = ["ClusterRole"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


class RbacEscalateRoles(BaseRbacK8sCheck):
def __init__(self):
def __init__(self) -> None:
name = "Minimize Roles and ClusterRoles that grant permissions to escalate Roles or ClusterRoles"
id = "CKV_K8S_158"
super().__init__(name=name, id=id)
Expand Down
10 changes: 7 additions & 3 deletions checkov/kubernetes/checks/resource/k8s/RootContainers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult
from checkov.kubernetes.checks.resource.base_root_container_check import BaseK8sRootContainerCheck


class RootContainers(BaseK8sRootContainerCheck):
def __init__(self):
def __init__(self) -> None:
# CIS-1.3 1.7.6
# CIS-1.5 5.2.6
name = "Minimize the admission of root containers"
Expand All @@ -14,12 +18,12 @@ def __init__(self):
id = "CKV_K8S_23"
super().__init__(name=name, id=id)

def scan_spec_conf(self, conf):
def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
spec = self.extract_spec(conf)

# Collect results
if spec and isinstance(spec, dict):
results = {"pod": {}, "container": []}
results: dict[str, Any] = {"pod": {}, "container": []}
results["pod"]["runAsNonRoot"] = self.check_runAsNonRoot(spec)
results["pod"]["runAsUser"] = self.check_runAsUser(spec, 1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:

# Collect results
if spec and isinstance(spec, dict):
results = {"pod": {}, "container": []}
results: dict[str, Any] = {"pod": {}, "container": []}
results["pod"]["runAsUser"] = self.check_runAsUser(spec, 10000)

containers = spec.get("containers", [])
Expand Down
13 changes: 8 additions & 5 deletions checkov/kubernetes/checks/resource/k8s/RootContainersPSP.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


class RootContainersPSP(BaseK8Check):

def __init__(self):
def __init__(self) -> None:
# CIS-1.3 1.7.6
# CIS-1.5 5.2.6
name = "Do not admit root containers"
# Location: PodSecurityPolicy.spec.runAsUser.rule
id = "CKV_K8S_6"
supported_kind = ['PodSecurityPolicy']
categories = [CheckCategories.KUBERNETES]
supported_kind = ("PodSecurityPolicy",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind)

def scan_spec_conf(self, conf):
def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if "spec" in conf:
if "runAsUser" in conf["spec"]:
if "rule" in conf["spec"]["runAsUser"]:
Expand Down
27 changes: 13 additions & 14 deletions checkov/kubernetes/checks/resource/k8s/Seccomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,23 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if security_profile:
return CheckResult.PASSED if security_profile == 'RuntimeDefault' else CheckResult.FAILED

metadata = self.get_inner_entry(conf, "metadata")
metadata = find_in_dict(input_dict=conf, key_path="spec/template/metadata")
if not metadata and "metadata" in conf:
metadata = conf["metadata"]
elif conf['kind'] == 'CronJob':
if "spec" in conf:
if isinstance(conf["spec"], dict) and "jobTemplate" in conf["spec"]:
if "spec" in conf["spec"]["jobTemplate"]:
if conf["spec"]["jobTemplate"]["spec"] and "template" in conf["spec"]["jobTemplate"]["spec"]:
if "metadata" in conf["spec"]["jobTemplate"]["spec"]["template"]:
metadata = conf["spec"]["jobTemplate"]["spec"]["template"]["metadata"]
elif "spec" in conf["spec"]["jobTemplate"]["spec"]["template"]:
if "metadata" in conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]:
metadata = conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]["metadata"]
elif "securityContext" in conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]:
security_profile = conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]["securityContext"].get("seccompProfile", {}).get("type")
return CheckResult.PASSED if security_profile == 'RuntimeDefault' else CheckResult.FAILED
inner_template = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template")
if inner_template and isinstance(inner_template, dict):
if "metadata" in inner_template:
metadata = inner_template["metadata"]
elif "spec" in inner_template:
inner_spec = inner_template["spec"]
if "metadata" in inner_spec:
metadata = inner_spec["metadata"]
elif "securityContext" in inner_spec:
security_profile = inner_spec["securityContext"].get("seccompProfile", {}).get("type")
return CheckResult.PASSED if security_profile == 'RuntimeDefault' else CheckResult.FAILED
else:
inner_metadata = self.get_inner_entry(conf, "metadata")
inner_metadata = find_in_dict(input_dict=conf, key_path="spec/template/metadata")
metadata = inner_metadata if inner_metadata else metadata

if metadata:
Expand Down
Loading

0 comments on commit e4190e6

Please sign in to comment.