From deb598c29000057b86df0655c40dfa016e2dde4b Mon Sep 17 00:00:00 2001 From: Tim Paine <3105306+timkpaine@users.noreply.github.com> Date: Thu, 8 Aug 2024 16:22:24 -0400 Subject: [PATCH] integrate with airflow-config --- airflow_priority/common.py | 17 +++++++ airflow_priority/plugins/datadog.py | 6 +-- airflow_priority/plugins/discord.py | 7 +-- airflow_priority/plugins/newrelic.py | 7 ++- airflow_priority/plugins/slack.py | 7 +-- airflow_priority/plugins/symphony.py | 6 +-- airflow_priority/tests/conftest.py | 2 +- airflow_priority/tests/test_airflow_config.py | 47 +++++++++++++++++++ airflow_priority/tests/test_datadog.py | 4 ++ airflow_priority/tests/test_discord.py | 4 ++ airflow_priority/tests/test_newrelic.py | 4 ++ airflow_priority/tests/test_slack.py | 7 ++- airflow_priority/tests/test_symphony.py | 3 +- pyproject.toml | 5 ++ 14 files changed, 107 insertions(+), 19 deletions(-) create mode 100644 airflow_priority/tests/test_airflow_config.py diff --git a/airflow_priority/common.py b/airflow_priority/common.py index 95597dd..de3e53f 100644 --- a/airflow_priority/common.py +++ b/airflow_priority/common.py @@ -1,3 +1,5 @@ +import os +from pathlib import Path from types import MappingProxyType from typing import Literal, Optional, Tuple @@ -29,6 +31,21 @@ class AirflowPriorityConfigurationOptionNotFound(RuntimeError): ... def get_config_option(section, key, required=True, default=None): + try: + try: + from airflow_config import ConfigNotFoundError, Configuration + + config = Configuration.load("config", "config", basepath=str(Path(os.environ.get("AIRFLOW_HOME", "")) / "dags"), _offset=4) + ret = getattr(getattr(config.extensions.get("priority", None), section, None), key, None) + if ret is not None: + return ret + except ConfigNotFoundError: + # SKIP + pass + except ImportError: + # SKIP + pass + import airflow.configuration config_option = airflow.configuration.conf.get(f"priority.{section}", key, default) diff --git a/airflow_priority/plugins/datadog.py b/airflow_priority/plugins/datadog.py index 6be2658..4fd3a86 100644 --- a/airflow_priority/plugins/datadog.py +++ b/airflow_priority/plugins/datadog.py @@ -14,7 +14,7 @@ from datadog_api_client.v2.model.metric_resource import MetricResource from datadog_api_client.v2.model.metric_series import MetricSeries -from airflow_priority import DagStatus, get_config_option, has_priority_tag +from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag __all__ = ( "send_metric_datadog", @@ -87,10 +87,10 @@ def on_dag_run_failed(dag_run: DagRun, msg: str): try: # Call once to ensure plugin will work - get_configuration() + get_config_option("datadog", "api_key") class DatadogPriorityPlugin(AirflowPlugin): name = "DatadogPriorityPlugin" listeners = [sys.modules[__name__]] -except Exception: +except AirflowPriorityConfigurationOptionNotFound: _log.exception("Plugin could not be enabled") diff --git a/airflow_priority/plugins/discord.py b/airflow_priority/plugins/discord.py index df55d12..b780cdc 100644 --- a/airflow_priority/plugins/discord.py +++ b/airflow_priority/plugins/discord.py @@ -11,7 +11,7 @@ from airflow.plugins_manager import AirflowPlugin from discord import Client, Intents -from airflow_priority import DagStatus, get_config_option, has_priority_tag +from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag __all__ = ("get_client", "send_metric_discord", "on_dag_run_failed", "DiscordPriorityPlugin") @@ -67,10 +67,11 @@ def on_dag_run_failed(dag_run: DagRun, msg: str): try: # Call once to ensure plugin will work - get_client() + get_config_option("discord", "channel") + get_config_option("discord", "token") class DiscordPriorityPlugin(AirflowPlugin): name = "DiscordPriorityPlugin" listeners = [sys.modules[__name__]] -except Exception: +except AirflowPriorityConfigurationOptionNotFound: _log.exception("Plugin could not be enabled") diff --git a/airflow_priority/plugins/newrelic.py b/airflow_priority/plugins/newrelic.py index 513e04c..f8e2bdb 100644 --- a/airflow_priority/plugins/newrelic.py +++ b/airflow_priority/plugins/newrelic.py @@ -7,7 +7,7 @@ from airflow.plugins_manager import AirflowPlugin from newrelic_telemetry_sdk import GaugeMetric, MetricClient -from airflow_priority import DagStatus, get_config_option, has_priority_tag +from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag __all__ = ( "send_metric_newrelic", @@ -55,11 +55,10 @@ def on_dag_run_failed(dag_run: DagRun, msg: str): try: # Call once to ensure plugin will work - get_client() + get_config_option("newrelic", "api_key") class NewRelicPriorityPlugin(AirflowPlugin): name = "NewRelicPriorityPlugin" listeners = [sys.modules[__name__]] - -except Exception: +except AirflowPriorityConfigurationOptionNotFound: _log.exception("Plugin could not be enabled") diff --git a/airflow_priority/plugins/slack.py b/airflow_priority/plugins/slack.py index f732812..bde05a2 100644 --- a/airflow_priority/plugins/slack.py +++ b/airflow_priority/plugins/slack.py @@ -7,7 +7,7 @@ from airflow.plugins_manager import AirflowPlugin from slack_sdk import WebClient -from airflow_priority import DagStatus, get_config_option, has_priority_tag +from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag __all__ = ("get_client", "get_channel_id", "send_metric_slack", "on_dag_run_failed", "SlackPriorityPlugin") @@ -58,10 +58,11 @@ def on_dag_run_failed(dag_run: DagRun, msg: str): try: # Call once to ensure plugin will work - get_client() + get_config_option("slack", "token") + get_config_option("slack", "channel") class SlackPriorityPlugin(AirflowPlugin): name = "SlackPriorityPlugin" listeners = [sys.modules[__name__]] -except Exception: +except AirflowPriorityConfigurationOptionNotFound: _log.exception("Plugin could not be enabled") diff --git a/airflow_priority/plugins/symphony.py b/airflow_priority/plugins/symphony.py index 37c0c43..71845f0 100644 --- a/airflow_priority/plugins/symphony.py +++ b/airflow_priority/plugins/symphony.py @@ -1,14 +1,14 @@ import ssl import sys from functools import lru_cache -from httpx import post from logging import getLogger from airflow.listeners import hookimpl from airflow.models.dagrun import DagRun from airflow.plugins_manager import AirflowPlugin +from httpx import post -from airflow_priority import DagStatus, get_config_option, has_priority_tag +from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag __all__ = ("get_config_options", "get_headers", "get_room_id", "send_metric_symphony", "on_dag_run_failed", "SymphonyPriorityPlugin") @@ -103,5 +103,5 @@ def on_dag_run_failed(dag_run: DagRun, msg: str): class SymphonyPriorityPlugin(AirflowPlugin): name = "SymphonyPriorityPlugin" listeners = [sys.modules[__name__]] -except Exception: +except AirflowPriorityConfigurationOptionNotFound: _log.exception("Plugin could not be enabled") diff --git a/airflow_priority/tests/conftest.py b/airflow_priority/tests/conftest.py index 4e42cfb..a2d265b 100644 --- a/airflow_priority/tests/conftest.py +++ b/airflow_priority/tests/conftest.py @@ -38,7 +38,7 @@ def airflow_config(): airflow.configuration.load_standard_airflow_configuration(airflow_config_parser) airflow_config_parser.validate() airflow.configuration.conf = airflow_config_parser - yield + yield str(Path(td)) @pytest.fixture(scope="function", autouse=True) diff --git a/airflow_priority/tests/test_airflow_config.py b/airflow_priority/tests/test_airflow_config.py new file mode 100644 index 0000000..63b438a --- /dev/null +++ b/airflow_priority/tests/test_airflow_config.py @@ -0,0 +1,47 @@ +from pathlib import Path + +conf_text = """# @package _global_ +_target_: airflow_config.Configuration +default_args: + _target_: airflow_config.DefaultArgs + owner: test +extensions: + priority: + _target_: airflow_config.PriorityConfiguration + slack: + _target_: airflow_config.SlackConfiguration + token: abc + channel: def +""" + + +def test_config_loading_via_airflow_config(airflow_config, dag_run): + dags_path = Path(airflow_config) / "dags" + conf_path = dags_path / "config" + conf_path.mkdir(parents=True, exist_ok=True) + conf_file = conf_path / "config.yaml" + conf_file.write_text(conf_text) + + from airflow_config import Configuration + + config = Configuration.load("config", "config", basepath=str(dags_path), _offset=4) + ret = getattr(getattr(config.extensions.get("priority", None), "slack", None), "token", None) + assert ret == "abc" + + +def test_call_plugin_via_airflow_config(airflow_config, dag_run): + dags_path = Path(airflow_config) / "dags" + conf_path = dags_path / "config" + conf_path.mkdir(parents=True, exist_ok=True) + conf_file = conf_path / "config.yaml" + conf_file.write_text(conf_text) + from airflow_priority.plugins.slack import get_client + + client = get_client() + assert client is not None + assert client.token == "abc" + # with patch("airflow_priority.plugins.slack.send_metric_slack") as p2, \ + # patch("airflow_priority.plugins.slack.send_metric_slack") as p1: + # on_dag_run_failed(dag_run, "test") + + # assert p1.call_args == [] diff --git a/airflow_priority/tests/test_datadog.py b/airflow_priority/tests/test_datadog.py index 3b3aba5..2034a24 100644 --- a/airflow_priority/tests/test_datadog.py +++ b/airflow_priority/tests/test_datadog.py @@ -1,6 +1,10 @@ +import os from unittest.mock import patch +import pytest + +@pytest.mark.skipif(os.environ.get("DATADOG_API_KEY") is None, reason="Datadog key not set") def test_datadog_send(airflow_config, dag_run): from airflow_priority.plugins.datadog import send_metric_datadog diff --git a/airflow_priority/tests/test_discord.py b/airflow_priority/tests/test_discord.py index 6a11ee0..df6a64d 100644 --- a/airflow_priority/tests/test_discord.py +++ b/airflow_priority/tests/test_discord.py @@ -1,6 +1,10 @@ +import os from unittest.mock import patch +import pytest + +@pytest.mark.skipif(os.environ.get("DISCORD_TOKEN") is None, reason="Discord token not set") def test_discord_send(airflow_config, dag_run): from airflow_priority.plugins.discord import send_metric_discord diff --git a/airflow_priority/tests/test_newrelic.py b/airflow_priority/tests/test_newrelic.py index 782c69a..f2a9bb6 100644 --- a/airflow_priority/tests/test_newrelic.py +++ b/airflow_priority/tests/test_newrelic.py @@ -1,6 +1,10 @@ +import os from unittest.mock import patch +import pytest + +@pytest.mark.skipif(os.environ.get("NEWRELIC_API_KEY") is None, reason="New relic token not set") def test_newrelic_send(airflow_config, dag_run): from airflow_priority.plugins.newrelic import send_metric_newrelic diff --git a/airflow_priority/tests/test_slack.py b/airflow_priority/tests/test_slack.py index 4f48ce7..fca6099 100644 --- a/airflow_priority/tests/test_slack.py +++ b/airflow_priority/tests/test_slack.py @@ -1,9 +1,14 @@ +import os from unittest.mock import patch +import pytest + +@pytest.mark.skipif(os.environ.get("SLACK_TOKEN") is None, reason="Slack token token not set") def test_slack_send(airflow_config, dag_run): - from airflow_priority.plugins.slack import send_metric_slack + from airflow_priority.plugins.slack import get_client, send_metric_slack + get_client.cache_clear() send_metric_slack("UNIT TEST", 1, "BEEN TESTED") diff --git a/airflow_priority/tests/test_symphony.py b/airflow_priority/tests/test_symphony.py index 81ede0f..333a8b1 100644 --- a/airflow_priority/tests/test_symphony.py +++ b/airflow_priority/tests/test_symphony.py @@ -1,7 +1,8 @@ import os -import pytest from unittest.mock import patch +import pytest + @pytest.mark.skipif(os.environ.get("SYMPHONY_ROOM_NAME", "") == "", reason="no symphony credentials") def test_symphony_send(airflow_config, dag_run): diff --git a/pyproject.toml b/pyproject.toml index bd60da0..9bfc1fc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,11 +51,16 @@ develop = [ "httpx", "newrelic-telemetry-sdk", "slack-sdk", + # Config + "airflow-config>=0.1.2", ] aws = [ "boto3", "botocore", ] +config = [ + "airflow-config>=0.1.2", +] datadog = [ "datadog-api-client", ]