From e4190e6a5c6abed31d17d383b532eef667a5456b Mon Sep 17 00:00:00 2001 From: gruebel Date: Sat, 4 Nov 2023 23:00:18 +0100 Subject: [PATCH 1/2] enable mypy for kubernetes.checks --- checkov/common/images/image_referencer.py | 6 ++-- .../resource/base_root_container_check.py | 17 +++++------ .../checks/resource/base_spec_check.py | 7 ----- .../k8s/AllowPrivilegeEscalationPSP.py | 2 ++ .../checks/resource/k8s/DockerSocketVolume.py | 15 +++------- .../checks/resource/k8s/KubeletClientCa.py | 2 ++ .../resource/k8s/MinimizeCapabilitiesPSP.py | 13 +++++---- .../resource/k8s/PeerClientCertAuthTrue.py | 28 +++++++++++-------- .../checks/resource/k8s/PodSecurityContext.py | 15 +++------- .../resource/k8s/PrivilegedContainersPSP.py | 9 +++--- .../RbacApproveCertificateSigningRequests.py | 4 +-- .../resource/k8s/RbacBindRoleBindings.py | 2 +- .../resource/k8s/RbacControlWebhooks.py | 2 +- .../checks/resource/k8s/RbacEscalateRoles.py | 2 +- .../checks/resource/k8s/RootContainers.py | 10 +++++-- .../resource/k8s/RootContainersHighUID.py | 2 +- .../checks/resource/k8s/RootContainersPSP.py | 13 +++++---- .../kubernetes/checks/resource/k8s/Seccomp.py | 27 +++++++++--------- .../checks/resource/k8s/SeccompPSP.py | 13 +++++---- .../resource/k8s/ServiceAccountTokens.py | 15 +++------- .../checks/resource/k8s/ShareHostIPC.py | 15 +++------- .../checks/resource/k8s/ShareHostIPCPSP.py | 9 +++--- .../checks/resource/k8s/ShareHostPID.py | 15 +++------- .../checks/resource/k8s/ShareHostPIDPSP.py | 9 +++--- .../k8s/SharedHostNetworkNamespace.py | 15 +++------- .../k8s/SharedHostNetworkNamespacePSP.py | 9 +++--- .../checks/resource/k8s/WildcardRoles.py | 3 +- extra_stubs/docker/__init__.pyi | 5 ++++ extra_stubs/docker/client.pyi | 13 +++++++++ extra_stubs/docker/models/images.pyi | 14 ++++++++++ mypy.ini | 5 +--- 31 files changed, 155 insertions(+), 161 deletions(-) create mode 100644 extra_stubs/docker/__init__.pyi create mode 100644 extra_stubs/docker/client.pyi create mode 100644 extra_stubs/docker/models/images.pyi diff --git a/checkov/common/images/image_referencer.py b/checkov/common/images/image_referencer.py index b7809b1a1c8..96383184c78 100644 --- a/checkov/common/images/image_referencer.py +++ b/checkov/common/images/image_referencer.py @@ -5,7 +5,7 @@ from abc import abstractmethod from collections.abc import Iterable from pathlib import Path -from typing import cast, Any, TYPE_CHECKING, Generic, TypeVar +from typing import Any, TYPE_CHECKING, Generic, TypeVar import aiohttp import docker @@ -103,8 +103,8 @@ def inspect(image_name: str) -> str: image = client.images.get(image_name) except Exception: image = client.images.pull(image_name) - return cast(str, image.short_id) - return cast(str, image.short_id) + return image.short_id + return image.short_id except Exception: logging.debug(f"failed to pull docker image={image_name}", exc_info=True) return "" diff --git a/checkov/kubernetes/checks/resource/base_root_container_check.py b/checkov/kubernetes/checks/resource/base_root_container_check.py index 4918c8822c2..50d2874cfcf 100644 --- a/checkov/kubernetes/checks/resource/base_root_container_check.py +++ b/checkov/kubernetes/checks/resource/base_root_container_check.py @@ -1,7 +1,10 @@ +from __future__ import annotations + from abc import abstractmethod from typing import Dict, Any, Optional from checkov.common.models.enums import CheckCategories, CheckResult +from checkov.common.util.data_structures_utils import find_in_dict from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check from checkov.kubernetes.checks.resource.registry import registry @@ -33,21 +36,15 @@ def extract_spec(self, conf: Dict[str, Any]) -> Dict[str, Any]: if "spec" in conf: spec = conf["spec"] elif conf['kind'] == 'CronJob': - if "spec" in conf and \ - isinstance(conf["spec"], dict) and \ - "jobTemplate" in conf["spec"] and \ - "spec" in conf["spec"]["jobTemplate"] and \ - conf["spec"]["jobTemplate"]["spec"] and \ - "template" in conf["spec"]["jobTemplate"]["spec"] and \ - "spec" in conf["spec"]["jobTemplate"]["spec"]["template"]: - spec = conf["spec"]["jobTemplate"]["spec"]["template"]["spec"] + inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec") + spec = inner_spec if inner_spec else spec else: - inner_spec = self.get_inner_entry(conf, "spec") + inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec") spec = inner_spec if inner_spec else spec return spec @staticmethod - def check_runAsNonRoot(spec): + def check_runAsNonRoot(spec: dict[str, Any]) -> str: if not isinstance(spec, dict): return "ABSENT" security_context = spec.get("securityContext") diff --git a/checkov/kubernetes/checks/resource/base_spec_check.py b/checkov/kubernetes/checks/resource/base_spec_check.py index 45d2b978538..808d33832e5 100644 --- a/checkov/kubernetes/checks/resource/base_spec_check.py +++ b/checkov/kubernetes/checks/resource/base_spec_check.py @@ -35,10 +35,3 @@ def scan_entity_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResul def scan_spec_conf(self, conf: Dict[str, Any]) -> CheckResult: """Return result of Kubernetes object check.""" raise NotImplementedError() - - @staticmethod - def get_inner_entry(conf: Dict[str, Any], entry_name: str) -> Dict[str, Any]: - spec = {} - if conf.get("spec") and isinstance(conf["spec"], dict) and conf.get("spec").get("template"): - spec = conf.get("spec").get("template").get(entry_name, {}) - return spec diff --git a/checkov/kubernetes/checks/resource/k8s/AllowPrivilegeEscalationPSP.py b/checkov/kubernetes/checks/resource/k8s/AllowPrivilegeEscalationPSP.py index e2400573f05..dbb9b588741 100644 --- a/checkov/kubernetes/checks/resource/k8s/AllowPrivilegeEscalationPSP.py +++ b/checkov/kubernetes/checks/resource/k8s/AllowPrivilegeEscalationPSP.py @@ -33,5 +33,7 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: else: return CheckResult.FAILED + return CheckResult.UNKNOWN + check = AllowPrivilegeEscalationPSP() diff --git a/checkov/kubernetes/checks/resource/k8s/DockerSocketVolume.py b/checkov/kubernetes/checks/resource/k8s/DockerSocketVolume.py index 879a8b30fae..73e6998535a 100644 --- a/checkov/kubernetes/checks/resource/k8s/DockerSocketVolume.py +++ b/checkov/kubernetes/checks/resource/k8s/DockerSocketVolume.py @@ -3,6 +3,7 @@ from typing import Any from checkov.common.models.enums import CheckCategories, CheckResult +from checkov.common.util.data_structures_utils import find_in_dict from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if "spec" in conf: spec = conf["spec"] elif conf["kind"] == "CronJob": - spec = conf.get("spec") - if spec: - job_template = spec.get("jobTemplate") - if job_template: - job_template_spec = job_template.get("spec") - if job_template_spec: - template = job_template_spec.get("template") - if template: - if "spec" in template: - spec = template["spec"] + inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec") + spec = inner_spec if inner_spec else spec else: - inner_spec = self.get_inner_entry(conf, "spec") + inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec") spec = inner_spec if inner_spec else spec # Evaluate volumes diff --git a/checkov/kubernetes/checks/resource/k8s/KubeletClientCa.py b/checkov/kubernetes/checks/resource/k8s/KubeletClientCa.py index e4b194d68e9..ddd5ba7a72d 100644 --- a/checkov/kubernetes/checks/resource/k8s/KubeletClientCa.py +++ b/checkov/kubernetes/checks/resource/k8s/KubeletClientCa.py @@ -23,5 +23,7 @@ def scan_container_conf(self, metadata: Dict[str, Any], conf: Dict[str, Any]) -> return CheckResult.FAILED return CheckResult.FAILED + return CheckResult.UNKNOWN + check = KubeletClientCa() diff --git a/checkov/kubernetes/checks/resource/k8s/MinimizeCapabilitiesPSP.py b/checkov/kubernetes/checks/resource/k8s/MinimizeCapabilitiesPSP.py index e64498c05e2..d604e05a22c 100644 --- a/checkov/kubernetes/checks/resource/k8s/MinimizeCapabilitiesPSP.py +++ b/checkov/kubernetes/checks/resource/k8s/MinimizeCapabilitiesPSP.py @@ -1,19 +1,22 @@ +from __future__ import annotations + +from typing import Any + from checkov.common.models.enums import CheckCategories, CheckResult from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check class MinimizeCapabilitiesPSP(BaseK8Check): - - def __init__(self): + def __init__(self) -> None: # CIS-1.5 5.2.9 name = "Minimize the admission of containers with capabilities assigned" # Location: PodSecurityPolicy.spec.requiredDropCapabilities id = "CKV_K8S_36" - supported_kind = ['PodSecurityPolicy'] - categories = [CheckCategories.KUBERNETES] + supported_kind = ("PodSecurityPolicy",) + categories = (CheckCategories.KUBERNETES,) super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind) - def scan_spec_conf(self, conf): + def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if "spec" in conf: if "requiredDropCapabilities" in conf["spec"]: if conf["spec"]["requiredDropCapabilities"]: diff --git a/checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py b/checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py index df24123f7e3..347afb0c80c 100644 --- a/checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py +++ b/checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py @@ -1,24 +1,28 @@ +from __future__ import annotations + +from typing import Any + from checkov.common.models.enums import CheckCategories, CheckResult from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check class PeerClientCertAuthTrue(BaseK8Check): - - def __init__(self): + def __init__(self) -> None: name = "Ensure that the --peer-client-cert-auth argument is set to true" id = "CKV_K8S_121" - supported_kind = ['Pod'] - categories = [CheckCategories.KUBERNETES] + supported_kind = ("Pod",) + categories = (CheckCategories.KUBERNETES,) super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind) - def scan_spec_conf(self, conf, entity_type=None): - if conf.get("metadata", {}).get('name') == 'etcd': - containers = conf.get('spec')['containers'] - for container in containers: - if container.get("args") is not None: - if '--peer-client-cert-auth=true' not in container['args']: - return CheckResult.FAILED - return CheckResult.PASSED + def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: + if conf.get("metadata", {}).get("name") == "etcd": + containers = conf.get("spec",{}).get("containers") + if containers: + for container in containers: + if container.get("args") is not None: + if "--peer-client-cert-auth=true" not in container["args"]: + return CheckResult.FAILED + return CheckResult.PASSED return CheckResult.UNKNOWN diff --git a/checkov/kubernetes/checks/resource/k8s/PodSecurityContext.py b/checkov/kubernetes/checks/resource/k8s/PodSecurityContext.py index d9524fb7696..5e8bfa1a7b9 100644 --- a/checkov/kubernetes/checks/resource/k8s/PodSecurityContext.py +++ b/checkov/kubernetes/checks/resource/k8s/PodSecurityContext.py @@ -3,6 +3,7 @@ from typing import Any from checkov.common.models.enums import CheckCategories, CheckResult +from checkov.common.util.data_structures_utils import find_in_dict from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if "spec" in conf: spec = conf["spec"] elif conf["kind"] == "CronJob": - spec = conf.get("spec") - if spec: - job_template = spec.get("jobTemplate") - if job_template: - job_template_spec = job_template.get("spec") - if job_template_spec: - template = job_template_spec.get("template") - if template: - if "spec" in template: - spec = template["spec"] + inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec") + spec = inner_spec if inner_spec else spec else: - inner_spec = self.get_inner_entry(conf, "spec") + inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec") spec = inner_spec if inner_spec else spec if spec and isinstance(spec, dict): diff --git a/checkov/kubernetes/checks/resource/k8s/PrivilegedContainersPSP.py b/checkov/kubernetes/checks/resource/k8s/PrivilegedContainersPSP.py index 036214161c6..b73895412d9 100644 --- a/checkov/kubernetes/checks/resource/k8s/PrivilegedContainersPSP.py +++ b/checkov/kubernetes/checks/resource/k8s/PrivilegedContainersPSP.py @@ -3,18 +3,17 @@ class PrivilegedContainersPSP(BaseSpecOmittedOrValueCheck): - - def __init__(self): + def __init__(self) -> None: # CIS-1.3 1.7.1 # CIS-1.5 5.2.1 name = "Do not admit privileged containers" id = "CKV_K8S_2" # Location: PodSecurityPolicy.spec.privileged - supported_kind = ['PodSecurityPolicy'] - categories = [CheckCategories.KUBERNETES] + supported_kind = ("PodSecurityPolicy",) + categories = (CheckCategories.KUBERNETES,) super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind) - def get_inspected_key(self): + def get_inspected_key(self) -> str: return "spec/privileged" diff --git a/checkov/kubernetes/checks/resource/k8s/RbacApproveCertificateSigningRequests.py b/checkov/kubernetes/checks/resource/k8s/RbacApproveCertificateSigningRequests.py index dd38227d562..fba90cc5ebc 100644 --- a/checkov/kubernetes/checks/resource/k8s/RbacApproveCertificateSigningRequests.py +++ b/checkov/kubernetes/checks/resource/k8s/RbacApproveCertificateSigningRequests.py @@ -2,10 +2,10 @@ class RbacApproveCertificateSigningRequests(BaseRbacK8sCheck): - def __init__(self): + def __init__(self) -> None: name = "Minimize ClusterRoles that grant permissions to approve CertificateSigningRequests" id = "CKV_K8S_156" - supported_entities = ["ClusterRole"] + supported_entities = ("ClusterRole",) super().__init__(name=name, id=id, supported_entities=supported_entities) # See https://kubernetes.io/docs/reference/access-authn-authz/certificate-signing-requests/ diff --git a/checkov/kubernetes/checks/resource/k8s/RbacBindRoleBindings.py b/checkov/kubernetes/checks/resource/k8s/RbacBindRoleBindings.py index 90a055ca25c..7d137789c32 100644 --- a/checkov/kubernetes/checks/resource/k8s/RbacBindRoleBindings.py +++ b/checkov/kubernetes/checks/resource/k8s/RbacBindRoleBindings.py @@ -2,7 +2,7 @@ class RbacBindRoleBindings(BaseRbacK8sCheck): - def __init__(self): + def __init__(self) -> None: name = "Minimize Roles and ClusterRoles that grant permissions to bind RoleBindings or ClusterRoleBindings" id = "CKV_K8S_157" super().__init__(name=name, id=id) diff --git a/checkov/kubernetes/checks/resource/k8s/RbacControlWebhooks.py b/checkov/kubernetes/checks/resource/k8s/RbacControlWebhooks.py index a6a7b4b3fa3..b83f58e7ccf 100644 --- a/checkov/kubernetes/checks/resource/k8s/RbacControlWebhooks.py +++ b/checkov/kubernetes/checks/resource/k8s/RbacControlWebhooks.py @@ -2,7 +2,7 @@ class RbacControlWebhooks(BaseRbacK8sCheck): - def __init__(self): + def __init__(self) -> None: name = "Minimize ClusterRoles that grant control over validating or mutating admission webhook configurations" id = "CKV_K8S_155" supported_entities = ["ClusterRole"] diff --git a/checkov/kubernetes/checks/resource/k8s/RbacEscalateRoles.py b/checkov/kubernetes/checks/resource/k8s/RbacEscalateRoles.py index 4d941b8b1eb..26e59af4e20 100644 --- a/checkov/kubernetes/checks/resource/k8s/RbacEscalateRoles.py +++ b/checkov/kubernetes/checks/resource/k8s/RbacEscalateRoles.py @@ -2,7 +2,7 @@ class RbacEscalateRoles(BaseRbacK8sCheck): - def __init__(self): + def __init__(self) -> None: name = "Minimize Roles and ClusterRoles that grant permissions to escalate Roles or ClusterRoles" id = "CKV_K8S_158" super().__init__(name=name, id=id) diff --git a/checkov/kubernetes/checks/resource/k8s/RootContainers.py b/checkov/kubernetes/checks/resource/k8s/RootContainers.py index 47a17d8d6a8..be3bb299a48 100644 --- a/checkov/kubernetes/checks/resource/k8s/RootContainers.py +++ b/checkov/kubernetes/checks/resource/k8s/RootContainers.py @@ -1,9 +1,13 @@ +from __future__ import annotations + +from typing import Any + from checkov.common.models.enums import CheckResult from checkov.kubernetes.checks.resource.base_root_container_check import BaseK8sRootContainerCheck class RootContainers(BaseK8sRootContainerCheck): - def __init__(self): + def __init__(self) -> None: # CIS-1.3 1.7.6 # CIS-1.5 5.2.6 name = "Minimize the admission of root containers" @@ -14,12 +18,12 @@ def __init__(self): id = "CKV_K8S_23" super().__init__(name=name, id=id) - def scan_spec_conf(self, conf): + def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: spec = self.extract_spec(conf) # Collect results if spec and isinstance(spec, dict): - results = {"pod": {}, "container": []} + results: dict[str, Any] = {"pod": {}, "container": []} results["pod"]["runAsNonRoot"] = self.check_runAsNonRoot(spec) results["pod"]["runAsUser"] = self.check_runAsUser(spec, 1) diff --git a/checkov/kubernetes/checks/resource/k8s/RootContainersHighUID.py b/checkov/kubernetes/checks/resource/k8s/RootContainersHighUID.py index 8817dca1b65..c8f5aaef340 100644 --- a/checkov/kubernetes/checks/resource/k8s/RootContainersHighUID.py +++ b/checkov/kubernetes/checks/resource/k8s/RootContainersHighUID.py @@ -21,7 +21,7 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: # Collect results if spec and isinstance(spec, dict): - results = {"pod": {}, "container": []} + results: dict[str, Any] = {"pod": {}, "container": []} results["pod"]["runAsUser"] = self.check_runAsUser(spec, 10000) containers = spec.get("containers", []) diff --git a/checkov/kubernetes/checks/resource/k8s/RootContainersPSP.py b/checkov/kubernetes/checks/resource/k8s/RootContainersPSP.py index 16393ea2026..5b57dcc84df 100644 --- a/checkov/kubernetes/checks/resource/k8s/RootContainersPSP.py +++ b/checkov/kubernetes/checks/resource/k8s/RootContainersPSP.py @@ -1,20 +1,23 @@ +from __future__ import annotations + +from typing import Any + from checkov.common.models.enums import CheckCategories, CheckResult from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check class RootContainersPSP(BaseK8Check): - - def __init__(self): + def __init__(self) -> None: # CIS-1.3 1.7.6 # CIS-1.5 5.2.6 name = "Do not admit root containers" # Location: PodSecurityPolicy.spec.runAsUser.rule id = "CKV_K8S_6" - supported_kind = ['PodSecurityPolicy'] - categories = [CheckCategories.KUBERNETES] + supported_kind = ("PodSecurityPolicy",) + categories = (CheckCategories.KUBERNETES,) super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind) - def scan_spec_conf(self, conf): + def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if "spec" in conf: if "runAsUser" in conf["spec"]: if "rule" in conf["spec"]["runAsUser"]: diff --git a/checkov/kubernetes/checks/resource/k8s/Seccomp.py b/checkov/kubernetes/checks/resource/k8s/Seccomp.py index 7411422931f..cf6745e1ef7 100644 --- a/checkov/kubernetes/checks/resource/k8s/Seccomp.py +++ b/checkov/kubernetes/checks/resource/k8s/Seccomp.py @@ -52,24 +52,23 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if security_profile: return CheckResult.PASSED if security_profile == 'RuntimeDefault' else CheckResult.FAILED - metadata = self.get_inner_entry(conf, "metadata") + metadata = find_in_dict(input_dict=conf, key_path="spec/template/metadata") if not metadata and "metadata" in conf: metadata = conf["metadata"] elif conf['kind'] == 'CronJob': - if "spec" in conf: - if isinstance(conf["spec"], dict) and "jobTemplate" in conf["spec"]: - if "spec" in conf["spec"]["jobTemplate"]: - if conf["spec"]["jobTemplate"]["spec"] and "template" in conf["spec"]["jobTemplate"]["spec"]: - if "metadata" in conf["spec"]["jobTemplate"]["spec"]["template"]: - metadata = conf["spec"]["jobTemplate"]["spec"]["template"]["metadata"] - elif "spec" in conf["spec"]["jobTemplate"]["spec"]["template"]: - if "metadata" in conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]: - metadata = conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]["metadata"] - elif "securityContext" in conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]: - security_profile = conf["spec"]["jobTemplate"]["spec"]["template"]["spec"]["securityContext"].get("seccompProfile", {}).get("type") - return CheckResult.PASSED if security_profile == 'RuntimeDefault' else CheckResult.FAILED + inner_template = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template") + if inner_template and isinstance(inner_template, dict): + if "metadata" in inner_template: + metadata = inner_template["metadata"] + elif "spec" in inner_template: + inner_spec = inner_template["spec"] + if "metadata" in inner_spec: + metadata = inner_spec["metadata"] + elif "securityContext" in inner_spec: + security_profile = inner_spec["securityContext"].get("seccompProfile", {}).get("type") + return CheckResult.PASSED if security_profile == 'RuntimeDefault' else CheckResult.FAILED else: - inner_metadata = self.get_inner_entry(conf, "metadata") + inner_metadata = find_in_dict(input_dict=conf, key_path="spec/template/metadata") metadata = inner_metadata if inner_metadata else metadata if metadata: diff --git a/checkov/kubernetes/checks/resource/k8s/SeccompPSP.py b/checkov/kubernetes/checks/resource/k8s/SeccompPSP.py index 2b561b24380..dded5541c85 100644 --- a/checkov/kubernetes/checks/resource/k8s/SeccompPSP.py +++ b/checkov/kubernetes/checks/resource/k8s/SeccompPSP.py @@ -1,19 +1,22 @@ +from __future__ import annotations + +from typing import Any + from checkov.common.models.enums import CheckCategories, CheckResult from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check class SeccompPSP(BaseK8Check): - - def __init__(self): + def __init__(self) -> None: # CIS-1.5 5.7.2 name = "Ensure default seccomp profile set to docker/default or runtime/default" id = "CKV_K8S_32" # Location: PodSecurityPolicy.annotations.seccomp.security.alpha.kubernetes.io/defaultProfileName - supported_kind = ['PodSecurityPolicy'] - categories = [CheckCategories.KUBERNETES] + supported_kind = ("PodSecurityPolicy",) + categories = (CheckCategories.KUBERNETES,) super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind) - def scan_spec_conf(self, conf): + def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if "metadata" in conf: if "annotations" in conf["metadata"] and conf["metadata"].get("annotations"): for annotation in conf["metadata"]["annotations"]: diff --git a/checkov/kubernetes/checks/resource/k8s/ServiceAccountTokens.py b/checkov/kubernetes/checks/resource/k8s/ServiceAccountTokens.py index f9db1f348e6..f486eca9ec9 100644 --- a/checkov/kubernetes/checks/resource/k8s/ServiceAccountTokens.py +++ b/checkov/kubernetes/checks/resource/k8s/ServiceAccountTokens.py @@ -3,6 +3,7 @@ from typing import Any from checkov.common.models.enums import CheckCategories, CheckResult +from checkov.common.util.data_structures_utils import find_in_dict from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if "spec" in conf: spec = conf["spec"] elif conf["kind"] == "CronJob": - spec = conf.get("spec") - if spec: - job_template = spec.get("jobTemplate") - if job_template: - job_template_spec = job_template.get("spec") - if job_template_spec: - template = job_template_spec.get("template") - if template: - if "spec" in template: - spec = template["spec"] + inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec") + spec = inner_spec if inner_spec else spec else: - inner_spec = self.get_inner_entry(conf, "spec") + inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec") spec = inner_spec if inner_spec else spec # Collect results diff --git a/checkov/kubernetes/checks/resource/k8s/ShareHostIPC.py b/checkov/kubernetes/checks/resource/k8s/ShareHostIPC.py index c787e49761f..fd7c5e2c2cf 100644 --- a/checkov/kubernetes/checks/resource/k8s/ShareHostIPC.py +++ b/checkov/kubernetes/checks/resource/k8s/ShareHostIPC.py @@ -3,6 +3,7 @@ from typing import Any from checkov.common.models.enums import CheckCategories, CheckResult +from checkov.common.util.data_structures_utils import find_in_dict from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if "spec" in conf: spec = conf["spec"] elif conf["kind"] == "CronJob": - spec = conf.get("spec") - if spec: - job_template = spec.get("jobTemplate") - if job_template: - job_template_spec = job_template.get("spec") - if job_template_spec: - template = job_template_spec.get("template") - if template: - if "spec" in template: - spec = template["spec"] + inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec") + spec = inner_spec if inner_spec else spec else: - inner_spec = self.get_inner_entry(conf, "spec") + inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec") spec = inner_spec if inner_spec else spec if spec: if "hostIPC" in spec: diff --git a/checkov/kubernetes/checks/resource/k8s/ShareHostIPCPSP.py b/checkov/kubernetes/checks/resource/k8s/ShareHostIPCPSP.py index c9da1f87447..3e69e9e644e 100644 --- a/checkov/kubernetes/checks/resource/k8s/ShareHostIPCPSP.py +++ b/checkov/kubernetes/checks/resource/k8s/ShareHostIPCPSP.py @@ -3,18 +3,17 @@ class ShareHostIPCPSP(BaseSpecOmittedOrValueCheck): - - def __init__(self): + def __init__(self) -> None: # CIS-1.3 1.7.3 # CIS-1.5 5.2.3 name = "Do not admit containers wishing to share the host IPC namespace" id = "CKV_K8S_3" # Location: PodSecurityPolicy.spec.hostIPC - supported_kind = ['PodSecurityPolicy'] - categories = [CheckCategories.KUBERNETES] + supported_kind = ("PodSecurityPolicy",) + categories = (CheckCategories.KUBERNETES,) super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind) - def get_inspected_key(self): + def get_inspected_key(self) -> str: return "spec/hostIPC" diff --git a/checkov/kubernetes/checks/resource/k8s/ShareHostPID.py b/checkov/kubernetes/checks/resource/k8s/ShareHostPID.py index 3aca9c15fa2..08c098657de 100644 --- a/checkov/kubernetes/checks/resource/k8s/ShareHostPID.py +++ b/checkov/kubernetes/checks/resource/k8s/ShareHostPID.py @@ -3,6 +3,7 @@ from typing import Any from checkov.common.models.enums import CheckCategories, CheckResult +from checkov.common.util.data_structures_utils import find_in_dict from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if "spec" in conf: spec = conf["spec"] elif conf["kind"] == "CronJob": - spec = conf.get("spec") - if spec: - job_template = spec.get("jobTemplate") - if job_template: - job_template_spec = job_template.get("spec") - if job_template_spec: - template = job_template_spec.get("template") - if template: - if "spec" in template: - spec = template["spec"] + inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec") + spec = inner_spec if inner_spec else spec else: - inner_spec = self.get_inner_entry(conf, "spec") + inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec") spec = inner_spec if inner_spec else spec if spec: if "hostPID" in spec: diff --git a/checkov/kubernetes/checks/resource/k8s/ShareHostPIDPSP.py b/checkov/kubernetes/checks/resource/k8s/ShareHostPIDPSP.py index f2896d09f0b..f9ddc63fbff 100644 --- a/checkov/kubernetes/checks/resource/k8s/ShareHostPIDPSP.py +++ b/checkov/kubernetes/checks/resource/k8s/ShareHostPIDPSP.py @@ -3,18 +3,17 @@ class ShareHostPIDPSP(BaseSpecOmittedOrValueCheck): - - def __init__(self): + def __init__(self) -> None: # CIS-1.3 1.7.2 # CIS-1.5 5.2.2 name = "Do not admit containers wishing to share the host process ID namespace" id = "CKV_K8S_1" # Location: PodSecurityPolicy.spec.hostPID - supported_kind = ['PodSecurityPolicy'] - categories = [CheckCategories.KUBERNETES] + supported_kind = ("PodSecurityPolicy",) + categories = (CheckCategories.KUBERNETES,) super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind) - def get_inspected_key(self): + def get_inspected_key(self) -> str: return "spec/hostPID" diff --git a/checkov/kubernetes/checks/resource/k8s/SharedHostNetworkNamespace.py b/checkov/kubernetes/checks/resource/k8s/SharedHostNetworkNamespace.py index 5fe536a0c09..f2613f924a2 100644 --- a/checkov/kubernetes/checks/resource/k8s/SharedHostNetworkNamespace.py +++ b/checkov/kubernetes/checks/resource/k8s/SharedHostNetworkNamespace.py @@ -3,6 +3,7 @@ from typing import Any from checkov.common.models.enums import CheckCategories, CheckResult +from checkov.common.util.data_structures_utils import find_in_dict from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check @@ -35,18 +36,10 @@ def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if "spec" in conf: spec = conf["spec"] elif conf["kind"] == "CronJob": - spec = conf.get("spec") - if spec: - job_template = spec.get("jobTemplate") - if job_template: - job_template_spec = job_template.get("spec") - if job_template_spec: - template = job_template_spec.get("template") - if template: - if "spec" in template: - spec = template["spec"] + inner_spec = find_in_dict(input_dict=conf, key_path="spec/jobTemplate/spec/template/spec") + spec = inner_spec if inner_spec else spec else: - inner_spec = self.get_inner_entry(conf, "spec") + inner_spec = find_in_dict(input_dict=conf, key_path="spec/template/spec") spec = inner_spec if inner_spec else spec if spec: if "hostNetwork" in spec: diff --git a/checkov/kubernetes/checks/resource/k8s/SharedHostNetworkNamespacePSP.py b/checkov/kubernetes/checks/resource/k8s/SharedHostNetworkNamespacePSP.py index ad3c2310e4a..8223cb41cc8 100644 --- a/checkov/kubernetes/checks/resource/k8s/SharedHostNetworkNamespacePSP.py +++ b/checkov/kubernetes/checks/resource/k8s/SharedHostNetworkNamespacePSP.py @@ -3,18 +3,17 @@ class SharedHostNetworkNamespacePSP(BaseSpecOmittedOrValueCheck): - - def __init__(self): + def __init__(self) -> None: # CIS-1.3 1.7.4 # CIS-1.5 5.2.4 name = "Do not admit containers wishing to share the host network namespace" id = "CKV_K8S_4" # Location: PodSecurityPolicy.spec.hostNetwork - supported_kind = ['PodSecurityPolicy'] - categories = [CheckCategories.KUBERNETES] + supported_kind = ("PodSecurityPolicy",) + categories = (CheckCategories.KUBERNETES,) super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind) - def get_inspected_key(self): + def get_inspected_key(self) -> str: return "spec/hostNetwork" diff --git a/checkov/kubernetes/checks/resource/k8s/WildcardRoles.py b/checkov/kubernetes/checks/resource/k8s/WildcardRoles.py index e5f2d6c7e3c..be003ea1987 100644 --- a/checkov/kubernetes/checks/resource/k8s/WildcardRoles.py +++ b/checkov/kubernetes/checks/resource/k8s/WildcardRoles.py @@ -16,7 +16,8 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_entities=supported_kind) def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: - if isinstance(conf.get("rules"), list) and len(conf.get("rules")) > 0: + rules = conf.get("rules") + if isinstance(rules, list) and len(rules) > 0: if "apiGroups" in conf["rules"][0]: if any("*" in s for s in conf["rules"][0]["apiGroups"]): return CheckResult.FAILED diff --git a/extra_stubs/docker/__init__.pyi b/extra_stubs/docker/__init__.pyi new file mode 100644 index 00000000000..af6e877d262 --- /dev/null +++ b/extra_stubs/docker/__init__.pyi @@ -0,0 +1,5 @@ +from .client import DockerClient, from_env + +__all__ = [ + "from_env", +] diff --git a/extra_stubs/docker/client.pyi b/extra_stubs/docker/client.pyi new file mode 100644 index 00000000000..5eb6dc93252 --- /dev/null +++ b/extra_stubs/docker/client.pyi @@ -0,0 +1,13 @@ +from typing import Any + +from .models.images import ImageCollection + +class DockerClient: + @classmethod + def from_env(cls, **kwargs: Any) -> DockerClient: ... + + @property + def images(self) -> ImageCollection: ... + + +from_env = DockerClient.from_env diff --git a/extra_stubs/docker/models/images.pyi b/extra_stubs/docker/models/images.pyi new file mode 100644 index 00000000000..e1b1cc51283 --- /dev/null +++ b/extra_stubs/docker/models/images.pyi @@ -0,0 +1,14 @@ +from typing import Any + + +class Image: + attrs: dict[str, Any] + @property + def id(self) -> str: ... # is actually defined in its parent class 'Model' + @property + def short_id(self) -> str: ... + + +class ImageCollection: + def get(self, name: str) -> Image: ... + def pull(self, repository: str, tag: str | None= ..., all_tags: bool = ..., **kwargs: Any) -> Image: ... diff --git a/mypy.ini b/mypy.ini index a415ff484c0..bf3e64f4bd2 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2,7 +2,7 @@ mypy_path = extra_stubs files = checkov -exclude = checkov/(cloudformation/checks|kubernetes/checks|serverless|terraform/checks) +exclude = checkov/(cloudformation/checks|serverless|terraform/checks) strict = True disallow_subclassing_any = False implicit_reexport = True @@ -11,9 +11,6 @@ show_error_codes = True [mypy-configargparse.*] ignore_missing_imports = True -[mypy-docker.*] -ignore_missing_imports = True - [mypy-dpath.*] ignore_missing_imports = True From 09295c84950bbd8331d4f3fc52d55efbf398dccd Mon Sep 17 00:00:00 2001 From: gruebel Date: Sun, 5 Nov 2023 12:54:04 +0100 Subject: [PATCH 2/2] fix linting --- checkov/common/images/image_referencer.py | 2 +- .../kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/checkov/common/images/image_referencer.py b/checkov/common/images/image_referencer.py index 96383184c78..92d439f7cbf 100644 --- a/checkov/common/images/image_referencer.py +++ b/checkov/common/images/image_referencer.py @@ -103,7 +103,7 @@ def inspect(image_name: str) -> str: image = client.images.get(image_name) except Exception: image = client.images.pull(image_name) - return image.short_id + return image.short_id return image.short_id except Exception: logging.debug(f"failed to pull docker image={image_name}", exc_info=True) diff --git a/checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py b/checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py index 347afb0c80c..eabae61c5ba 100644 --- a/checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py +++ b/checkov/kubernetes/checks/resource/k8s/PeerClientCertAuthTrue.py @@ -16,7 +16,7 @@ def __init__(self) -> None: def scan_spec_conf(self, conf: dict[str, Any]) -> CheckResult: if conf.get("metadata", {}).get("name") == "etcd": - containers = conf.get("spec",{}).get("containers") + containers = conf.get("spec", {}).get("containers") if containers: for container in containers: if container.get("args") is not None: