Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate with airflow-config #11

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions airflow_priority/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
from pathlib import Path
from types import MappingProxyType
from typing import Literal, Optional, Tuple

Expand Down Expand Up @@ -29,6 +31,21 @@


def get_config_option(section, key, required=True, default=None):
try:
try:
from airflow_config import ConfigNotFoundError, Configuration

config = Configuration.load("config", "config", basepath=str(Path(os.environ.get("AIRFLOW_HOME", "")) / "dags"), _offset=4)
ret = getattr(getattr(config.extensions.get("priority", None), section, None), key, None)
if ret is not None:
return ret
except ConfigNotFoundError:
# SKIP
pass
except ImportError:

Check warning on line 45 in airflow_priority/common.py

View check run for this annotation

Codecov / codecov/patch

airflow_priority/common.py#L45

Added line #L45 was not covered by tests
# SKIP
pass

Check warning on line 47 in airflow_priority/common.py

View check run for this annotation

Codecov / codecov/patch

airflow_priority/common.py#L47

Added line #L47 was not covered by tests

import airflow.configuration

config_option = airflow.configuration.conf.get(f"priority.{section}", key, default)
Expand Down
6 changes: 3 additions & 3 deletions airflow_priority/plugins/datadog.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from datadog_api_client.v2.model.metric_resource import MetricResource
from datadog_api_client.v2.model.metric_series import MetricSeries

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = (
"send_metric_datadog",
Expand Down Expand Up @@ -87,10 +87,10 @@

try:
# Call once to ensure plugin will work
get_configuration()
get_config_option("datadog", "api_key")

class DatadogPriorityPlugin(AirflowPlugin):
name = "DatadogPriorityPlugin"
listeners = [sys.modules[__name__]]
except Exception:
except AirflowPriorityConfigurationOptionNotFound:

Check warning on line 95 in airflow_priority/plugins/datadog.py

View check run for this annotation

Codecov / codecov/patch

airflow_priority/plugins/datadog.py#L95

Added line #L95 was not covered by tests
_log.exception("Plugin could not be enabled")
7 changes: 4 additions & 3 deletions airflow_priority/plugins/discord.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from airflow.plugins_manager import AirflowPlugin
from discord import Client, Intents

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = ("get_client", "send_metric_discord", "on_dag_run_failed", "DiscordPriorityPlugin")

Expand Down Expand Up @@ -67,10 +67,11 @@

try:
# Call once to ensure plugin will work
get_client()
get_config_option("discord", "channel")
get_config_option("discord", "token")

class DiscordPriorityPlugin(AirflowPlugin):
name = "DiscordPriorityPlugin"
listeners = [sys.modules[__name__]]
except Exception:
except AirflowPriorityConfigurationOptionNotFound:

Check warning on line 76 in airflow_priority/plugins/discord.py

View check run for this annotation

Codecov / codecov/patch

airflow_priority/plugins/discord.py#L76

Added line #L76 was not covered by tests
_log.exception("Plugin could not be enabled")
7 changes: 3 additions & 4 deletions airflow_priority/plugins/newrelic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airflow.plugins_manager import AirflowPlugin
from newrelic_telemetry_sdk import GaugeMetric, MetricClient

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = (
"send_metric_newrelic",
Expand Down Expand Up @@ -55,11 +55,10 @@

try:
# Call once to ensure plugin will work
get_client()
get_config_option("newrelic", "api_key")

class NewRelicPriorityPlugin(AirflowPlugin):
name = "NewRelicPriorityPlugin"
listeners = [sys.modules[__name__]]

except Exception:
except AirflowPriorityConfigurationOptionNotFound:

Check warning on line 63 in airflow_priority/plugins/newrelic.py

View check run for this annotation

Codecov / codecov/patch

airflow_priority/plugins/newrelic.py#L63

Added line #L63 was not covered by tests
_log.exception("Plugin could not be enabled")
7 changes: 4 additions & 3 deletions airflow_priority/plugins/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airflow.plugins_manager import AirflowPlugin
from slack_sdk import WebClient

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = ("get_client", "get_channel_id", "send_metric_slack", "on_dag_run_failed", "SlackPriorityPlugin")

Expand Down Expand Up @@ -58,10 +58,11 @@

try:
# Call once to ensure plugin will work
get_client()
get_config_option("slack", "token")
get_config_option("slack", "channel")

class SlackPriorityPlugin(AirflowPlugin):
name = "SlackPriorityPlugin"
listeners = [sys.modules[__name__]]
except Exception:
except AirflowPriorityConfigurationOptionNotFound:

Check warning on line 67 in airflow_priority/plugins/slack.py

View check run for this annotation

Codecov / codecov/patch

airflow_priority/plugins/slack.py#L67

Added line #L67 was not covered by tests
_log.exception("Plugin could not be enabled")
6 changes: 3 additions & 3 deletions airflow_priority/plugins/symphony.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import ssl
import sys
from functools import lru_cache
from httpx import post
from logging import getLogger

from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin
from httpx import post

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = ("get_config_options", "get_headers", "get_room_id", "send_metric_symphony", "on_dag_run_failed", "SymphonyPriorityPlugin")

Expand Down Expand Up @@ -103,5 +103,5 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
class SymphonyPriorityPlugin(AirflowPlugin):
name = "SymphonyPriorityPlugin"
listeners = [sys.modules[__name__]]
except Exception:
except AirflowPriorityConfigurationOptionNotFound:
_log.exception("Plugin could not be enabled")
2 changes: 1 addition & 1 deletion airflow_priority/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def airflow_config():
airflow.configuration.load_standard_airflow_configuration(airflow_config_parser)
airflow_config_parser.validate()
airflow.configuration.conf = airflow_config_parser
yield
yield str(Path(td))


@pytest.fixture(scope="function", autouse=True)
Expand Down
47 changes: 47 additions & 0 deletions airflow_priority/tests/test_airflow_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from pathlib import Path

conf_text = """# @package _global_
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.DefaultArgs
owner: test
extensions:
priority:
_target_: airflow_config.PriorityConfiguration
slack:
_target_: airflow_config.SlackConfiguration
token: abc
channel: def
"""


def test_config_loading_via_airflow_config(airflow_config, dag_run):
dags_path = Path(airflow_config) / "dags"
conf_path = dags_path / "config"
conf_path.mkdir(parents=True, exist_ok=True)
conf_file = conf_path / "config.yaml"
conf_file.write_text(conf_text)

from airflow_config import Configuration

config = Configuration.load("config", "config", basepath=str(dags_path), _offset=4)
ret = getattr(getattr(config.extensions.get("priority", None), "slack", None), "token", None)
assert ret == "abc"


def test_call_plugin_via_airflow_config(airflow_config, dag_run):
dags_path = Path(airflow_config) / "dags"
conf_path = dags_path / "config"
conf_path.mkdir(parents=True, exist_ok=True)
conf_file = conf_path / "config.yaml"
conf_file.write_text(conf_text)
from airflow_priority.plugins.slack import get_client

client = get_client()
assert client is not None
assert client.token == "abc"
# with patch("airflow_priority.plugins.slack.send_metric_slack") as p2, \
# patch("airflow_priority.plugins.slack.send_metric_slack") as p1:
# on_dag_run_failed(dag_run, "test")

# assert p1.call_args == []
4 changes: 4 additions & 0 deletions airflow_priority/tests/test_datadog.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("DATADOG_API_KEY") is None, reason="Datadog key not set")
def test_datadog_send(airflow_config, dag_run):
from airflow_priority.plugins.datadog import send_metric_datadog

Expand Down
4 changes: 4 additions & 0 deletions airflow_priority/tests/test_discord.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("DISCORD_TOKEN") is None, reason="Discord token not set")
def test_discord_send(airflow_config, dag_run):
from airflow_priority.plugins.discord import send_metric_discord

Expand Down
4 changes: 4 additions & 0 deletions airflow_priority/tests/test_newrelic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("NEWRELIC_API_KEY") is None, reason="New relic token not set")
def test_newrelic_send(airflow_config, dag_run):
from airflow_priority.plugins.newrelic import send_metric_newrelic

Expand Down
7 changes: 6 additions & 1 deletion airflow_priority/tests/test_slack.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import os
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("SLACK_TOKEN") is None, reason="Slack token token not set")
def test_slack_send(airflow_config, dag_run):
from airflow_priority.plugins.slack import send_metric_slack
from airflow_priority.plugins.slack import get_client, send_metric_slack

get_client.cache_clear()
send_metric_slack("UNIT TEST", 1, "BEEN TESTED")


Expand Down
3 changes: 2 additions & 1 deletion airflow_priority/tests/test_symphony.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import pytest
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("SYMPHONY_ROOM_NAME", "") == "", reason="no symphony credentials")
def test_symphony_send(airflow_config, dag_run):
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ develop = [
"httpx",
"newrelic-telemetry-sdk",
"slack-sdk",
# Config
"airflow-config>=0.1.2",
]
aws = [
"boto3",
"botocore",
]
config = [
"airflow-config>=0.1.2",
]
datadog = [
"datadog-api-client",
]
Expand Down