Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: enable mypy for kubernetes.checks #5722

Merged
merged 2 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions checkov/common/images/image_referencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from abc import abstractmethod
from collections.abc import Iterable
from pathlib import Path
from typing import cast, Any, TYPE_CHECKING, Generic, TypeVar
from typing import Any, TYPE_CHECKING, Generic, TypeVar

import aiohttp
import docker
Expand Down Expand Up @@ -103,8 +103,8 @@ def inspect(image_name: str) -> str:
image = client.images.get(image_name)
except Exception:
image = client.images.pull(image_name)
return cast(str, image.short_id)
return cast(str, image.short_id)
return image.short_id
return image.short_id
except Exception:
logging.debug(f"failed to pull docker image={image_name}", exc_info=True)
return ""
Expand Down
17 changes: 7 additions & 10 deletions checkov/kubernetes/checks/resource/base_root_container_check.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Dict, Any, Optional

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.util.data_structures_utils import find_in_dict
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check
from checkov.kubernetes.checks.resource.registry import registry

Expand Down Expand Up @@ -33,21 +36,15 @@ def extract_spec(self, conf: Dict[str, Any]) -> Dict[str, Any]:
if "spec" in conf:
spec = conf["spec"]
elif conf['kind'] == 'CronJob':
if "spec" in conf and \
isinstance(conf["spec"], dict) and \
"jobTemplate" in conf["spec"] and \
"spec" in conf["spec"]["jobTemplate"] and \
conf["spec"]["jobTemplate"]["spec"] and \
"template" in conf["spec"]["jobTemplate"]["spec"] and \
"spec" in conf["spec"]["jobTemplate"]["spec"]["template"]:
spec = conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]
inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec")
spec = inner_spec if inner_spec else spec
else:
inner_spec = self.get_inner_entry(conf, "spec")
inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec")
spec = inner_spec if inner_spec else spec
return spec

@staticmethod
def check_runAsNonRoot(spec):
def check_runAsNonRoot(spec: dict[str, Any]) -> str:
if not isinstance(spec, dict):
return "ABSENT"
security_context = spec.get("securityContext")
Expand Down
7 changes: 0 additions & 7 deletions checkov/kubernetes/checks/resource/base_spec_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,3 @@ def scan_entity_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResul
def scan_spec_conf(self, conf: Dict[str, Any]) -> CheckResult:
"""Return result of Kubernetes object check."""
raise NotImplementedError()

@staticmethod
def get_inner_entry(conf: Dict[str, Any], entry_name: str) -> Dict[str, Any]:
spec = {}
if conf.get("spec") and isinstance(conf["spec"], dict) and conf.get("spec").get("template"):
spec = conf.get("spec").get("template").get(entry_name, {})
return spec
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
else:
return CheckResult.FAILED

return CheckResult.UNKNOWN


check = AllowPrivilegeEscalationPSP()
15 changes: 4 additions & 11 deletions checkov/kubernetes/checks/resource/k8s/DockerSocketVolume.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.util.data_structures_utils import find_in_dict
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


Expand Down Expand Up @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if "spec" in conf:
spec = conf["spec"]
elif conf["kind"] == "CronJob":
spec = conf.get("spec")
if spec:
job_template = spec.get("jobTemplate")
if job_template:
job_template_spec = job_template.get("spec")
if job_template_spec:
template = job_template_spec.get("template")
if template:
if "spec" in template:
spec = template["spec"]
inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec")
spec = inner_spec if inner_spec else spec
else:
inner_spec = self.get_inner_entry(conf, "spec")
inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec")
spec = inner_spec if inner_spec else spec

# Evaluate volumes
Expand Down
2 changes: 2 additions & 0 deletions checkov/kubernetes/checks/resource/k8s/KubeletClientCa.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ def scan_container_conf(self, metadata: Dict[str, Any], conf: Dict[str, Any]) ->
return CheckResult.FAILED
return CheckResult.FAILED

return CheckResult.UNKNOWN


check = KubeletClientCa()
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


class MinimizeCapabilitiesPSP(BaseK8Check):

def __init__(self):
def __init__(self) -> None:
# CIS-1.5 5.2.9
name = "Minimize the admission of containers with capabilities assigned"
# Location: PodSecurityPolicy.spec.requiredDropCapabilities
id = "CKV_K8S_36"
supported_kind = ['PodSecurityPolicy']
categories = [CheckCategories.KUBERNETES]
supported_kind = ("PodSecurityPolicy",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind)

def scan_spec_conf(self, conf):
def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if "spec" in conf:
if "requiredDropCapabilities" in conf["spec"]:
if conf["spec"]["requiredDropCapabilities"]:
Expand Down
28 changes: 16 additions & 12 deletions checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


class PeerClientCertAuthTrue(BaseK8Check):

def __init__(self):
def __init__(self) -> None:
name = "Ensure that the --peer-client-cert-auth argument is set to true"
id = "CKV_K8S_121"
supported_kind = ['Pod']
categories = [CheckCategories.KUBERNETES]
supported_kind = ("Pod",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind)

def scan_spec_conf(self, conf, entity_type=None):
if conf.get("metadata", {}).get('name') == 'etcd':
containers = conf.get('spec')['containers']
for container in containers:
if container.get("args") is not None:
if '--peer-client-cert-auth=true' not in container['args']:
return CheckResult.FAILED
return CheckResult.PASSED
def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if conf.get("metadata", {}).get("name") == "etcd":
containers = conf.get("spec", {}).get("containers")
if containers:
for container in containers:
if container.get("args") is not None:
if "--peer-client-cert-auth=true" not in container["args"]:
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.UNKNOWN


Expand Down
15 changes: 4 additions & 11 deletions checkov/kubernetes/checks/resource/k8s/PodSecurityContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.util.data_structures_utils import find_in_dict
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


Expand Down Expand Up @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if "spec" in conf:
spec = conf["spec"]
elif conf["kind"] == "CronJob":
spec = conf.get("spec")
if spec:
job_template = spec.get("jobTemplate")
if job_template:
job_template_spec = job_template.get("spec")
if job_template_spec:
template = job_template_spec.get("template")
if template:
if "spec" in template:
spec = template["spec"]
inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec")
spec = inner_spec if inner_spec else spec
else:
inner_spec = self.get_inner_entry(conf, "spec")
inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec")
spec = inner_spec if inner_spec else spec

if spec and isinstance(spec, dict):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@


class PrivilegedContainersPSP(BaseSpecOmittedOrValueCheck):

def __init__(self):
def __init__(self) -> None:
# CIS-1.3 1.7.1
# CIS-1.5 5.2.1
name = "Do not admit privileged containers"
id = "CKV_K8S_2"
# Location: PodSecurityPolicy.spec.privileged
supported_kind = ['PodSecurityPolicy']
categories = [CheckCategories.KUBERNETES]
supported_kind = ("PodSecurityPolicy",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind)

def get_inspected_key(self):
def get_inspected_key(self) -> str:
return "spec/privileged"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@


class RbacApproveCertificateSigningRequests(BaseRbacK8sCheck):
def __init__(self):
def __init__(self) -> None:
name = "Minimize ClusterRoles that grant permissions to approve CertificateSigningRequests"
id = "CKV_K8S_156"
supported_entities = ["ClusterRole"]
supported_entities = ("ClusterRole",)
super().__init__(name=name, id=id, supported_entities=supported_entities)

# See https://kubernetes.io/docs/reference/access-authn-authz/certificate-signing-requests/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


class RbacBindRoleBindings(BaseRbacK8sCheck):
def __init__(self):
def __init__(self) -> None:
name = "Minimize Roles and ClusterRoles that grant permissions to bind RoleBindings or ClusterRoleBindings"
id = "CKV_K8S_157"
super().__init__(name=name, id=id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


class RbacControlWebhooks(BaseRbacK8sCheck):
def __init__(self):
def __init__(self) -> None:
name = "Minimize ClusterRoles that grant control over validating or mutating admission webhook configurations"
id = "CKV_K8S_155"
supported_entities = ["ClusterRole"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


class RbacEscalateRoles(BaseRbacK8sCheck):
def __init__(self):
def __init__(self) -> None:
name = "Minimize Roles and ClusterRoles that grant permissions to escalate Roles or ClusterRoles"
id = "CKV_K8S_158"
super().__init__(name=name, id=id)
Expand Down
10 changes: 7 additions & 3 deletions checkov/kubernetes/checks/resource/k8s/RootContainers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult
from checkov.kubernetes.checks.resource.base_root_container_check import BaseK8sRootContainerCheck


class RootContainers(BaseK8sRootContainerCheck):
def __init__(self):
def __init__(self) -> None:
# CIS-1.3 1.7.6
# CIS-1.5 5.2.6
name = "Minimize the admission of root containers"
Expand All @@ -14,12 +18,12 @@ def __init__(self):
id = "CKV_K8S_23"
super().__init__(name=name, id=id)

def scan_spec_conf(self, conf):
def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
spec = self.extract_spec(conf)

# Collect results
if spec and isinstance(spec, dict):
results = {"pod": {}, "container": []}
results: dict[str, Any] = {"pod": {}, "container": []}
results["pod"]["runAsNonRoot"] = self.check_runAsNonRoot(spec)
results["pod"]["runAsUser"] = self.check_runAsUser(spec, 1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:

# Collect results
if spec and isinstance(spec, dict):
results = {"pod": {}, "container": []}
results: dict[str, Any] = {"pod": {}, "container": []}
results["pod"]["runAsUser"] = self.check_runAsUser(spec, 10000)

containers = spec.get("containers", [])
Expand Down
13 changes: 8 additions & 5 deletions checkov/kubernetes/checks/resource/k8s/RootContainersPSP.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check


class RootContainersPSP(BaseK8Check):

def __init__(self):
def __init__(self) -> None:
# CIS-1.3 1.7.6
# CIS-1.5 5.2.6
name = "Do not admit root containers"
# Location: PodSecurityPolicy.spec.runAsUser.rule
id = "CKV_K8S_6"
supported_kind = ['PodSecurityPolicy']
categories = [CheckCategories.KUBERNETES]
supported_kind = ("PodSecurityPolicy",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind)

def scan_spec_conf(self, conf):
def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if "spec" in conf:
if "runAsUser" in conf["spec"]:
if "rule" in conf["spec"]["runAsUser"]:
Expand Down
27 changes: 13 additions & 14 deletions checkov/kubernetes/checks/resource/k8s/Seccomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,23 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult:
if security_profile:
return CheckResult.PASSED if security_profile == 'RuntimeDefault' else CheckResult.FAILED

metadata = self.get_inner_entry(conf, "metadata")
metadata = find_in_dict(input_dict=conf, key_path="spec/template/metadata")
if not metadata and "metadata" in conf:
metadata = conf["metadata"]
elif conf['kind'] == 'CronJob':
if "spec" in conf:
if isinstance(conf["spec"], dict) and "jobTemplate" in conf["spec"]:
if "spec" in conf["spec"]["jobTemplate"]:
if conf["spec"]["jobTemplate"]["spec"] and "template" in conf["spec"]["jobTemplate"]["spec"]:
if "metadata" in conf["spec"]["jobTemplate"]["spec"]["template"]:
metadata = conf["spec"]["jobTemplate"]["spec"]["template"]["metadata"]
elif "spec" in conf["spec"]["jobTemplate"]["spec"]["template"]:
if "metadata" in conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]:
metadata = conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]["metadata"]
elif "securityContext" in conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]:
security_profile = conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]["securityContext"].get("seccompProfile", {}).get("type")
return CheckResult.PASSED if security_profile == 'RuntimeDefault' else CheckResult.FAILED
inner_template = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template")
if inner_template and isinstance(inner_template, dict):
if "metadata" in inner_template:
metadata = inner_template["metadata"]
elif "spec" in inner_template:
inner_spec = inner_template["spec"]
if "metadata" in inner_spec:
metadata = inner_spec["metadata"]
elif "securityContext" in inner_spec:
security_profile = inner_spec["securityContext"].get("seccompProfile", {}).get("type")
return CheckResult.PASSED if security_profile == 'RuntimeDefault' else CheckResult.FAILED
else:
inner_metadata = self.get_inner_entry(conf, "metadata")
inner_metadata = find_in_dict(input_dict=conf, key_path="spec/template/metadata")
metadata = inner_metadata if inner_metadata else metadata

if metadata:
Expand Down
Loading