Skip to content

Commit

Permalink
Setup HA operator
Browse files Browse the repository at this point in the history
  • Loading branch information
timkpaine committed Aug 24, 2024
1 parent 2d9a46c commit bf3678a
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 25 deletions.
2 changes: 2 additions & 0 deletions airflow_ha/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
__version__ = "0.1.0"

from .operator import HighAvailabilityOperator
153 changes: 128 additions & 25 deletions airflow_ha/operator.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,61 @@
from airflow.models.dag import DAG # noqa: F401
from airflow.models.operator import Operator # noqa: F401
from airflow.operators.python import BranchPythonOperator, PythonOperator # noqa: F401
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # noqa: F401
from typing import Literal

from airflow.models.operator import Operator
from airflow.exceptions import AirflowSkipException, AirflowFailException
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor

__all__ = ("HighAvailabilityOperator",)


CheckResult = Literal[
"done",
"running",
"failed",
]


def skip_():
raise AirflowSkipException

Check warning on line 20 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L20

Added line #L20 was not covered by tests


def fail_():
raise AirflowFailException

Check warning on line 24 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L24

Added line #L24 was not covered by tests


def pass_():
pass

Check warning on line 28 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L28

Added line #L28 was not covered by tests


class HighAvailabilityOperator(PythonSensor):
_decide_task: BranchPythonOperator

_end_fail: Operator
_end_pass: Operator

_loop_pass: Operator
_loop_fail: Operator

_done_task: Operator
_end_task: Operator
_running_task: Operator
_failed_task: Operator
_kill_task: Operator

_cleanup_task: Operator
_loop_task: Operator
_restart_task: Operator

class HighAvailabilityOperator(PythonOperator):
def __init__(self, **kwargs) -> None:
"""The HighAvailabilityOperator is an Airflow Meta-Operator for long-running or "always-on" tasks.
It resembles a BranchPythonOperator with the following predefined set of outcomes:
HA-Task (the instance of HighAvailabilityOperator itself)
|-> CheckTask (run a user-provided check or task)
| DoneTask (tasks finished cleanly)------|
| EndTask (end time reached)-------------|
|
|--> CleanupTask (Finish DAG, success)
| RunningTask (tasks are still running)--|
|--> LoopTask (Re-trigger DAG, success)
| FailedTask (tasks finished uncleanly)--|
|--> RestartTask (Re-trigger DAG, failure)
| KillTask-------------------------------|
|--> CleanupTask (Finish DAG, failure)
/-> "done" -> Done -> EndPass
check -> decide -> "running" -> Loop -> EndPass
\-> "failed" -> Loop -> EndFail
\-------------> failed -> Loop -> EndPass
Given a check, there are four outcomes:
- The tasks finished/exited cleanly, and thus the DAG should terminate cleanly
Expand All @@ -36,11 +70,80 @@ def __init__(self, **kwargs) -> None:
Any setup should be state-aware (e.g. don't just start a process, check if it is currently started first).
"""
...

kwargs.pop("python_callable", None)
kwargs.pop("op_args", None)
kwargs.pop("op_kwargs", None)
kwargs.pop("templates_dict", None)
kwargs.pop("templates_exts", None)
kwargs.pop("show_return_value_in_logs", None)
python_callable = kwargs.pop("python_callable")

Check warning on line 73 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L73

Added line #L73 was not covered by tests

def _callable_wrapper(**kwargs):
task_instance = kwargs["task_instance"]
ret: CheckResult = python_callable(**kwargs)

Check warning on line 77 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L75-L77

Added lines #L75 - L77 were not covered by tests
if ret == "done":
task_instance.xcom_push(key="return_value", value="done")

Check warning on line 79 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L79

Added line #L79 was not covered by tests
# finish
return True

Check warning on line 81 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L81

Added line #L81 was not covered by tests
elif ret == "failed":
task_instance.xcom_push(key="return_value", value="failed")

Check warning on line 83 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L83

Added line #L83 was not covered by tests
# finish
return True

Check warning on line 85 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L85

Added line #L85 was not covered by tests
elif ret == "running":
task_instance.xcom_push(key="return_value", value="running")

Check warning on line 87 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L87

Added line #L87 was not covered by tests
# finish
return True
task_instance.xcom_push(key="return_value", value="")
return False

Check warning on line 91 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L89-L91

Added lines #L89 - L91 were not covered by tests

super().__init__(python_callable=_callable_wrapper, **kwargs)

Check warning on line 93 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L93

Added line #L93 was not covered by tests

self._end_fail = PythonOperator(task_id=f"{self.task_id}-dag-fail", python_callable=fail_, trigger_rule="all_success")
self._end_pass = PythonOperator(task_id=f"{self.task_id}-dag-pass", python_callable=pass_, trigger_rule="all_success")

Check warning on line 96 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L95-L96

Added lines #L95 - L96 were not covered by tests

self._loop_fail = TriggerDagRunOperator(task_id=f"{self.task_id}-loop-fail", trigger_dag_id=self.dag_id, trigger_rule="all_success")
self._loop_pass = TriggerDagRunOperator(task_id=f"{self.task_id}-loop-pass", trigger_dag_id=self.dag_id, trigger_rule="one_success")

Check warning on line 99 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L98-L99

Added lines #L98 - L99 were not covered by tests

self._done_task = PythonOperator(task_id=f"{self.task_id}-done", python_callable=pass_, trigger_rule="all_success")
self._running_task = PythonOperator(task_id=f"{self.task_id}-running", python_callable=pass_, trigger_rule="all_success")
self._failed_task = PythonOperator(task_id=f"{self.task_id}-failed", python_callable=pass_, trigger_rule="all_success")
self._sensor_failed_task = PythonOperator(task_id=f"{self.task_id}-sensor-timeout", python_callable=pass_, trigger_rule="all_failed")

Check warning on line 104 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L101-L104

Added lines #L101 - L104 were not covered by tests

branch_choices = {

Check warning on line 106 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L106

Added line #L106 was not covered by tests
"done": self._done_task.task_id,
"running": self._running_task.task_id,
"failed": self._failed_task.task_id,
"": self._sensor_failed_task.task_id,
}

def _choose_branch(branch_choices=branch_choices, **kwargs):
task_instance = kwargs["task_instance"]
check_program_result = task_instance.xcom_pull(key="return_value", task_ids=self.task_id)
ret = branch_choices.get(check_program_result, None)

Check warning on line 116 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L113-L116

Added lines #L113 - L116 were not covered by tests
if ret is None:
raise AirflowSkipException
return ret

Check warning on line 119 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L118-L119

Added lines #L118 - L119 were not covered by tests

self._decide_task = BranchPythonOperator(

Check warning on line 121 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L121

Added line #L121 was not covered by tests
task_id=f"{self.task_id}-decide",
python_callable=_choose_branch,
provide_context=True,
trigger_rule="all_success",
)

self >> self._sensor_failed_task >> self._loop_pass >> self._end_pass
self >> self._decide_task >> self._done_task
self >> self._decide_task >> self._running_task >> self._loop_pass >> self._end_pass
self >> self._decide_task >> self._failed_task >> self._loop_fail >> self._end_fail

Check warning on line 131 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L128-L131

Added lines #L128 - L131 were not covered by tests

@property
def check(self) -> Operator:
return self

Check warning on line 135 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L135

Added line #L135 was not covered by tests

@property
def failed(self) -> Operator:
# NOTE: use loop_fail as this will pass, but self._end_fail will fail to mark the DAG failed
return self._loop_fail

Check warning on line 140 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L140

Added line #L140 was not covered by tests

@property
def passed(self) -> Operator:
# NOTE: use loop_pass here to match failed()
return self._loop_pass

Check warning on line 145 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L145

Added line #L145 was not covered by tests

@property
def done(self) -> Operator:
return self._done_task

Check warning on line 149 in airflow_ha/operator.py

View check run for this annotation

Codecov / codecov/patch

airflow_ha/operator.py#L149

Added line #L149 was not covered by tests

0 comments on commit bf3678a

Please sign in to comment.