diff --git a/checkov/ansible/checks/base_ansible_task_check.py b/checkov/ansible/checks/base_ansible_task_check.py index eda5d224185..e7872aa4380 100644 --- a/checkov/ansible/checks/base_ansible_task_check.py +++ b/checkov/ansible/checks/base_ansible_task_check.py @@ -57,9 +57,7 @@ def __init__( registry.register(self) - def scan_entity_conf( # type:ignore[override] # multi_signature decorator is problematic - self, conf: dict[str, Any], entity_type: str - ) -> tuple[CheckResult, dict[str, Any]]: + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: self.entity_type = entity_type self.entity_conf = conf diff --git a/checkov/argo_workflows/checks/base_argo_workflows_check.py b/checkov/argo_workflows/checks/base_argo_workflows_check.py index f0ea569e015..f1232ecf956 100644 --- a/checkov/argo_workflows/checks/base_argo_workflows_check.py +++ b/checkov/argo_workflows/checks/base_argo_workflows_check.py @@ -31,7 +31,7 @@ def __init__( self.path = path registry.register(self) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: self.entity_type = entity_type return self.scan_conf(conf) diff --git a/checkov/arm/base_parameter_check.py b/checkov/arm/base_parameter_check.py index 1c70c773b59..3d17cdccd6b 100644 --- a/checkov/arm/base_parameter_check.py +++ b/checkov/arm/base_parameter_check.py @@ -2,11 +2,10 @@ from abc import abstractmethod from collections.abc import Iterable -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any from checkov.arm.registry import arm_parameter_registry from checkov.common.checks.base_check import BaseCheck -from checkov.common.multi_signature import multi_signature if TYPE_CHECKING: from checkov.common.models.enums import CheckCategories, CheckResult @@ -19,26 +18,24 @@ def __init__( id: str, categories: Iterable[CheckCategories], supported_resources: Iterable[str], - guideline: str | None = None + guideline: str | None = None, ) -> None: - super().__init__(name=name, id=id, categories=categories, supported_entities=supported_resources, - block_type="parameter", guideline=guideline) + super().__init__( + name=name, + id=id, + categories=categories, + supported_entities=supported_resources, + block_type="parameter", + guideline=guideline, + ) self.supported_resources = supported_resources arm_parameter_registry.register(self) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] # it's ok - return self.scan_resource_conf(conf, entity_type) # type:ignore[no-any-return] # issue with multi_signature annotation + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: + self.entity_type = entity_type + + return self.scan_resource_conf(conf) - @multi_signature() @abstractmethod - def scan_resource_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: raise NotImplementedError() - - @classmethod - @scan_resource_conf.add_signature(args=["self", "conf"]) - def _scan_resource_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]: - def wrapper(self: BaseCheck, conf: dict[str, Any], entity_type: str | None = None) -> CheckResult: - # keep default argument for entity_type so old code, that doesn't set it, will work. - return wrapped(self, conf) - - return wrapper diff --git a/checkov/arm/base_resource_check.py b/checkov/arm/base_resource_check.py index 2d7314e1562..a214d103db4 100644 --- a/checkov/arm/base_resource_check.py +++ b/checkov/arm/base_resource_check.py @@ -8,7 +8,6 @@ from checkov.bicep.checks.resource.registry import registry as bicep_registry from checkov.common.checks.base_check import BaseCheck from checkov.common.models.enums import CheckCategories, CheckResult -from checkov.common.multi_signature import multi_signature class BaseResourceCheck(BaseCheck): @@ -33,7 +32,7 @@ def __init__( # leverage ARM checks to use with bicep runner bicep_registry.register(self) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] # it's ok + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: self.entity_type = entity_type # the "existing" key indicates a Bicep resource @@ -50,22 +49,12 @@ def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResul # this means the whole resource block is surrounded by a for loop resource_conf = resource_conf["config"] - return self.scan_resource_conf(resource_conf, entity_type) # type:ignore[no-any-return] # issue with multi_signature annotation + return self.scan_resource_conf(resource_conf) self.api_version = None - return self.scan_resource_conf(conf, entity_type) # type:ignore[no-any-return] # issue with multi_signature annotation + return self.scan_resource_conf(conf) - @multi_signature() @abstractmethod - def scan_resource_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: raise NotImplementedError() - - @classmethod - @scan_resource_conf.add_signature(args=["self", "conf"]) - def _scan_resource_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]: - def wrapper(self: BaseCheck, conf: dict[str, Any], entity_type: str | None = None) -> CheckResult: - # keep default argument for entity_type so old code, that doesn't set it, will work. - return wrapped(self, conf) - - return wrapper diff --git a/checkov/arm/base_resource_negative_value_check.py b/checkov/arm/base_resource_negative_value_check.py index 170bb137cd4..bbb056dbd2e 100644 --- a/checkov/arm/base_resource_negative_value_check.py +++ b/checkov/arm/base_resource_negative_value_check.py @@ -33,7 +33,7 @@ def __init__( def _is_variable_dependant(value: Any) -> bool: return bool(isinstance(value, str) and re.match(VARIABLE_DEPENDANT_REGEX, value)) - def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: # type:ignore[override] # issue with multi_signature annotation + def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: inspected_key = self.get_inspected_key() forbidden_values = self.get_forbidden_values() value = find_in_dict(conf, inspected_key) diff --git a/checkov/arm/base_resource_value_check.py b/checkov/arm/base_resource_value_check.py index c611f0fef8e..d17606101dc 100644 --- a/checkov/arm/base_resource_value_check.py +++ b/checkov/arm/base_resource_value_check.py @@ -34,7 +34,7 @@ def _is_variable_dependant(value: Any) -> bool: return True return False - def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: # type:ignore[override] # issue with multi_signature annotation + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: inspected_key = self.get_inspected_key() expected_values = self.get_expected_values() value = find_in_dict(conf, inspected_key) diff --git a/checkov/azure_pipelines/checks/base_azure_pipelines_check.py b/checkov/azure_pipelines/checks/base_azure_pipelines_check.py index 714db48bc8c..da05be4bf2c 100644 --- a/checkov/azure_pipelines/checks/base_azure_pipelines_check.py +++ b/checkov/azure_pipelines/checks/base_azure_pipelines_check.py @@ -31,7 +31,7 @@ def __init__( self.path = path registry.register(self) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: self.entity_type = entity_type return self.scan_conf(conf) diff --git a/checkov/bitbucket_pipelines/base_bitbucket_pipelines_check.py b/checkov/bitbucket_pipelines/base_bitbucket_pipelines_check.py index 92af7662dcd..e9f92ff9991 100644 --- a/checkov/bitbucket_pipelines/base_bitbucket_pipelines_check.py +++ b/checkov/bitbucket_pipelines/base_bitbucket_pipelines_check.py @@ -29,7 +29,7 @@ def __init__( self.path = path registry.register(self) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: self.entity_type = entity_type return self.scan_conf(conf) diff --git a/checkov/circleci_pipelines/base_circleci_pipelines_check.py b/checkov/circleci_pipelines/base_circleci_pipelines_check.py index e919e62269e..5858ed529a8 100644 --- a/checkov/circleci_pipelines/base_circleci_pipelines_check.py +++ b/checkov/circleci_pipelines/base_circleci_pipelines_check.py @@ -32,7 +32,7 @@ def __init__( self.path = path registry.register(self) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: self.entity_type = entity_type return self.scan_conf(conf) diff --git a/checkov/cloudformation/checks/resource/BaseCloudsplainingIAMCheck.py b/checkov/cloudformation/checks/resource/BaseCloudsplainingIAMCheck.py index caf323aab54..69714fd7e48 100644 --- a/checkov/cloudformation/checks/resource/BaseCloudsplainingIAMCheck.py +++ b/checkov/cloudformation/checks/resource/BaseCloudsplainingIAMCheck.py @@ -11,7 +11,6 @@ from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck from checkov.common.models.enums import CheckResult, CheckCategories -from checkov.common.multi_signature import multi_signature from checkov.cloudformation.checks.utils.iam_cloudformation_document_to_policy_converter import \ convert_cloudformation_conf_to_iam_policy @@ -77,7 +76,6 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: return CheckResult.UNKNOWN return CheckResult.PASSED - @multi_signature() @abstractmethod def cloudsplaining_analysis(self, policy: PolicyDocument) -> list[str]: raise NotImplementedError() diff --git a/checkov/cloudformation/checks/resource/base_resource_check.py b/checkov/cloudformation/checks/resource/base_resource_check.py index c8c250dd198..eb72c5955a9 100644 --- a/checkov/cloudformation/checks/resource/base_resource_check.py +++ b/checkov/cloudformation/checks/resource/base_resource_check.py @@ -5,7 +5,6 @@ from checkov.cloudformation.checks.resource.registry import cfn_registry from checkov.common.checks.base_check import BaseCheck from checkov.common.models.enums import CheckCategories, CheckResult -from checkov.common.multi_signature import multi_signature class BaseResourceCheck(BaseCheck): @@ -31,18 +30,8 @@ def __init__( def scan_entity_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: self.entity_type = entity_type - return self.scan_resource_conf(conf, entity_type) + return self.scan_resource_conf(conf) - @multi_signature() @abstractmethod - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: raise NotImplementedError() - - @classmethod - @scan_resource_conf.add_signature(args=["self", "conf"]) - def _scan_resource_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]: - def wrapper(self: BaseCheck, conf: Dict[str, Any], entity_type: Optional[str] = None) -> CheckResult: - # keep default argument for entity_type so old code, that doesn't set it, will work. - return wrapped(self, conf) - - return wrapper diff --git a/checkov/cloudformation/checks/resource/base_resource_negative_value_check.py b/checkov/cloudformation/checks/resource/base_resource_negative_value_check.py index ca8906e4a22..2af702b7198 100644 --- a/checkov/cloudformation/checks/resource/base_resource_negative_value_check.py +++ b/checkov/cloudformation/checks/resource/base_resource_negative_value_check.py @@ -20,7 +20,7 @@ def __init__( super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) self.missing_block_result = missing_block_result - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: excluded_key = self.get_excluded_key() if excluded_key is not None: path_elements = excluded_key.split("/") diff --git a/checkov/common/checks/base_check.py b/checkov/common/checks/base_check.py index 80e9a3ec6cd..60568c5f396 100644 --- a/checkov/common/checks/base_check.py +++ b/checkov/common/checks/base_check.py @@ -2,18 +2,17 @@ import logging import os -from abc import abstractmethod +from abc import abstractmethod, ABC from collections.abc import Iterable -from typing import List, Dict, Any, Callable, Optional +from typing import List, Dict, Any, Optional from checkov.common.resource_code_logger_filter import add_resource_code_filter_to_logger from checkov.common.typing import _SkippedCheck, _CheckResult from checkov.common.util.type_forcers import force_list from checkov.common.models.enums import CheckResult, CheckCategories, CheckFailLevel -from checkov.common.multi_signature import MultiSignatureMeta, multi_signature -class BaseCheck(metaclass=MultiSignatureMeta): +class BaseCheck(ABC): def __init__( self, name: str, @@ -48,7 +47,7 @@ def __init__( def run( self, scanned_file: str, - entity_configuration: Dict[str, List[Any]], + entity_configuration: Dict[str, Any], entity_name: str, entity_type: str, skip_info: _SkippedCheck, @@ -58,26 +57,18 @@ def run( if skip_info: check_result["result"] = CheckResult.SKIPPED check_result["suppress_comment"] = skip_info["suppress_comment"] - message = 'File {}, {} "{}.{}" check "{}" Result: {}, Suppression comment: {} '.format( - scanned_file, - self.block_type, - entity_type, - entity_name, - self.name, - check_result, - check_result["suppress_comment"], + self.logger.debug( + f'File {scanned_file}, {self.block_type} "{entity_type}.{entity_name}" check "{self.name}" Result: {check_result}, Suppression comment: {check_result["suppress_comment"]}' ) - self.logger.debug(message) else: try: self.evaluated_keys = [] self.entity_path = f"{scanned_file}:{entity_type}:{entity_name}" check_result["result"] = self.scan_entity_conf(entity_configuration, entity_type) check_result["evaluated_keys"] = self.get_evaluated_keys() - message = 'File {}, {} "{}.{}" check "{}" Result: {} '.format( - scanned_file, self.block_type, entity_type, entity_name, self.name, check_result + self.logger.debug( + f'File {scanned_file}, {self.block_type} "{entity_type}.{entity_name}" check "{self.name}" Result: {check_result}' ) - self.logger.debug(message) except Exception: self.log_check_error(scanned_file=scanned_file, entity_type=entity_type, entity_name=entity_name, @@ -85,24 +76,10 @@ def run( raise return check_result - @multi_signature() @abstractmethod def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult | tuple[CheckResult, dict[str, Any]]: raise NotImplementedError() - @classmethod - @scan_entity_conf.add_signature(args=["self", "conf"]) - def _scan_entity_conf_self_conf( - cls, wrapped: Callable[..., CheckResult | tuple[CheckResult, dict[str, Any]]] - ) -> Callable[..., CheckResult | tuple[CheckResult, dict[str, Any]]]: - def wrapper( - self: "BaseCheck", conf: Dict[str, Any], entity_type: Optional[str] = None - ) -> CheckResult | tuple[CheckResult, dict[str, Any]]: - # keep default argument for entity_type so old code, that doesn't set it, will work. - return wrapped(self, conf) - - return wrapper - def get_evaluated_keys(self) -> List[str]: """ Retrieves the evaluated keys for the run's report. Child classes override the function and return the `expected_keys` instead. @@ -114,7 +91,7 @@ def get_output_id(self, use_bc_ids: bool) -> str: return self.bc_id if self.bc_id and use_bc_ids else self.id def log_check_error(self, scanned_file: str, entity_type: str, entity_name: str, - entity_configuration: Dict[str, List[Any]]) -> None: + entity_configuration: Dict[str, Any]) -> None: if self.check_fail_level == CheckFailLevel.ERROR: logging.error(f'Failed to run check {self.id} on {scanned_file}:{entity_type}.{entity_name}', exc_info=True) diff --git a/checkov/common/multi_signature.py b/checkov/common/multi_signature.py deleted file mode 100644 index 07d2740a9fc..00000000000 --- a/checkov/common/multi_signature.py +++ /dev/null @@ -1,128 +0,0 @@ -from __future__ import annotations - -import inspect -from abc import ABCMeta -from functools import update_wrapper -from types import CodeType -from typing import Callable, Any, TypeVar, cast, Protocol - -_MultiT = TypeVar("_MultiT") - - -class _MultiSignataureMethod(Protocol): - __code__: CodeType - __multi_signature_wrappers__: dict[tuple[tuple[str, ...], Any, Any], Callable[..., Any]] - - def __call__(self, *args: Any, **kwargs: Any) -> Any: - ... - - def add_signature(self, *, args: list[str], varargs: Any = None, varkw: Any = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]: - ... - - -class MultiSignatureMeta(ABCMeta): # noqa: B024 # needs to be ABCMeta, because of the super().__new__ call - __multi_signature_methods__: dict[str, _MultiSignataureMethod] # noqa: CCE003 - - def __new__(cls, name: str, bases: tuple[Any], namespace: dict[str, Any], **kwargs: Any) -> MultiSignatureMeta: - mcs = super().__new__(cls, name, bases, namespace, **kwargs) - multi_signatures: dict[str, _MultiSignataureMethod] = { - name: value # type:ignore[misc] - for name, value in namespace.items() - if hasattr(value, "__multi_signature_wrappers__") and inspect.isfunction(value) - # isfunction, because function is not bound yet - } - # search in base classes for decorated functions - for base in bases: - for name, value in getattr(base, "__multi_signature_methods__", {}).items(): - if inspect.isfunction(value) and hasattr(value, "__multi_signature_wrappers__"): - multi_signature_wrappers = getattr(value, "__multi_signature_wrappers__", {}) - if multi_signature_wrappers: - current_function = multi_signatures.get(name) - if current_function: - current_function.__multi_signature_wrappers__.update(multi_signature_wrappers) - else: - multi_signatures[name] = cast(_MultiSignataureMethod, value) - - mcs.__multi_signature_methods__ = multi_signatures - for name, value in multi_signatures.items(): - wrapped = getattr(mcs, name) - arguments = inspect.getargs(wrapped.__code__) - if arguments == inspect.getargs(value.__code__): - # Do not replace if the signature is the same - continue - # convert args into a tuple - args, varargs, varkw = arguments - multi_signature_key = tuple(args), varargs, varkw - get_wrapper = value.__multi_signature_wrappers__.get(tuple(multi_signature_key), None) - if get_wrapper: - wrapper = get_wrapper(mcs, wrapped) - update_wrapper(wrapper, wrapped) - setattr(mcs, name, wrapper) - else: - # unknown implementation - raise NotImplementedError(f"The signature {multi_signature_key} for {name} is not supported.") - - return mcs - - -class multi_signature: - """ - Decorator to allow other signatures in sub classes. - - You can use this decorator on class methods. The implementation requires that the metaclass of that class is - :class:`MultiSignatureMeta` - This class extends :class:`ABCMeta` so it supports abstract base classes. - """ - - def __init__(self) -> None: - self.__wrappers__: dict[tuple[tuple[str, ...], Any, Any], Callable[..., _MultiT]] = {} - - def __call__(self, fn: Callable[..., _MultiT]) -> _MultiSignataureMethod: - fn.add_signature = self.add_signature # type:ignore[attr-defined] - fn.__multi_signature_wrappers__ = self.__wrappers__ # type:ignore[attr-defined] - return cast(_MultiSignataureMethod, fn) - - def add_signature( - self, *, args: list[str], varargs: Any = None, varkw: Any = None - ) -> Callable[[Callable[..., _MultiT]], Callable[..., _MultiT]]: - """ - Registers a new wrapper for the decorated function. - - The decorated function must have two parameters (`cls` and `wrapped`) (The names are not enforced.). - The first one is the class so this has to be a class function. The second one is the function to wrap. - It has to return a function that has the same signature as the one decorated with `@multi_signature`. - The returned function has to call `wrapped`, which is a function with the parameters listed as `args` - and must return the result of `wrapped`. If the wrapped function has varargs, the name of this variable - is specified with `varargs`. For keyword args it is `varkw`. See :func:`inspect.getargs` to get the - correct order. - - The implementation must not handle doc preservation. This is already done by the implementation. - - :param args: the parameters that the wrapped version of the function accepts. - :param varargs: if the wrapped version uses varargs, this is the name of this - parameter. If not used, this is `None`. - :param varkw: if the wrapped version uses keyword arguments, this is the name - of this parameter. If not used, this is `None`. - - :Example: - - >>> class Example(metaclass=MultiSignatureMeta: - >>> - >>> @multi_signature - >>> def some_function(self, a, b, c): - >>> pass - >>> - >>> @classmethod - >>> @some_function.add_signature("self", "a") - >>> def _some_function_self_a(cls, wrapped): - >>> def wrapper(self, a, b, c): - >>> return wrapped(self, a) - >>> - >>> return wrapper - """ - - def wrapper(fn: Callable[..., _MultiT]) -> Callable[..., _MultiT]: - self.__wrappers__[(tuple(args), varargs, varkw)] = fn # type:ignore[assignment] # mypy bug - return fn - - return wrapper diff --git a/checkov/example_runner/checks/job/ExampleCheckTrueFalse.py b/checkov/example_runner/checks/job/ExampleCheckTrueFalse.py index 6a82bc2938b..e82f8b97a11 100644 --- a/checkov/example_runner/checks/job/ExampleCheckTrueFalse.py +++ b/checkov/example_runner/checks/job/ExampleCheckTrueFalse.py @@ -51,7 +51,7 @@ def __init__(self) -> None: block_type=BlockType.ARRAY, ) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # return type is different than the base class + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # The block type is passed as a data structure. # Add logic to parse the structure for the misconfig # Remember to always return a PASSED or FAILED. diff --git a/checkov/github/base_github_branch_security.py b/checkov/github/base_github_branch_security.py index 1ab9924519e..ee39466d667 100644 --- a/checkov/github/base_github_branch_security.py +++ b/checkov/github/base_github_branch_security.py @@ -24,8 +24,7 @@ def __init__(self, id: str, name: str) -> None: block_type=BlockType.DOCUMENT, ) - def scan_entity_conf( # type:ignore[override] - self, conf: dict[str, Any], entity_type: str) -> CheckResult: + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: if branch_security_schema.validate(conf): evaluated_key = self.get_evaluated_keys()[0] jsonpath_expression = get_jsonpath_from_evaluated_key(evaluated_key) diff --git a/checkov/github/base_github_negative_branch_security.py b/checkov/github/base_github_negative_branch_security.py index b250dc07ff6..cfcff356513 100644 --- a/checkov/github/base_github_negative_branch_security.py +++ b/checkov/github/base_github_negative_branch_security.py @@ -27,7 +27,7 @@ def __init__(self, id: str, name: str, missing_attribute_result: CheckResult = C ) self.missing_attribute_result = missing_attribute_result - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: if branch_security_schema.validate(conf): evaluated_key = self.get_evaluated_keys()[0].replace("/", ".") jsonpath_expression = parse(f"$..{evaluated_key}") diff --git a/checkov/github/base_github_org_check.py b/checkov/github/base_github_org_check.py index d632043e642..43b3ced666c 100644 --- a/checkov/github/base_github_org_check.py +++ b/checkov/github/base_github_org_check.py @@ -22,7 +22,7 @@ def __init__(self, id: str, name: str, missing_attribute_result: CheckResult = C ) self.missing_attribute_result = missing_attribute_result - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: ckv_metadata, conf = self.resolve_ckv_metadata_conf(conf=conf) if 'org_metadata' in ckv_metadata.get('file_name', ''): if org_schema.validate(conf): diff --git a/checkov/github/base_github_org_security.py b/checkov/github/base_github_org_security.py index 3210f377b90..0557f677e35 100644 --- a/checkov/github/base_github_org_security.py +++ b/checkov/github/base_github_org_security.py @@ -22,7 +22,7 @@ def __init__(self, id: str, name: str) -> None: block_type=BlockType.DOCUMENT ) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: if org_security_schema.validate(conf): evaluated_key = self.get_evaluated_keys()[0] jsonpath_expression = get_jsonpath_from_evaluated_key(evaluated_key) diff --git a/checkov/github/checks/disallow_inactive_branch_60days.py b/checkov/github/checks/disallow_inactive_branch_60days.py index 2fdc0f5a8d1..0a8ae324067 100644 --- a/checkov/github/checks/disallow_inactive_branch_60days.py +++ b/checkov/github/checks/disallow_inactive_branch_60days.py @@ -24,7 +24,7 @@ def __init__(self) -> None: block_type=BlockType.DOCUMENT, ) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: if branch_schema.validate(conf): evaluated_key = self.get_evaluated_keys()[0].replace("/", ".") jsonpath_expression = parse(f"$..{evaluated_key}") diff --git a/checkov/github/checks/minimum_admins_in_org.py b/checkov/github/checks/minimum_admins_in_org.py index fa02ac00871..cf629abd130 100644 --- a/checkov/github/checks/minimum_admins_in_org.py +++ b/checkov/github/checks/minimum_admins_in_org.py @@ -24,7 +24,7 @@ def __init__(self) -> None: block_type=BlockType.DOCUMENT ) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: ckv_metadata, conf = self.resolve_ckv_metadata_conf(conf=conf) if 'org_admins' in ckv_metadata.get('file_name', ''): if org_members.validate(conf): diff --git a/checkov/github/checks/repository_collaborators.py b/checkov/github/checks/repository_collaborators.py index 59fdee77c57..3dd759cc859 100644 --- a/checkov/github/checks/repository_collaborators.py +++ b/checkov/github/checks/repository_collaborators.py @@ -21,7 +21,7 @@ def __init__(self) -> None: block_type=BlockType.DOCUMENT ) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: ckv_metadata, conf = self.resolve_ckv_metadata_conf(conf=conf) if 'repository_collaborators' in ckv_metadata.get('file_name', ''): if conf and repository_collaborators_schema.validate(conf): diff --git a/checkov/github/checks/sso.py b/checkov/github/checks/sso.py index 0403b7559d1..baf5fe73b46 100644 --- a/checkov/github/checks/sso.py +++ b/checkov/github/checks/sso.py @@ -23,7 +23,7 @@ def __init__(self) -> None: block_type=BlockType.DOCUMENT ) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: if org_security_schema.validate(conf): jsonpath_expression = parse("$..{}".format(self.get_evaluated_keys()[0].replace("/", "."))) if len(jsonpath_expression.find(conf)) > 0: diff --git a/checkov/github_actions/checks/base_github_action_check.py b/checkov/github_actions/checks/base_github_action_check.py index d633a67bdf7..a2946792ee1 100644 --- a/checkov/github_actions/checks/base_github_action_check.py +++ b/checkov/github_actions/checks/base_github_action_check.py @@ -28,7 +28,7 @@ def __init__( self.path = path registry.register(self) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: self.entity_type = entity_type return self.scan_conf(conf) diff --git a/checkov/gitlab_ci/checks/base_gitlab_ci_check.py b/checkov/gitlab_ci/checks/base_gitlab_ci_check.py index 873e4551312..0fee3a8d289 100644 --- a/checkov/gitlab_ci/checks/base_gitlab_ci_check.py +++ b/checkov/gitlab_ci/checks/base_gitlab_ci_check.py @@ -25,7 +25,7 @@ def __init__( self.path = path registry.register(self) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: self.entity_type = entity_type return self.scan_conf(conf) diff --git a/checkov/kubernetes/checks/resource/base_root_container_check.py b/checkov/kubernetes/checks/resource/base_root_container_check.py index 40ff1c0a625..4918c8822c2 100644 --- a/checkov/kubernetes/checks/resource/base_root_container_check.py +++ b/checkov/kubernetes/checks/resource/base_root_container_check.py @@ -2,7 +2,6 @@ from typing import Dict, Any, Optional from checkov.common.models.enums import CheckCategories, CheckResult -from checkov.common.multi_signature import multi_signature from checkov.kubernetes.checks.resource.base_spec_check import BaseK8Check from checkov.kubernetes.checks.resource.registry import registry @@ -22,13 +21,12 @@ def __init__( guideline=guideline) registry.register(self) - @multi_signature() @abstractmethod - def scan_spec_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_spec_conf(self, conf: Dict[str, Any]) -> CheckResult: """Return result of Kubernetes rooot container check.""" raise NotImplementedError() - def extract_spec(self, conf: Dict[str, Any]) -> Dict: + def extract_spec(self, conf: Dict[str, Any]) -> Dict[str, Any]: spec = {} if conf['kind'] == 'Pod': diff --git a/checkov/kubernetes/checks/resource/base_spec_check.py b/checkov/kubernetes/checks/resource/base_spec_check.py index 511df566e64..45d2b978538 100644 --- a/checkov/kubernetes/checks/resource/base_spec_check.py +++ b/checkov/kubernetes/checks/resource/base_spec_check.py @@ -4,7 +4,6 @@ from checkov.common.checks.base_check import BaseCheck from checkov.common.models.enums import CheckCategories, CheckResult -from checkov.common.multi_signature import multi_signature from checkov.kubernetes.checks.resource.registry import registry @@ -30,23 +29,13 @@ def __init__( def scan_entity_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: self.entity_type = entity_type - return self.scan_spec_conf(conf, entity_type) + return self.scan_spec_conf(conf) - @multi_signature() @abstractmethod - def scan_spec_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_spec_conf(self, conf: Dict[str, Any]) -> CheckResult: """Return result of Kubernetes object check.""" raise NotImplementedError() - @classmethod - @scan_spec_conf.add_signature(args=["self", "conf"]) - def _scan_spec_conf_self_conf(cls, wrapped): - def wrapper(self, conf, entity_type=None): - # keep default argument for entity_type so old code, that doesn't set it, will work. - return wrapped(self, conf) - - return wrapper - @staticmethod def get_inner_entry(conf: Dict[str, Any], entry_name: str) -> Dict[str, Any]: spec = {} diff --git a/checkov/openapi/checks/resource/generic/ClearTextAPIKey.py b/checkov/openapi/checks/resource/generic/ClearTextAPIKey.py index f033c00c398..0b18ae5d025 100644 --- a/checkov/openapi/checks/resource/generic/ClearTextAPIKey.py +++ b/checkov/openapi/checks/resource/generic/ClearTextAPIKey.py @@ -16,7 +16,7 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_entities=supported_resources, block_type=BlockType.DOCUMENT) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # return type is different than the base class + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: schemes = conf.get("schemes") if schemes and isinstance(schemes, list): if "http" not in schemes and "ws" not in schemes: diff --git a/checkov/openapi/checks/resource/generic/GlobalSecurityFieldIsEmpty.py b/checkov/openapi/checks/resource/generic/GlobalSecurityFieldIsEmpty.py index 942b8835d73..ac46e2dd085 100644 --- a/checkov/openapi/checks/resource/generic/GlobalSecurityFieldIsEmpty.py +++ b/checkov/openapi/checks/resource/generic/GlobalSecurityFieldIsEmpty.py @@ -15,7 +15,7 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_entities=supported_resources, block_type=BlockType.DOCUMENT) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # return type is different than the base class + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: security_rules = conf.get("security") if security_rules: diff --git a/checkov/openapi/checks/resource/generic/NoMaximumNumberItems.py b/checkov/openapi/checks/resource/generic/NoMaximumNumberItems.py index e952d459d72..cd7b8ffac60 100644 --- a/checkov/openapi/checks/resource/generic/NoMaximumNumberItems.py +++ b/checkov/openapi/checks/resource/generic/NoMaximumNumberItems.py @@ -15,7 +15,7 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_entities=supported_resources, block_type=BlockType.DOCUMENT) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # return type is different than the base class + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: result = self.check_array_max_items(inner_conf=conf) if result: return result diff --git a/checkov/openapi/checks/resource/generic/SecurityOperations.py b/checkov/openapi/checks/resource/generic/SecurityOperations.py index dfa17a1d8d3..dd08bc3f4ee 100644 --- a/checkov/openapi/checks/resource/generic/SecurityOperations.py +++ b/checkov/openapi/checks/resource/generic/SecurityOperations.py @@ -15,7 +15,7 @@ def __init__(self) -> None: super().__init__(name=name, id=id, categories=categories, supported_entities=supported_resources, block_type=BlockType.DOCUMENT) - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # return type is different than the base class + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: self.evaluated_keys = ['security', 'paths'] # Check if security field is present and not empty at the root level diff --git a/checkov/openapi/checks/resource/v2/BaseOpenapiCheckV2.py b/checkov/openapi/checks/resource/v2/BaseOpenapiCheckV2.py index 45fe56102fd..40d57e38fc1 100644 --- a/checkov/openapi/checks/resource/v2/BaseOpenapiCheckV2.py +++ b/checkov/openapi/checks/resource/v2/BaseOpenapiCheckV2.py @@ -21,7 +21,7 @@ def __init__(self, name: str, id: str, categories: Iterable[CheckCategories], su def scan_openapi_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: raise NotImplementedError() - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: if "swagger" in conf: swagger = conf.get("swagger") if isinstance(swagger, str) and swagger == '2.0': diff --git a/checkov/openapi/checks/resource/v3/BaseOpenapiCheckV3.py b/checkov/openapi/checks/resource/v3/BaseOpenapiCheckV3.py index 1fd5f40cb9c..c42562bcd13 100644 --- a/checkov/openapi/checks/resource/v3/BaseOpenapiCheckV3.py +++ b/checkov/openapi/checks/resource/v3/BaseOpenapiCheckV3.py @@ -21,7 +21,7 @@ def __init__(self, name: str, id: str, categories: Iterable[CheckCategories], su def scan_openapi_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: raise NotImplementedError() - def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: if "openapi" in conf: openapi = conf.get("openapi") if isinstance(openapi, str) and openapi.startswith("3."): diff --git a/checkov/serverless/checks/function/base_function_check.py b/checkov/serverless/checks/function/base_function_check.py index 0ff8307d603..4fe8f3ae8ae 100644 --- a/checkov/serverless/checks/function/base_function_check.py +++ b/checkov/serverless/checks/function/base_function_check.py @@ -1,29 +1,39 @@ +from __future__ import annotations + from abc import abstractmethod +from collections.abc import Iterable +from typing import TYPE_CHECKING, Any from checkov.common.checks.base_check import BaseCheck -from checkov.common.multi_signature import multi_signature from checkov.serverless.checks.function.registry import function_registry +if TYPE_CHECKING: + from checkov.common.models.enums import CheckCategories, CheckResult + class BaseFunctionCheck(BaseCheck): - def __init__(self, name, id, categories, supported_entities, guideline=None): - super().__init__(name=name, id=id, categories=categories, supported_entities=supported_entities, - block_type="serverless", guideline=guideline) + def __init__( + self, + name: str, + id: str, + categories: Iterable[CheckCategories], + supported_entities: Iterable[str], + guideline: str | None = None, + ) -> None: + super().__init__( + name=name, + id=id, + categories=categories, + supported_entities=supported_entities, + block_type="serverless", + guideline=guideline, + ) function_registry.register(self) - def scan_entity_conf(self, conf, entity_type): - return self.scan_function_conf(conf, entity_type) + def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: + self.entity_type = entity_type + return self.scan_function_conf(conf) - @multi_signature() @abstractmethod - def scan_function_conf(self, conf, entity_type): + def scan_function_conf(self, conf: dict[str, Any]) -> CheckResult: raise NotImplementedError() - - @classmethod - @scan_function_conf.add_signature(args=["self", "conf"]) - def _scan_function_conf_self_conf(cls, wrapped): - def wrapper(self, conf, entity_type=None): - # keep default argument for entity_type so old code, that doesn't set it, will work. - return wrapped(self, conf) - - return wrapper diff --git a/checkov/terraform/checks/data/aws/GithubActionsOIDCTrustPolicy.py b/checkov/terraform/checks/data/aws/GithubActionsOIDCTrustPolicy.py index e43a3ad6bdc..f42063ec152 100644 --- a/checkov/terraform/checks/data/aws/GithubActionsOIDCTrustPolicy.py +++ b/checkov/terraform/checks/data/aws/GithubActionsOIDCTrustPolicy.py @@ -15,7 +15,7 @@ def __init__(self): categories = [CheckCategories.IAM] super().__init__(name=name, id=id, categories=categories, supported_data=supported_data) - def scan_data_conf(self, conf: Dict[str, List[Any]], entity_type: str) -> CheckResult: + def scan_data_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: statements = force_list(conf.get('statement')) for statement in statements: found_federated_gh_oidc = False diff --git a/checkov/terraform/checks/data/aws/IAMPublicActionsPolicy.py b/checkov/terraform/checks/data/aws/IAMPublicActionsPolicy.py index c243d92cb79..b9ab6923f7a 100644 --- a/checkov/terraform/checks/data/aws/IAMPublicActionsPolicy.py +++ b/checkov/terraform/checks/data/aws/IAMPublicActionsPolicy.py @@ -13,7 +13,7 @@ def __init__(self): categories = [CheckCategories.IAM] super().__init__(name=name, id=id, categories=categories, supported_data=supported_data) - def scan_data_conf(self, conf: Dict[str, List[Any]], entity_type: str) -> CheckResult: + def scan_data_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: statements = force_list(conf.get('statement')) for statement in statements: if isinstance(statement, dict): diff --git a/checkov/terraform/checks/data/base_check.py b/checkov/terraform/checks/data/base_check.py index 79f18b39ae6..afd87c5a5fd 100644 --- a/checkov/terraform/checks/data/base_check.py +++ b/checkov/terraform/checks/data/base_check.py @@ -6,7 +6,6 @@ from checkov.common.checks.base_check import BaseCheck from checkov.common.models.enums import CheckResult, CheckCategories -from checkov.common.multi_signature import multi_signature from checkov.terraform.checks.data.registry import data_registry @@ -25,23 +24,13 @@ def __init__( data_registry.register(self) def scan_entity_conf(self, conf: Dict[str, List[Any]], entity_type: str) -> CheckResult: + self.entity_type = entity_type + if conf.get("count") == [0]: return CheckResult.UNKNOWN - return self.scan_data_conf(conf, entity_type) + return self.scan_data_conf(conf) - @multi_signature() @abstractmethod - def scan_data_conf(self, conf: Dict[str, List[Any]], entity_type: str) -> CheckResult: + def scan_data_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: raise NotImplementedError() - - @classmethod - @scan_data_conf.add_signature(args=["self", "conf"]) - def _scan_data_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]: - def wrapper( - self: "BaseDataCheck", conf: Dict[str, List[Any]], entity_type: Optional[str] = None - ) -> CheckResult: - # keep default argument for entity_type so old code, that doesn't set it, will work. - return wrapped(self, conf) - - return wrapper diff --git a/tests/arm/runner/test_runner.py b/tests/arm/runner/test_runner.py index 13134863c77..754d80c6871 100644 --- a/tests/arm/runner/test_runner.py +++ b/tests/arm/runner/test_runner.py @@ -166,7 +166,7 @@ def __init__(self, *_, **__) -> None: ["Microsoft.Web/sites"] ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -195,7 +195,7 @@ def __init__(self, *_, **__) -> None: ["Microsoft.Web/sites"] ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -227,7 +227,7 @@ def __init__(self, *_, **__) -> None: ["Microsoft.Web/sites"] ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -258,7 +258,7 @@ def __init__(self, *_, **__) -> None: ["Microsoft.Web/sites"] ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -289,7 +289,7 @@ def __init__(self, *_, **__) -> None: ["Microsoft.Web/sites"] ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() diff --git a/tests/cloudformation/runner/test_runner.py b/tests/cloudformation/runner/test_runner.py index cfbd7ed39c4..a4e4e7f2248 100644 --- a/tests/cloudformation/runner/test_runner.py +++ b/tests/cloudformation/runner/test_runner.py @@ -143,7 +143,7 @@ def __init__(self, *_, **__) -> None: guideline=custom_guideline_url ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED AnyFailingCheck() @@ -468,7 +468,7 @@ def __init__(self, *_, **__) -> None: ["AWS::SQS::Queue"] ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -527,7 +527,7 @@ def __init__(self, *_, **__) -> None: ["AWS::SQS::Queue"] ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -537,7 +537,7 @@ def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckRes checks_allowlist = ['MEDIUM'] scan_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resources", "fail.yaml") report = runner.run(None, files=[scan_file_path], external_checks_dir=None, - runner_filter=RunnerFilter(framework='cloudformation', checks=checks_allowlist)) + runner_filter=RunnerFilter(framework=['cloudformation'], checks=checks_allowlist)) all_checks = report.failed_checks + report.passed_checks self.assertTrue(any(c.check_id == custom_check_id for c in all_checks)) @@ -557,7 +557,7 @@ def __init__(self, *_, **__) -> None: ["AWS::SQS::Queue"] ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -567,7 +567,7 @@ def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckRes checks_denylist = ['MEDIUM'] scan_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resources", "fail.yaml") report = runner.run(None, files=[scan_file_path], external_checks_dir=None, - runner_filter=RunnerFilter(framework='cloudformation', skip_checks=checks_denylist)) + runner_filter=RunnerFilter(framework=['cloudformation'], skip_checks=checks_denylist)) all_checks = report.failed_checks + report.passed_checks self.assertFalse(any(c.check_id == custom_check_id for c in all_checks)) @@ -587,7 +587,7 @@ def __init__(self, *_, **__) -> None: ["AWS::SQS::Queue"] ) - def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -597,7 +597,7 @@ def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckRes checks_denylist = ['MEDIUM'] scan_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resources", "fail.yaml") report = runner.run(None, files=[scan_file_path], external_checks_dir=None, - runner_filter=RunnerFilter(framework='cloudformation', skip_checks=checks_denylist)) + runner_filter=RunnerFilter(framework=['cloudformation'], skip_checks=checks_denylist)) all_checks = report.failed_checks + report.passed_checks self.assertTrue(any(c.check_id == custom_check_id for c in all_checks)) diff --git a/tests/common/checks/test_base_check.py b/tests/common/checks/test_base_check.py index e3d5ad1efb3..f9ea842ebea 100644 --- a/tests/common/checks/test_base_check.py +++ b/tests/common/checks/test_base_check.py @@ -23,7 +23,7 @@ def __init__(self): block_type=block_type) # noinspection PyMethodOverriding - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): """ My documentation :param conf: @@ -47,7 +47,7 @@ def __init__(self, fail_check=False): block_type=block_type) # noinspection PyMethodOverriding - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): """ My documentation :param conf: @@ -81,7 +81,7 @@ def test_entity_type_is_not_required_in_signature(self): """) def test_invalid_signature_is_detected(self): - with self.assertRaises(NotImplementedError) as context: + with self.assertRaises(TypeError) as context: class TestCheckUnknownSignature(BaseCheck): def __init__(self): @@ -93,13 +93,11 @@ def __init__(self): super().__init__(name=name, id=id, categories=categories, supported_entities=supported_entities, block_type=block_type) - # noinspection PyMethodOverriding - def scan_entity_conf(self, conf, some_unexpected_parameter_123): - return CheckResult.PASSED - self.assertIsInstance(context.exception, NotImplementedError) + TestCheckUnknownSignature() + + self.assertIsInstance(context.exception, TypeError) self.assertEqual( - "The signature ((\'self\', \'conf\', \'some_unexpected_parameter_123\'), None, None) for scan_entity_conf " - "is not supported.", + "Can't instantiate abstract class TestCheckUnknownSignature with abstract methods scan_entity_conf", context.exception.args[0] ) diff --git a/tests/generic_json/checks/array/BarAndBazToggleIsTrue.py b/tests/generic_json/checks/array/BarAndBazToggleIsTrue.py index f55ff11cc8e..41b1a6fc869 100644 --- a/tests/generic_json/checks/array/BarAndBazToggleIsTrue.py +++ b/tests/generic_json/checks/array/BarAndBazToggleIsTrue.py @@ -16,7 +16,7 @@ def __init__(self): block_type=BlockType.ARRAY ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): if "toggle" in conf and conf["toggle"]: return CheckResult.PASSED return CheckResult.FAILED diff --git a/tests/generic_json/checks/complex/ValueIsAtLeastTwo.py b/tests/generic_json/checks/complex/ValueIsAtLeastTwo.py index c79370b560a..5a8f8b03e98 100644 --- a/tests/generic_json/checks/complex/ValueIsAtLeastTwo.py +++ b/tests/generic_json/checks/complex/ValueIsAtLeastTwo.py @@ -17,7 +17,7 @@ def __init__(self): path="array_of_objects" ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): for obj in conf: if obj["value"] < 2: return CheckResult.FAILED, obj diff --git a/tests/generic_json/checks/object/FooExists.py b/tests/generic_json/checks/object/FooExists.py index b3104658bb8..97d8b1c34cc 100644 --- a/tests/generic_json/checks/object/FooExists.py +++ b/tests/generic_json/checks/object/FooExists.py @@ -16,7 +16,7 @@ def __init__(self): block_type=BlockType.DOCUMENT, ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): if "foo" in conf: return CheckResult.PASSED return CheckResult.FAILED diff --git a/tests/generic_json/checks/object/PropHasValue.py b/tests/generic_json/checks/object/PropHasValue.py index 914362f0e8a..a3a4a031358 100644 --- a/tests/generic_json/checks/object/PropHasValue.py +++ b/tests/generic_json/checks/object/PropHasValue.py @@ -18,7 +18,7 @@ def __init__(self): block_type=BlockType.OBJECT, ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): if "prop" in conf and conf["prop"] == "value": return CheckResult.PASSED return CheckResult.FAILED diff --git a/tests/generic_json/checks/result_config/FullEvaluatedKey.py b/tests/generic_json/checks/result_config/FullEvaluatedKey.py index eb0aface769..2b3a69db48f 100644 --- a/tests/generic_json/checks/result_config/FullEvaluatedKey.py +++ b/tests/generic_json/checks/result_config/FullEvaluatedKey.py @@ -18,7 +18,7 @@ def __init__(self): block_type=BlockType.DOCUMENT, ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): return CheckResult.PASSED def get_evaluated_keys(self) -> List[str]: diff --git a/tests/generic_json/checks/result_config/NoEvaluatedKey.py b/tests/generic_json/checks/result_config/NoEvaluatedKey.py index 79ef10ba5ae..a6208547b46 100644 --- a/tests/generic_json/checks/result_config/NoEvaluatedKey.py +++ b/tests/generic_json/checks/result_config/NoEvaluatedKey.py @@ -16,7 +16,7 @@ def __init__(self): block_type=BlockType.DOCUMENT, ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): return CheckResult.PASSED diff --git a/tests/generic_json/checks/result_config/PartialEvaluatedKey.py b/tests/generic_json/checks/result_config/PartialEvaluatedKey.py index 9944fea70e3..70910c04c3d 100644 --- a/tests/generic_json/checks/result_config/PartialEvaluatedKey.py +++ b/tests/generic_json/checks/result_config/PartialEvaluatedKey.py @@ -19,7 +19,7 @@ def __init__(self): block_type=BlockType.DOCUMENT, ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): return CheckResult.PASSED def get_evaluated_keys(self) -> List[str]: diff --git a/tests/generic_yaml/checks/array/BarAndBazToggleIsTrue.py b/tests/generic_yaml/checks/array/BarAndBazToggleIsTrue.py index 6b2a5eb80c3..1fdd060463f 100644 --- a/tests/generic_yaml/checks/array/BarAndBazToggleIsTrue.py +++ b/tests/generic_yaml/checks/array/BarAndBazToggleIsTrue.py @@ -16,7 +16,7 @@ def __init__(self): block_type=BlockType.ARRAY ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): if "toggle" in conf and conf["toggle"]: return CheckResult.PASSED return CheckResult.FAILED diff --git a/tests/generic_yaml/checks/complex/ValueIsAtLeastTwo.py b/tests/generic_yaml/checks/complex/ValueIsAtLeastTwo.py index 3a96ff24fec..4b879a56529 100644 --- a/tests/generic_yaml/checks/complex/ValueIsAtLeastTwo.py +++ b/tests/generic_yaml/checks/complex/ValueIsAtLeastTwo.py @@ -17,7 +17,7 @@ def __init__(self): path="array_of_objects" ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): for obj in conf: if obj["value"] < 2: return CheckResult.FAILED, obj diff --git a/tests/generic_yaml/checks/object/FooExists.py b/tests/generic_yaml/checks/object/FooExists.py index 37c5f54b97f..a7f36e929bc 100644 --- a/tests/generic_yaml/checks/object/FooExists.py +++ b/tests/generic_yaml/checks/object/FooExists.py @@ -16,7 +16,7 @@ def __init__(self): block_type=BlockType.DOCUMENT, ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): if "foo" in conf: return CheckResult.PASSED return CheckResult.FAILED diff --git a/tests/generic_yaml/checks/object/PropHasValue.py b/tests/generic_yaml/checks/object/PropHasValue.py index 83ba816d738..53866fd1472 100644 --- a/tests/generic_yaml/checks/object/PropHasValue.py +++ b/tests/generic_yaml/checks/object/PropHasValue.py @@ -18,7 +18,7 @@ def __init__(self): block_type=BlockType.OBJECT, ) - def scan_entity_conf(self, conf): + def scan_entity_conf(self, conf, entity_type): if "prop" in conf and conf["prop"] == "value": return CheckResult.PASSED return CheckResult.FAILED diff --git a/tests/serverless/runner/test_runner.py b/tests/serverless/runner/test_runner.py index 84bb9bd1118..be16c693941 100644 --- a/tests/serverless/runner/test_runner.py +++ b/tests/serverless/runner/test_runner.py @@ -186,7 +186,7 @@ def __init__(self, *_, **__) -> None: ["serverless_aws"] ) - def scan_function_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_function_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -215,7 +215,7 @@ def __init__(self, *_, **__) -> None: ["serverless_aws"] ) - def scan_function_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_function_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -245,7 +245,7 @@ def __init__(self, *_, **__) -> None: ["serverless_aws"] ) - def scan_function_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_function_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck() @@ -303,7 +303,7 @@ def __init__(self, *_, **__) -> None: ["serverless_aws"] ) - def scan_function_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult: + def scan_function_conf(self, conf: Dict[str, Any]) -> CheckResult: return CheckResult.FAILED check = AnyFailingCheck()