Skip to content

Commit

Permalink
feat(ingest/airflow): Add way to disable Airflow plugin without a res…
Browse files Browse the repository at this point in the history
…tart (#12098)
  • Loading branch information
treff7es authored Dec 16, 2024
1 parent 35a054f commit b3a76cb
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
31 changes: 31 additions & 0 deletions docs/lineage/airflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,37 @@ TypeError: on_task_instance_success() missing 3 required positional arguments: '

The solution is to upgrade `acryl-datahub-airflow-plugin>=0.12.0.4` or upgrade `pluggy>=1.2.0`. See this [PR](https://github.com/datahub-project/datahub/pull/9365) for details.

### Disabling the DataHub Plugin v2

There are two ways to disable the DataHub Plugin v2:

#### 1. Disable via Configuration

Set the `datahub.enabled` configuration property to `False` in the `airflow.cfg` file and restart the Airflow environment to reload the configuration and disable the plugin.

```ini title="airflow.cfg"
[datahub]
enabled = False
```

#### 2. Disable via Airflow Variable (Kill-Switch)

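If a restart is not possible and you need a faster way to disable the plugin, you can use the kill-switch. Create the `datahub_airflow_plugin_disable_listener` Airflow variable and set it to `true` (the value is matched case-insensitively). While the variable is set to `true`, the listener won't process anything; to re-enable the plugin, set the variable to `false` or delete it.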

##### Command Line

```shell
airflow variables set datahub_airflow_plugin_disable_listener true
```

##### Airflow UI

1. Go to Admin -> Variables.
2. Click the "+" symbol to create a new variable.
3. Set the key to `datahub_airflow_plugin_disable_listener` and the value to `true`.

This will immediately disable the plugin without requiring a restart.

## Compatibility

We no longer officially support Airflow <2.3. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import airflow
import datahub.emitter.mce_builder as builder
from airflow.models import Variable
from airflow.models.serialized_dag import SerializedDagModel
from datahub.api.entities.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
Expand Down Expand Up @@ -78,6 +79,8 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811
)
_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"

KILL_SWITCH_VARIABLE_NAME = "datahub_airflow_plugin_disable_listener"


def get_airflow_plugin_listener() -> Optional["DataHubListener"]:
# Using globals instead of functools.lru_cache to make testing easier.
Expand Down Expand Up @@ -364,6 +367,12 @@ def _extract_lineage(
redact_with_exclusions(v)
)

def check_kill_switch(self):
    """Report whether the kill-switch Airflow variable disables the listener.

    Reads the Airflow variable named by ``KILL_SWITCH_VARIABLE_NAME``,
    falling back to ``"false"`` when it is not set, and compares it to
    ``"true"`` ignoring case.

    Returns:
        True when the variable's value is ``"true"`` (any casing), in which
        case a debug message is logged; False otherwise.
    """
    switch_value = Variable.get(KILL_SWITCH_VARIABLE_NAME, "false")
    if switch_value.lower() != "true":
        return False

    logger.debug("DataHub listener disabled by kill switch")
    return True

@hookimpl
@run_in_thread
def on_task_instance_running(
Expand All @@ -372,6 +381,8 @@ def on_task_instance_running(
task_instance: "TaskInstance",
session: "Session", # This will always be QUEUED
) -> None:
if self.check_kill_switch():
return
self._set_log_level()

# This if statement mirrors the logic in https://github.com/OpenLineage/OpenLineage/pull/508.
Expand Down Expand Up @@ -454,6 +465,9 @@ def on_task_instance_running(
f"DataHub listener finished processing notification about task instance start for {task_instance.task_id}"
)

self.materialize_iolets(datajob)

def materialize_iolets(self, datajob: DataJob) -> None:
if self.config.materialize_iolets:
for outlet in datajob.outlets:
reported_time: int = int(time.time() * 1000)
Expand Down Expand Up @@ -541,6 +555,9 @@ def on_task_instance_finish(
def on_task_instance_success(
self, previous_state: None, task_instance: "TaskInstance", session: "Session"
) -> None:
if self.check_kill_switch():
return

self._set_log_level()

logger.debug(
Expand All @@ -556,6 +573,9 @@ def on_task_instance_success(
def on_task_instance_failed(
self, previous_state: None, task_instance: "TaskInstance", session: "Session"
) -> None:
if self.check_kill_switch():
return

self._set_log_level()

logger.debug(
Expand Down Expand Up @@ -696,6 +716,9 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
@hookimpl
@run_in_thread
def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
if self.check_kill_switch():
return

self._set_log_level()

logger.debug(
Expand Down

0 comments on commit b3a76cb

Please sign in to comment.