-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This PR builds on the work in the initial PR to move business rules t…
…o celery along with info learned deploying this. Avoid filling the task queue with orchestration tasks and starving the workers. =============================================================================== In the previous system there were about 3 layers of tasks, that orchestrated other tasks, by using the .replace() API in each task. Unfortunately it was possible for celery workers to become full of orchestration tasks leaving no room for the business rule tasks at the bottom of the to actually run. This PR attempts two mitigations: 1. Use celery workflows instead of .replace() This PR builds a celery workflow in the check_workbasket using celery constructs such as chain and group. In theory, since most of the work is done ahead of time the system should have more awareness of the task structure avoiding the issue of starvation. 2. Cancel existing workbasket checks when a new check is requested. When check_workbasket is started, it will attempt to revoke existing check_workbasket tasks for the same workbasket. Treat intermediate data structures as ephemeral =============================================== A celery task may execute at any time, right now - or when a system comes up tomorrow, based on this assumption models such as TrackedModelCheck (which stores the result of a business rule check on a TrackedModel) are no longer passed to celery tasks by ID, instead all the information needed to receate the data is passed to the celery task, this means the system will still work even if developers delete these while it is running. Reduce layers in business rule checking ======================================= BusinessRuleChecker and LinkedModelsBusinessRuleChecker are now the only checkers, these now take BusinessRule instances, instead of being subclassed for each business rule. While more parameters are passed when rules are checked a conceptual layer has been removed and the simplification is reflected with around 20 lines of code being removed from checks.py Celery flower is now very easier to read ======================================== Due to the changes above, the output in celery flower should correspond more closely to a users intentions - ids of models. Content Checksums ================= Result caching now validates using checksums of the content, which should reduce the amount of checking the system needs to do. When a workbasket has been published, it's content could invalidate some content in other unpublished workbaskets, by associating business rule checks with checksums of a models content, any models that do not clash can be skipped. Model checksums (generated by `.content_hash()`) are not currently stored in the database (though it may be desirable to store them on TrackedModels, as it would provide an mechanism to address any content in the system). The checksuming scheme is a combination of the type and a sha256 of the fields in `.copyable_fields` (which should represent the fields a user can edit, but not fields such as pk). Blake3 was tested, as it provides a fast hashing algorithm, in practice it didn't provide much of a speedup over sha256. Greets ====== This PR adapts changes and builds on the hard work done in the initial work to check the business rules with celery, thanks to Simon Worthington and the hard work of the other devs on the project.
- Loading branch information
Showing
33 changed files
with
1,530 additions
and
895 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,226 +1,203 @@ | ||
from functools import cached_property | ||
from typing import Collection | ||
from typing import Dict | ||
from typing import Iterator | ||
import logging | ||
from collections import defaultdict | ||
from typing import Optional | ||
from typing import Set | ||
from typing import Tuple | ||
from typing import Type | ||
from typing import TypeVar | ||
|
||
from django.conf import settings | ||
|
||
from checks.models import TrackedModelCheck | ||
from checks.models import TransactionCheck | ||
from common.business_rules import ALL_RULES | ||
from common.business_rules import BusinessRule | ||
from common.business_rules import BusinessRuleViolation | ||
from common.models.trackedmodel import TrackedModel | ||
from common.models import TrackedModel | ||
from common.models import Transaction | ||
from common.models.utils import get_current_transaction | ||
from common.models.utils import override_current_transaction | ||
|
||
CheckResult = Tuple[bool, Optional[str]] | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
Self = TypeVar("Self") | ||
CheckResult = Tuple[bool, Optional[str]] | ||
|
||
|
||
class Checker: | ||
""" | ||
A ``Checker`` is an object that knows how to perform a certain kind of check | ||
against a model. | ||
Checkers can be applied against a model. The logic of the checker will be | ||
run and the result recorded as a ``TrackedModelCheck``. | ||
""" | ||
|
||
@cached_property | ||
def name(self) -> str: | ||
""" | ||
The name string that on a per-model basis uniquely identifies the | ||
checker. | ||
The name should be deterministic (i.e. not rely on the current | ||
environment, memory locations or random data) so that the system can | ||
record the name in the database and later use it to work out whether | ||
this check has been run. The name doesn't need to include any details | ||
about the model. | ||
By default this is the name of the class, but it can include any other | ||
non-model data that is unique to the checker. For a more complex | ||
example, see ``IndirectBusinessRuleChecker.name``. | ||
""" | ||
return type(self).__name__ | ||
|
||
@classmethod | ||
def checkers_for(cls: Type[Self], model: TrackedModel) -> Collection[Self]: | ||
def run_rule( | ||
cls, | ||
rule: BusinessRule, | ||
transaction: Transaction, | ||
model: TrackedModel, | ||
) -> CheckResult: | ||
""" | ||
Returns instances of this ``Checker`` that should apply to the model. | ||
Run a single business rule on a single model. | ||
What checks apply to a model is sometimes data-dependent, so it is the | ||
responsibility of the ``Checker`` class to tell the system what | ||
instances of itself it would expect to run against the model. For an | ||
example, see ``IndirectBusinessRuleChecker.checkers_for``. | ||
:return CheckResult, a Tuple(rule_passed: str, violation_reason: Optional[str]). | ||
""" | ||
raise NotImplementedError() | ||
logger.debug(f"run_rule %s %s %s", model, rule, transaction.pk) | ||
try: | ||
rule(transaction).validate(model) | ||
logger.debug(f"%s [tx:%s] %s [passed]", model, rule, transaction.pk) | ||
return True, None | ||
except BusinessRuleViolation as violation: | ||
reason = violation.args[0] | ||
logger.debug(f"%s [tx:%s] %s [failed]", model, rule, transaction.pk, reason) | ||
return False, reason | ||
|
||
def run(self, model: TrackedModel) -> CheckResult: | ||
"""Runs Checker-dependent logic and returns an indication of success.""" | ||
raise NotImplementedError() | ||
@classmethod | ||
def apply_rule( | ||
cls, | ||
rule: BusinessRule, | ||
transaction: Transaction, | ||
model: TrackedModel, | ||
): | ||
""" | ||
Applies the check to the model and records success. | ||
def apply(self, model: TrackedModel, context: TransactionCheck): | ||
"""Applies the check to the model and records success.""" | ||
:return: TrackedModelCheck instance containing the result of the check. | ||
During debugging the developer can set settings.RAISE_BUSINESS_RULE_FAILURES | ||
to True to raise business rule violations as exceptions. | ||
""" | ||
success, message = False, None | ||
try: | ||
with override_current_transaction(context.transaction): | ||
success, message = self.run(model) | ||
with override_current_transaction(transaction): | ||
success, message = cls.run_rule(rule, transaction, model) | ||
except Exception as e: | ||
success, message = False, str(e) | ||
if settings.RAISE_BUSINESS_RULE_FAILURES: | ||
raise | ||
finally: | ||
return TrackedModelCheck.objects.create( | ||
check, created = TrackedModelCheck.objects.get_or_create( | ||
{ | ||
"successful": success, | ||
"message": message, | ||
"content_hash": model.content_hash().digest(), | ||
}, | ||
model=model, | ||
transaction_check=context, | ||
check_name=self.name, | ||
successful=success, | ||
message=message, | ||
check_name=rule.__name__, | ||
) | ||
|
||
|
||
class BusinessRuleChecker(Checker): | ||
""" | ||
A ``Checker`` that runs a ``BusinessRule`` against a model. | ||
This class is expected to be sub-typed for a specific rule by a call to | ||
``of()``. | ||
Attributes: | ||
checker_cache (dict): (class attribute) Cache of Business checkers created by ``of()``. | ||
""" | ||
|
||
rule: Type[BusinessRule] | ||
|
||
_checker_cache: Dict[str, BusinessRule] = {} | ||
if not created: | ||
check.successful = success | ||
check.message = message | ||
check.content_hash = model.content_hash().digest() | ||
check.save() | ||
return check | ||
|
||
@classmethod | ||
def of(cls: Type, rule_type: Type[BusinessRule]) -> Type: | ||
def apply_rule_cached( | ||
cls, | ||
rule: BusinessRule, | ||
transaction: Transaction, | ||
model: TrackedModel, | ||
): | ||
""" | ||
Return a subclass of a Checker, e.g. BusinessRuleChecker, | ||
IndirectBusinessRuleChecker that runs the passed in business rule. | ||
If a matching TrackedModelCheck instance exists, returns it, otherwise | ||
check rule, and return the result as a TrackedModelCheck instance. | ||
Example, creating a BusinessRuleChecker for ME32: | ||
:return: TrackedModelCheck instance containing the result of the check. | ||
""" | ||
try: | ||
check = TrackedModelCheck.objects.get( | ||
model=model, | ||
check_name=rule.__name__, | ||
) | ||
except TrackedModelCheck.DoesNotExist: | ||
logger.debug( | ||
"apply_rule_cached (no existing check) %s, %s apply rule", | ||
rule.__name__, | ||
transaction, | ||
) | ||
return cls.apply_rule(rule, transaction, model) | ||
|
||
# Re-run the rule if the check is not successful. | ||
check_hash = bytes(check.content_hash) | ||
model_hash = model.content_hash().digest() | ||
if check_hash == model_hash: | ||
logger.debug( | ||
"apply_rule_cached (matching content hash) %s, tx: %s, using cached result %s", | ||
rule.__name__, | ||
transaction.pk, | ||
check, | ||
) | ||
return check | ||
|
||
>>> BusinessRuleChecker.of(measures.business_rules.ME32) | ||
<class 'checks.checks.BusinessRuleCheckerOf[measures.business_rules.ME32]'> | ||
logger.debug( | ||
"apply_rule_cached (check.content_hash != model.content_hash()) %s != %s %s, %s apply rule", | ||
check_hash, | ||
model_hash, | ||
rule.__name__, | ||
transaction, | ||
) | ||
check.delete() | ||
return cls.apply_rule(rule, transaction, model) | ||
|
||
This API is usually called by .applicable_to, however this docstring should | ||
illustrate what it does. | ||
|
||
Checkers are created once and then cached in _checker_cache. | ||
class BusinessRuleChecker(Checker): | ||
"""Apply BusinessRules specified in a TrackedModels business_rules | ||
attribute.""" | ||
|
||
As well as a small performance improvement, caching aids debugging by ensuring | ||
the same checker instance is returned if the same cls is passed to ``of``. | ||
@classmethod | ||
def get_model_rules(cls, model: TrackedModel, rules: Optional[Set[str]] = None): | ||
""" | ||
checker_name = f"{cls.__name__}Of[{rule_type.__module__}.{rule_type.__name__}]" | ||
|
||
# If the checker class was already created, return it. | ||
checker_class = cls._checker_cache.get(checker_name) | ||
if checker_class is not None: | ||
return checker_class | ||
# No existing checker was found, so create it: | ||
:param model: TrackedModel instance | ||
:param rules: Optional list of rule names to filter by. | ||
:return: Dict mapping models to a set of the BusinessRules that apply to them. | ||
""" | ||
model_rules = defaultdict(set) | ||
|
||
class BusinessRuleCheckerOf(cls): | ||
# Creating this class explicitly in code is more readable than using type(...) | ||
# Once created the name will be mangled to include the rule to be checked. | ||
for rule in model.business_rules: | ||
if rules is not None and rule.__name__ not in rules: | ||
continue | ||
|
||
f"""Apply the following checks as specified in {rule_type.__name__}""" | ||
rule = rule_type | ||
model_rules[model].add(rule) | ||
|
||
def __repr__(self): | ||
return f"<{checker_name}>" | ||
return model_rules | ||
|
||
BusinessRuleCheckerOf.__name__ = checker_name | ||
|
||
cls._checker_cache[checker_name] = BusinessRuleCheckerOf | ||
return BusinessRuleCheckerOf | ||
class LinkedModelsBusinessRuleChecker(Checker): | ||
"""Apply BusinessRules specified in a TrackedModels indirect_business_rules | ||
attribute to models returned by get_linked_models on those rules.""" | ||
|
||
@classmethod | ||
def checkers_for(cls: Type[Self], model: TrackedModel) -> Collection[Self]: | ||
"""If the rule attribute on this BusinessRuleChecker matches any in the | ||
supplied TrackedModel instance's business_rules, return it in a list, | ||
otherwise there are no matches so return an empty list.""" | ||
if cls.rule in model.business_rules: | ||
return [cls()] | ||
return [] | ||
|
||
def run(self, model: TrackedModel) -> CheckResult: | ||
""" | ||
:return CheckResult, a Tuple(rule_passed: str, violation_reason: Optional[str]). | ||
def apply_rule( | ||
cls, | ||
rule: BusinessRule, | ||
transaction: Transaction, | ||
model: TrackedModel, | ||
): | ||
""" | ||
transaction = get_current_transaction() | ||
try: | ||
self.rule(transaction).validate(model) | ||
return True, None | ||
except BusinessRuleViolation as violation: | ||
return False, violation.args[0] | ||
|
||
|
||
class IndirectBusinessRuleChecker(BusinessRuleChecker): | ||
""" | ||
A ``Checker`` that runs a ``BusinessRule`` against a model that is linked to | ||
the model being checked, and for which a change in the checked model could | ||
result in a business rule failure against the linked model. | ||
LinkedModelsBusinessRuleChecker assumes that the linked models are still | ||
the current. | ||
This is a base class: subclasses for checking specific rules are created by | ||
calling ``of()``. | ||
""" | ||
versions (TODO - ensure a business rule checks this), | ||
rule: Type[BusinessRule] | ||
linked_model: TrackedModel | ||
|
||
def __init__(self, linked_model: TrackedModel) -> None: | ||
self.linked_model = linked_model | ||
super().__init__() | ||
|
||
@cached_property | ||
def name(self) -> str: | ||
# Include the identity of the linked model in the checker name, so that | ||
# each linked model needs to be checked for all checks to be complete. | ||
return f"{super().name}[{self.linked_model.pk}]" | ||
The transaction to check is set to that of the model, which enables | ||
""" | ||
return super().apply_rule(rule, model.transaction, model) | ||
|
||
@classmethod | ||
def checkers_for(cls: Type[Self], model: TrackedModel) -> Collection[Self]: | ||
"""Return a set of IndirectBusinessRuleCheckers for every model found on | ||
rule.get_linked_models.""" | ||
rules = set() | ||
transaction = get_current_transaction() | ||
if cls.rule in model.indirect_business_rules: | ||
for linked_model in cls.rule.get_linked_models(model, transaction): | ||
rules.add(cls(linked_model)) | ||
return rules | ||
|
||
def run(self, model: TrackedModel) -> CheckResult: | ||
def get_model_rules(cls, model: TrackedModel, rules: Optional[Set] = None): | ||
""" | ||
Return the result of running super.run, passing self.linked_model, and. | ||
return it as a CheckResult - a Tuple(rule_passed: str, violation_reason: Optional[str]) | ||
:param model: TrackedModel instance | ||
:param rules: Optional list of rule names to filter by. | ||
:return: Dict mapping models to a set of the BusinessRules that apply to them. | ||
""" | ||
result, message = super().run(self.linked_model) | ||
message = f"{self.linked_model}: " + message if message else None | ||
return result, message | ||
tx = get_current_transaction() | ||
|
||
model_rules = defaultdict(set) | ||
|
||
for rule in [*model.indirect_business_rules]: | ||
for linked_model in rule.get_linked_models(model, tx): | ||
if rules is not None and rule.__name__ not in rules: | ||
continue | ||
|
||
def checker_types() -> Iterator[Type[Checker]]: | ||
""" | ||
Return all registered Checker types. | ||
model_rules[linked_model].add(rule) | ||
|
||
See ``checks.checks.BusinessRuleChecker.of``. | ||
""" | ||
for rule in ALL_RULES: | ||
yield BusinessRuleChecker.of(rule) | ||
yield IndirectBusinessRuleChecker.of(rule) | ||
return model_rules | ||
|
||
|
||
def applicable_to(model: TrackedModel) -> Iterator[Checker]: | ||
"""Return instances of any Checker classes applicable to the supplied | ||
TrackedModel instance.""" | ||
for checker_type in checker_types(): | ||
yield from checker_type.checkers_for(model) | ||
# Checkers in priority list order, checkers for linked models come first. | ||
ALL_CHECKERS = { | ||
"LinkedModelsBusinessRuleChecker": LinkedModelsBusinessRuleChecker, | ||
"BusinessRuleChecker": BusinessRuleChecker, | ||
} |
Oops, something went wrong.