Skip to content

Commit

Permalink
break(general): remove multi_signature and adjust base check classes (b…
Browse files Browse the repository at this point in the history
…ridgecrewio#5645)

* remove multi_signature and adjust base check classes

* fix linting and test
  • Loading branch information
gruebel authored Oct 24, 2023
1 parent 940a484 commit 0df8ed8
Show file tree
Hide file tree
Showing 52 changed files with 130 additions and 330 deletions.
4 changes: 1 addition & 3 deletions checkov/ansible/checks/base_ansible_task_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ def __init__(

registry.register(self)

def scan_entity_conf( # type:ignore[override] # multi_signature decorator is problematic
self, conf: dict[str, Any], entity_type: str
) -> tuple[CheckResult, dict[str, Any]]:
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type
self.entity_conf = conf

Expand Down
2 changes: 1 addition & 1 deletion checkov/argo_workflows/checks/base_argo_workflows_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(
self.path = path
registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type

return self.scan_conf(conf)
Expand Down
33 changes: 15 additions & 18 deletions checkov/arm/base_parameter_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

from abc import abstractmethod
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Any

from checkov.arm.registry import arm_parameter_registry
from checkov.common.checks.base_check import BaseCheck
from checkov.common.multi_signature import multi_signature

if TYPE_CHECKING:
from checkov.common.models.enums import CheckCategories, CheckResult
Expand All @@ -19,26 +18,24 @@ def __init__(
id: str,
categories: Iterable[CheckCategories],
supported_resources: Iterable[str],
guideline: str | None = None
guideline: str | None = None,
) -> None:
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_resources,
block_type="parameter", guideline=guideline)
super().__init__(
name=name,
id=id,
categories=categories,
supported_entities=supported_resources,
block_type="parameter",
guideline=guideline,
)
self.supported_resources = supported_resources
arm_parameter_registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] # it's ok
return self.scan_resource_conf(conf, entity_type) # type:ignore[no-any-return] # issue with multi_signature annotation
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult:
self.entity_type = entity_type

return self.scan_resource_conf(conf)

@multi_signature()
@abstractmethod
def scan_resource_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult:
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
raise NotImplementedError()

@classmethod
@scan_resource_conf.add_signature(args=["self", "conf"])
def _scan_resource_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]:
def wrapper(self: BaseCheck, conf: dict[str, Any], entity_type: str | None = None) -> CheckResult:
# keep default argument for entity_type so old code, that doesn't set it, will work.
return wrapped(self, conf)

return wrapper
21 changes: 5 additions & 16 deletions checkov/arm/base_resource_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

from abc import abstractmethod
from collections.abc import Iterable
from typing import Any, Callable
from typing import Any

from checkov.arm.registry import arm_resource_registry
from checkov.bicep.checks.resource.registry import registry as bicep_registry
from checkov.common.checks.base_check import BaseCheck
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.multi_signature import multi_signature


class BaseResourceCheck(BaseCheck):
Expand All @@ -33,7 +32,7 @@ def __init__(
# leverage ARM checks to use with bicep runner
bicep_registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] # it's ok
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult:
self.entity_type = entity_type

# the "existing" key indicates a Bicep resource
Expand All @@ -50,22 +49,12 @@ def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResul
# this means the whole resource block is surrounded by a for loop
resource_conf = resource_conf["config"]

return self.scan_resource_conf(resource_conf, entity_type) # type:ignore[no-any-return] # issue with multi_signature annotation
return self.scan_resource_conf(resource_conf)

self.api_version = None

return self.scan_resource_conf(conf, entity_type) # type:ignore[no-any-return] # issue with multi_signature annotation
return self.scan_resource_conf(conf)

@multi_signature()
@abstractmethod
def scan_resource_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult:
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
raise NotImplementedError()

@classmethod
@scan_resource_conf.add_signature(args=["self", "conf"])
def _scan_resource_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]:
def wrapper(self: BaseCheck, conf: dict[str, Any], entity_type: str | None = None) -> CheckResult:
# keep default argument for entity_type so old code, that doesn't set it, will work.
return wrapped(self, conf)

return wrapper
2 changes: 1 addition & 1 deletion checkov/arm/base_resource_negative_value_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
def _is_variable_dependant(value: Any) -> bool:
return bool(isinstance(value, str) and re.match(VARIABLE_DEPENDANT_REGEX, value))

def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: # type:ignore[override] # issue with multi_signature annotation
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
inspected_key = self.get_inspected_key()
forbidden_values = self.get_forbidden_values()
value = find_in_dict(conf, inspected_key)
Expand Down
2 changes: 1 addition & 1 deletion checkov/arm/base_resource_value_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _is_variable_dependant(value: Any) -> bool:
return True
return False

def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: # type:ignore[override] # issue with multi_signature annotation
def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult:
inspected_key = self.get_inspected_key()
expected_values = self.get_expected_values()
value = find_in_dict(conf, inspected_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(
self.path = path
registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type

return self.scan_conf(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(
self.path = path
registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type

return self.scan_conf(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
self.path = path
registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type

return self.scan_conf(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.common.multi_signature import multi_signature
from checkov.cloudformation.checks.utils.iam_cloudformation_document_to_policy_converter import \
convert_cloudformation_conf_to_iam_policy

Expand Down Expand Up @@ -77,7 +76,6 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
return CheckResult.UNKNOWN
return CheckResult.PASSED

@multi_signature()
@abstractmethod
def cloudsplaining_analysis(self, policy: PolicyDocument) -> list[str]:
raise NotImplementedError()
17 changes: 3 additions & 14 deletions checkov/cloudformation/checks/resource/base_resource_check.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from abc import abstractmethod
from collections.abc import Iterable
from typing import Callable, Optional, Dict, Any
from typing import Optional, Dict, Any

from checkov.cloudformation.checks.resource.registry import cfn_registry
from checkov.common.checks.base_check import BaseCheck
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.multi_signature import multi_signature


class BaseResourceCheck(BaseCheck):
Expand All @@ -31,18 +30,8 @@ def __init__(
def scan_entity_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult:
self.entity_type = entity_type

return self.scan_resource_conf(conf, entity_type)
return self.scan_resource_conf(conf)

@multi_signature()
@abstractmethod
def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult:
def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult:
raise NotImplementedError()

@classmethod
@scan_resource_conf.add_signature(args=["self", "conf"])
def _scan_resource_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]:
def wrapper(self: BaseCheck, conf: Dict[str, Any], entity_type: Optional[str] = None) -> CheckResult:
# keep default argument for entity_type so old code, that doesn't set it, will work.
return wrapped(self, conf)

return wrapper
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
self.missing_block_result = missing_block_result

def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult:
def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult:
excluded_key = self.get_excluded_key()
if excluded_key is not None:
path_elements = excluded_key.split("/")
Expand Down
41 changes: 9 additions & 32 deletions checkov/common/checks/base_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@

import logging
import os
from abc import abstractmethod
from abc import abstractmethod, ABC
from collections.abc import Iterable
from typing import List, Dict, Any, Callable, Optional
from typing import List, Dict, Any, Optional

from checkov.common.resource_code_logger_filter import add_resource_code_filter_to_logger
from checkov.common.typing import _SkippedCheck, _CheckResult
from checkov.common.util.type_forcers import force_list
from checkov.common.models.enums import CheckResult, CheckCategories, CheckFailLevel
from checkov.common.multi_signature import MultiSignatureMeta, multi_signature


class BaseCheck(metaclass=MultiSignatureMeta):
class BaseCheck(ABC):
def __init__(
self,
name: str,
Expand Down Expand Up @@ -48,7 +47,7 @@ def __init__(
def run(
self,
scanned_file: str,
entity_configuration: Dict[str, List[Any]],
entity_configuration: Dict[str, Any],
entity_name: str,
entity_type: str,
skip_info: _SkippedCheck,
Expand All @@ -58,51 +57,29 @@ def run(
if skip_info:
check_result["result"] = CheckResult.SKIPPED
check_result["suppress_comment"] = skip_info["suppress_comment"]
message = 'File {}, {} "{}.{}" check "{}" Result: {}, Suppression comment: {} '.format(
scanned_file,
self.block_type,
entity_type,
entity_name,
self.name,
check_result,
check_result["suppress_comment"],
self.logger.debug(
f'File {scanned_file}, {self.block_type} "{entity_type}.{entity_name}" check "{self.name}" Result: {check_result}, Suppression comment: {check_result["suppress_comment"]}'
)
self.logger.debug(message)
else:
try:
self.evaluated_keys = []
self.entity_path = f"{scanned_file}:{entity_type}:{entity_name}"
check_result["result"] = self.scan_entity_conf(entity_configuration, entity_type)
check_result["evaluated_keys"] = self.get_evaluated_keys()
message = 'File {}, {} "{}.{}" check "{}" Result: {} '.format(
scanned_file, self.block_type, entity_type, entity_name, self.name, check_result
self.logger.debug(
f'File {scanned_file}, {self.block_type} "{entity_type}.{entity_name}" check "{self.name}" Result: {check_result}'
)
self.logger.debug(message)

except Exception:
self.log_check_error(scanned_file=scanned_file, entity_type=entity_type, entity_name=entity_name,
entity_configuration=entity_configuration)
raise
return check_result

@multi_signature()
@abstractmethod
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult | tuple[CheckResult, dict[str, Any]]:
raise NotImplementedError()

@classmethod
@scan_entity_conf.add_signature(args=["self", "conf"])
def _scan_entity_conf_self_conf(
cls, wrapped: Callable[..., CheckResult | tuple[CheckResult, dict[str, Any]]]
) -> Callable[..., CheckResult | tuple[CheckResult, dict[str, Any]]]:
def wrapper(
self: "BaseCheck", conf: Dict[str, Any], entity_type: Optional[str] = None
) -> CheckResult | tuple[CheckResult, dict[str, Any]]:
# keep default argument for entity_type so old code, that doesn't set it, will work.
return wrapped(self, conf)

return wrapper

def get_evaluated_keys(self) -> List[str]:
"""
Retrieves the evaluated keys for the run's report. Child classes override the function and return the `expected_keys` instead.
Expand All @@ -114,7 +91,7 @@ def get_output_id(self, use_bc_ids: bool) -> str:
return self.bc_id if self.bc_id and use_bc_ids else self.id

def log_check_error(self, scanned_file: str, entity_type: str, entity_name: str,
entity_configuration: Dict[str, List[Any]]) -> None:
entity_configuration: Dict[str, Any]) -> None:
if self.check_fail_level == CheckFailLevel.ERROR:
logging.error(f'Failed to run check {self.id} on {scanned_file}:{entity_type}.{entity_name}',
exc_info=True)
Expand Down
Loading

0 comments on commit 0df8ed8

Please sign in to comment.