Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

break(general): remove multi_signature and adjust base check classes #5645

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions checkov/ansible/checks/base_ansible_task_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ def __init__(

registry.register(self)

def scan_entity_conf( # type:ignore[override] # multi_signature decorator is problematic
self, conf: dict[str, Any], entity_type: str
) -> tuple[CheckResult, dict[str, Any]]:
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type
self.entity_conf = conf

Expand Down
2 changes: 1 addition & 1 deletion checkov/argo_workflows/checks/base_argo_workflows_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(
self.path = path
registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type

return self.scan_conf(conf)
Expand Down
33 changes: 15 additions & 18 deletions checkov/arm/base_parameter_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

from abc import abstractmethod
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Any

from checkov.arm.registry import arm_parameter_registry
from checkov.common.checks.base_check import BaseCheck
from checkov.common.multi_signature import multi_signature

if TYPE_CHECKING:
from checkov.common.models.enums import CheckCategories, CheckResult
Expand All @@ -19,26 +18,24 @@ def __init__(
id: str,
categories: Iterable[CheckCategories],
supported_resources: Iterable[str],
guideline: str | None = None
guideline: str | None = None,
) -> None:
super().__init__(name=name, id=id, categories=categories, supported_entities=supported_resources,
block_type="parameter", guideline=guideline)
super().__init__(
name=name,
id=id,
categories=categories,
supported_entities=supported_resources,
block_type="parameter",
guideline=guideline,
)
self.supported_resources = supported_resources
arm_parameter_registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] # it's ok
return self.scan_resource_conf(conf, entity_type) # type:ignore[no-any-return] # issue with multi_signature annotation
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult:
self.entity_type = entity_type

return self.scan_resource_conf(conf)

@multi_signature()
@abstractmethod
def scan_resource_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult:
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
raise NotImplementedError()

@classmethod
@scan_resource_conf.add_signature(args=["self", "conf"])
def _scan_resource_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]:
def wrapper(self: BaseCheck, conf: dict[str, Any], entity_type: str | None = None) -> CheckResult:
# keep default argument for entity_type so old code, that doesn't set it, will work.
return wrapped(self, conf)

return wrapper
21 changes: 5 additions & 16 deletions checkov/arm/base_resource_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

from abc import abstractmethod
from collections.abc import Iterable
from typing import Any, Callable
from typing import Any

from checkov.arm.registry import arm_resource_registry
from checkov.bicep.checks.resource.registry import registry as bicep_registry
from checkov.common.checks.base_check import BaseCheck
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.multi_signature import multi_signature


class BaseResourceCheck(BaseCheck):
Expand All @@ -33,7 +32,7 @@ def __init__(
# leverage ARM checks to use with bicep runner
bicep_registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult: # type:ignore[override] # it's ok
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult:
self.entity_type = entity_type

# the "existing" key indicates a Bicep resource
Expand All @@ -50,22 +49,12 @@ def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResul
# this means the whole resource block is surrounded by a for loop
resource_conf = resource_conf["config"]

return self.scan_resource_conf(resource_conf, entity_type) # type:ignore[no-any-return] # issue with multi_signature annotation
return self.scan_resource_conf(resource_conf)

self.api_version = None

return self.scan_resource_conf(conf, entity_type) # type:ignore[no-any-return] # issue with multi_signature annotation
return self.scan_resource_conf(conf)

@multi_signature()
@abstractmethod
def scan_resource_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult:
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
raise NotImplementedError()

@classmethod
@scan_resource_conf.add_signature(args=["self", "conf"])
def _scan_resource_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]:
def wrapper(self: BaseCheck, conf: dict[str, Any], entity_type: str | None = None) -> CheckResult:
# keep default argument for entity_type so old code, that doesn't set it, will work.
return wrapped(self, conf)

return wrapper
2 changes: 1 addition & 1 deletion checkov/arm/base_resource_negative_value_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
def _is_variable_dependant(value: Any) -> bool:
return bool(isinstance(value, str) and re.match(VARIABLE_DEPENDANT_REGEX, value))

def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: # type:ignore[override] # issue with multi_signature annotation
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
inspected_key = self.get_inspected_key()
forbidden_values = self.get_forbidden_values()
value = find_in_dict(conf, inspected_key)
Expand Down
2 changes: 1 addition & 1 deletion checkov/arm/base_resource_value_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _is_variable_dependant(value: Any) -> bool:
return True
return False

def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: # type:ignore[override] # issue with multi_signature annotation
def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult:
inspected_key = self.get_inspected_key()
expected_values = self.get_expected_values()
value = find_in_dict(conf, inspected_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(
self.path = path
registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type

return self.scan_conf(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(
self.path = path
registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type

return self.scan_conf(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
self.path = path
registry.register(self)

def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]: # type:ignore[override] # multi_signature decorator is problematic
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> tuple[CheckResult, dict[str, Any]]:
self.entity_type = entity_type

return self.scan_conf(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.common.multi_signature import multi_signature
from checkov.cloudformation.checks.utils.iam_cloudformation_document_to_policy_converter import \
convert_cloudformation_conf_to_iam_policy

Expand Down Expand Up @@ -77,7 +76,6 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
return CheckResult.UNKNOWN
return CheckResult.PASSED

@multi_signature()
@abstractmethod
def cloudsplaining_analysis(self, policy: PolicyDocument) -> list[str]:
raise NotImplementedError()
17 changes: 3 additions & 14 deletions checkov/cloudformation/checks/resource/base_resource_check.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from abc import abstractmethod
from collections.abc import Iterable
from typing import Callable, Optional, Dict, Any
from typing import Optional, Dict, Any

from checkov.cloudformation.checks.resource.registry import cfn_registry
from checkov.common.checks.base_check import BaseCheck
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.common.multi_signature import multi_signature


class BaseResourceCheck(BaseCheck):
Expand All @@ -31,18 +30,8 @@ def __init__(
def scan_entity_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult:
self.entity_type = entity_type

return self.scan_resource_conf(conf, entity_type)
return self.scan_resource_conf(conf)

@multi_signature()
@abstractmethod
def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult:
def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult:
raise NotImplementedError()

@classmethod
@scan_resource_conf.add_signature(args=["self", "conf"])
def _scan_resource_conf_self_conf(cls, wrapped: Callable[..., CheckResult]) -> Callable[..., CheckResult]:
def wrapper(self: BaseCheck, conf: Dict[str, Any], entity_type: Optional[str] = None) -> CheckResult:
# keep default argument for entity_type so old code, that doesn't set it, will work.
return wrapped(self, conf)

return wrapper
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
self.missing_block_result = missing_block_result

def scan_resource_conf(self, conf: Dict[str, Any], entity_type: str) -> CheckResult:
def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult:
excluded_key = self.get_excluded_key()
if excluded_key is not None:
path_elements = excluded_key.split("/")
Expand Down
41 changes: 9 additions & 32 deletions checkov/common/checks/base_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@

import logging
import os
from abc import abstractmethod
from abc import abstractmethod, ABC
from collections.abc import Iterable
from typing import List, Dict, Any, Callable, Optional
from typing import List, Dict, Any, Optional

from checkov.common.resource_code_logger_filter import add_resource_code_filter_to_logger
from checkov.common.typing import _SkippedCheck, _CheckResult
from checkov.common.util.type_forcers import force_list
from checkov.common.models.enums import CheckResult, CheckCategories, CheckFailLevel
from checkov.common.multi_signature import MultiSignatureMeta, multi_signature


class BaseCheck(metaclass=MultiSignatureMeta):
class BaseCheck(ABC):
def __init__(
self,
name: str,
Expand Down Expand Up @@ -48,7 +47,7 @@ def __init__(
def run(
self,
scanned_file: str,
entity_configuration: Dict[str, List[Any]],
entity_configuration: Dict[str, Any],
entity_name: str,
entity_type: str,
skip_info: _SkippedCheck,
Expand All @@ -58,51 +57,29 @@ def run(
if skip_info:
check_result["result"] = CheckResult.SKIPPED
check_result["suppress_comment"] = skip_info["suppress_comment"]
message = 'File {}, {} "{}.{}" check "{}" Result: {}, Suppression comment: {} '.format(
scanned_file,
self.block_type,
entity_type,
entity_name,
self.name,
check_result,
check_result["suppress_comment"],
self.logger.debug(
f'File {scanned_file}, {self.block_type} "{entity_type}.{entity_name}" check "{self.name}" Result: {check_result}, Suppression comment: {check_result["suppress_comment"]}'
)
self.logger.debug(message)
else:
try:
self.evaluated_keys = []
self.entity_path = f"{scanned_file}:{entity_type}:{entity_name}"
check_result["result"] = self.scan_entity_conf(entity_configuration, entity_type)
check_result["evaluated_keys"] = self.get_evaluated_keys()
message = 'File {}, {} "{}.{}" check "{}" Result: {} '.format(
scanned_file, self.block_type, entity_type, entity_name, self.name, check_result
self.logger.debug(
f'File {scanned_file}, {self.block_type} "{entity_type}.{entity_name}" check "{self.name}" Result: {check_result}'
)
self.logger.debug(message)

except Exception:
self.log_check_error(scanned_file=scanned_file, entity_type=entity_type, entity_name=entity_name,
entity_configuration=entity_configuration)
raise
return check_result

@multi_signature()
@abstractmethod
def scan_entity_conf(self, conf: dict[str, Any], entity_type: str) -> CheckResult | tuple[CheckResult, dict[str, Any]]:
raise NotImplementedError()

@classmethod
@scan_entity_conf.add_signature(args=["self", "conf"])
def _scan_entity_conf_self_conf(
cls, wrapped: Callable[..., CheckResult | tuple[CheckResult, dict[str, Any]]]
) -> Callable[..., CheckResult | tuple[CheckResult, dict[str, Any]]]:
def wrapper(
self: "BaseCheck", conf: Dict[str, Any], entity_type: Optional[str] = None
) -> CheckResult | tuple[CheckResult, dict[str, Any]]:
# keep default argument for entity_type so old code, that doesn't set it, will work.
return wrapped(self, conf)

return wrapper

def get_evaluated_keys(self) -> List[str]:
"""
Retrieves the evaluated keys for the run's report. Child classes override the function and return the `expected_keys` instead.
Expand All @@ -114,7 +91,7 @@ def get_output_id(self, use_bc_ids: bool) -> str:
return self.bc_id if self.bc_id and use_bc_ids else self.id

def log_check_error(self, scanned_file: str, entity_type: str, entity_name: str,
entity_configuration: Dict[str, List[Any]]) -> None:
entity_configuration: Dict[str, Any]) -> None:
if self.check_fail_level == CheckFailLevel.ERROR:
logging.error(f'Failed to run check {self.id} on {scanned_file}:{entity_type}.{entity_name}',
exc_info=True)
Expand Down
Loading