Skip to content

Commit

Permalink
add tag and retry policy
Browse files Browse the repository at this point in the history
  • Loading branch information
vitor authored and vitor committed Feb 26, 2024
1 parent babfe81 commit 324dcf7
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion config/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from airflow.exceptions import AirflowClusterPolicyViolation

ALLOWED_OWNERS = "team_contacts"
ALLOWED_TAGS = "airflow_tags_allowed_list"


def dag_policy(dag):
Expand All @@ -26,6 +27,11 @@ def dag_policy(dag):
else dag.dagrun_timeout
)

# Set tasks retries max to 3
retries = dag.default_args.get("retries", False)
if retries and retries > 3:
dag.default_args["retries"] = 3

# Check if owner exists
owner = dag.default_args.get("owner", "")
owner_dag_list = owner.split(",")
Expand All @@ -49,8 +55,18 @@ def dag_policy(dag):
)

# Check if dag has tags
if not dag.tags:
tags = dag.tags
if not tags:
raise AirflowClusterPolicyViolation(
f"DAG has no tags. At least one tag required."
)

# Check if tag is allowed
tag_allowed_list = yaml.safe_load(Variable.get(ALLOWED_TAGS))
if not all(item in tag_allowed_list for item in tags):
raise AirflowClusterPolicyViolation(
f"One of tags(s) {tags} not in Airflow Variable {ALLOWED_TAGS}"
)


# EOF

0 comments on commit 324dcf7

Please sign in to comment.