Commit

Merge pull request #4 from airflow-laminar/tkp/symphony
Add discord and symphony alerts
timkpaine authored Aug 4, 2024
2 parents 7971571 + eed8a8b commit 05a72e1
Showing 13 changed files with 313 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
@@ -61,6 +61,8 @@ jobs:
run: make coverage
env:
  DATADOG_API_KEY: ${{ secrets.DATADOG_API_KEY }}
  DISCORD_TOKEN: ${{ secrets.DISCORD_TOKEN }}
  DISCORD_CHANNEL: ${{ secrets.DISCORD_CHANNEL }}
  NEWRELIC_API_KEY: ${{ secrets.NEWRELIC_API_KEY }}
  SLACK_CHANNEL: ${{ secrets.SLACK_CHANNEL }}
  SLACK_TOKEN: ${{ secrets.SLACK_TOKEN }}
60 changes: 49 additions & 11 deletions README.md
@@ -13,7 +13,9 @@ This repo provides [Airflow Plugins](https://airflow.apache.org/docs/apache-airf

- [New Relic](https://newrelic.com)
- [Datadog](https://www.datadoghq.com)
- [Discord](http://discord.com)
- [Slack](http://slack.com)
- [Symphony](http://symphony.com)

Where `P1` corresponds to highest priority, and `P5` corresponds to lowest.
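
To make the `P1` to `P5` convention concrete, here is a minimal sketch of how a priority could be read from a DAG's tags. The helper name `priority_from_tags` and the matching rule (case-insensitive tags of the form `P1` through `P5`, with the highest priority winning) are assumptions for illustration only; the library's own `has_priority_tag` is not shown in this diff and may behave differently.

```python
import re
from typing import Iterable, Optional

# Hypothetical helper, not part of airflow-priority: matches tags "P1".."P5".
_PRIORITY_TAG = re.compile(r"^[Pp]([1-5])$")


def priority_from_tags(tags: Iterable[str]) -> Optional[int]:
    """Return the highest priority (lowest number) found in tags, or None."""
    priorities = [int(m.group(1)) for tag in tags if (m := _PRIORITY_TAG.match(tag))]
    return min(priorities) if priorities else None
```

A DAG tagged `["etl", "P2"]` would map to priority `2` under these assumptions, and a DAG with no matching tag would be ignored.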

@@ -32,12 +34,41 @@ conda install airflow-priority -c conda-forge
```

## Integrations
| Integration | Metric / Tag |
| :---------- | :----------- |
| [New Relic](https://newrelic.com) | `airflow.custom.priority.p{1,2,3,4,5}.{failed,succeeded,running}` |

| Integration | Metric / Tag |
| :----------------------------------- | :---------------------------------------------------------------- |
| [New Relic](https://newrelic.com) | `airflow.custom.priority.p{1,2,3,4,5}.{failed,succeeded,running}` |
| [Datadog](https://www.datadoghq.com) | `airflow.custom.priority.p{1,2,3,4,5}.{failed,succeeded,running}` |
| [Slack](http://slack.com) | `N/A` |
| [Discord](http://discord.com) | `N/A` |
| [Slack](http://slack.com) | `N/A` |
| [Symphony](http://symphony.com) | `N/A` |
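
The brace notation in the metric column expands to fifteen concrete names per metrics backend. The sketch below only enumerates them; the function name `metric_names` is made up for this example, and nothing here claims to be how the plugins build the names internally.

```python
from itertools import product

PRIORITIES = range(1, 6)  # P1 (highest) through P5 (lowest)
STATUSES = ("failed", "succeeded", "running")


def metric_names() -> list:
    """Expand airflow.custom.priority.p{1..5}.{status} into concrete names."""
    return [f"airflow.custom.priority.p{p}.{s}" for p, s in product(PRIORITIES, STATUSES)]
```

A listing like this can be handy when creating monitors, since each priority and status pair can be alerted on separately.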

### Datadog

Create a new Datadog API key [following their guide](https://docs.datadoghq.com/account_management/api-app-keys/#add-an-api-key-or-client-token).

Copy this API key into your `airflow.cfg` like so:

```
[priority.datadog]
api_key = the api key
```

Ensure your DAGs are configured with priority tags, then run a few of them. It is often convenient to keep an intentionally failing `P1` DAG around to test the integration. Once metrics are flowing, you can [create custom monitors](https://docs.datadoghq.com/getting_started/monitors/) for the tags.

### Discord

Create a new Discord application following the [guide from the discord.py library](https://discordpy.readthedocs.io/en/stable/discord.html).

Copy your bot's token into your `airflow.cfg` like so:

```
[priority.discord]
token = the bot's token
channel = the numerical channel ID, from the url or by right clicking
```

Ensure your bot is invited into any private channels.
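
As a quick sanity check of the two settings above, the following sketch parses a `[priority.discord]` section with the standard library and converts the channel to an integer, which mirrors the `int(...)` cast the plugin applies to the channel option. The sample values and the function name `read_discord_settings` are placeholders; Airflow itself reads the file through its own configuration layer, not through this code.

```python
from configparser import ConfigParser

# Placeholder values for illustration only.
SAMPLE_CFG = """
[priority.discord]
token = not-a-real-token
channel = 123456789012345678
"""


def read_discord_settings(cfg_text: str):
    """Return (token, channel_id) from a [priority.discord] section."""
    parser = ConfigParser()
    parser.read_string(cfg_text)
    section = parser["priority.discord"]
    # The channel must be numeric; int() raises ValueError for a channel name.
    return section["token"], int(section["channel"])
```

If `channel` holds a name such as `alerts` instead of the numeric ID, the conversion fails immediately, which is a cheap way to catch that mistake before starting Airflow.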

### New Relic

@@ -91,18 +122,25 @@ token = xoxb-...
channel = channel-name
```

### Datadog
### Symphony

Create a new Datadog API key [following their guide](https://docs.datadoghq.com/account_management/api-app-keys/#add-an-api-key-or-client-token).
Documentation coming soon!

Copy this API key into your `airflow.cfg` like so:
- [Overview of REST API](https://docs.developers.symphony.com/bots/overview-of-rest-api)
- [Certificate Authentication Workflow](https://docs.developers.symphony.com/bots/authentication/certificate-authentication)

```
[priority.datadog]
api_key = the api key
[priority.symphony]
room_name = the room name
message_create_url = https://mycompany.symphony.com/agent/v4/stream/SID/message/create
cert_file = path/to/my/cert.pem
key_file = path/to/my/key.pem
session_auth = https://mycompany-api.symphony.com/sessionauth/v1/authenticate
key_auth = https://mycompany-api.symphony.com/keyauth/v1/authenticate
room_search_url = https://mycompany.symphony.com/pod/v3/room/search
```
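
Because the Symphony section has seven settings, a small pre-flight check can help spot omissions. The sketch below is an assumption-laden illustration: the key list is copied from the example above, while the function name `missing_symphony_keys` and the use of `configparser` are choices made for this example and not part of the plugin.

```python
from configparser import ConfigParser

# Keys taken from the [priority.symphony] example above.
REQUIRED_KEYS = (
    "room_name",
    "message_create_url",
    "cert_file",
    "key_file",
    "session_auth",
    "key_auth",
    "room_search_url",
)


def missing_symphony_keys(cfg_text: str) -> list:
    """Return the required keys that are absent or empty in the config text."""
    parser = ConfigParser()
    parser.read_string(cfg_text)
    if not parser.has_section("priority.symphony"):
        return list(REQUIRED_KEYS)
    section = parser["priority.symphony"]
    return [key for key in REQUIRED_KEYS if not section.get(key)]
```

An empty result means every setting is present; it says nothing about whether the URLs or certificate paths are actually valid.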

Ensure your DAGs are configured with priority tags, then run a few of them. It is often convenient to keep an intentionally failing `P1` DAG around to test the integration. Once metrics are flowing, you can [create custom monitors](https://docs.datadoghq.com/getting_started/monitors/) for the tags.


## License

This software is licensed under the Apache 2.0 license. See the [LICENSE](LICENSE) file for details.
2 changes: 1 addition & 1 deletion airflow_priority/common.py
@@ -28,7 +28,7 @@
class AirflowPriorityConfigurationOptionNotFound(RuntimeError): ...


def get_config_option(section, key, default, required=True):
def get_config_option(section, key, required=True, default=None):
    import airflow.configuration

    config_option = airflow.configuration.conf.get(f"priority.{section}", key, default)
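
The rest of `get_config_option` is collapsed in this diff, so the following is only a sketch of what the new signature suggests: `required` now comes before `default`, and callers can omit both. It uses the standard library `ConfigParser` in place of Airflow's configuration object, and `OptionNotFound` is a hypothetical stand-in for `AirflowPriorityConfigurationOptionNotFound`. The raise-on-missing behaviour is an assumption.

```python
from configparser import ConfigParser


class OptionNotFound(RuntimeError):
    """Hypothetical stand-in for AirflowPriorityConfigurationOptionNotFound."""


def get_option(parser: ConfigParser, section: str, key: str, required: bool = True, default=None):
    """Look up priority.<section>.<key>; raise if it is required but missing or empty."""
    value = parser.get(f"priority.{section}", key, fallback=default)
    if not value and required:
        raise OptionNotFound(f"priority.{section}.{key}")
    return value
```

With this ordering, the common call shrinks to `get_option(parser, "slack", "token")`, which matches how the plugin call sites are simplified elsewhere in this commit.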
2 changes: 1 addition & 1 deletion airflow_priority/plugins/datadog.py
@@ -30,7 +30,7 @@
@lru_cache
def get_configuration():
    return Configuration(
        api_key={"apiKeyAuth": get_config_option("datadog", "api_key", "")},
        api_key={"apiKeyAuth": get_config_option("datadog", "api_key")},
    )


76 changes: 76 additions & 0 deletions airflow_priority/plugins/discord.py
@@ -0,0 +1,76 @@
import sys
from asyncio import sleep
from functools import lru_cache
from logging import getLogger
from queue import Queue
from threading import Thread
from time import sleep as time_sleep

from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin
from discord import Client, Intents

from airflow_priority import DagStatus, get_config_option, has_priority_tag

__all__ = ("get_client", "send_metric_discord", "on_dag_run_failed", "DiscordPriorityPlugin")

_log = getLogger(__name__)


@lru_cache
def get_client():
    client = Client(intents=Intents.default())
    client.queue = Queue()

    @client.event
    async def on_ready():
        channel = client.get_channel(int(get_config_option("discord", "channel")))
        # Poll the thread-safe queue and forward each message to the channel
        while True:
            while client.queue.empty():
                await sleep(5)
            await channel.send(client.queue.get())

    # Run the Discord client on a daemon thread so it does not block the caller
    token = get_config_option("discord", "token")
    t = Thread(target=client.run, args=(token,), daemon=True)
    t.start()
    return client


def send_metric_discord(dag_id: str, priority: int, tag: DagStatus) -> None:
    client_queue = get_client().queue
    client_queue.put(f'A P{priority} DAG "{dag_id}" has {tag}!')
    # Block until the client thread has picked the message up
    while not client_queue.empty():
        time_sleep(1)


# @hookimpl
# def on_dag_run_running(dag_run: DagRun, msg: str):
#     dag_id, priority = has_priority_tag(dag_run=dag_run)
#     if priority:
#         send_metric_discord(dag_id, priority, "running")


# @hookimpl
# def on_dag_run_success(dag_run: DagRun, msg: str):
#     dag_id, priority = has_priority_tag(dag_run=dag_run)
#     if priority:
#         send_metric_discord(dag_id, priority, "succeeded")


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    dag_id, priority = has_priority_tag(dag_run=dag_run)
    if priority:
        send_metric_discord(dag_id, priority, "failed")


try:
    # Call once to ensure plugin will work
    get_client()

    class DiscordPriorityPlugin(AirflowPlugin):
        name = "DiscordPriorityPlugin"
        listeners = [sys.modules[__name__]]
except Exception:
    _log.exception("Plugin could not be enabled")
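
The Discord plugin hands messages from synchronous Airflow listener code to an asyncio client running on a daemon thread, using a `queue.Queue` that both sides poll. The sketch below isolates that handoff pattern without the `discord` dependency. The names `start_consumer` and `send_blocking` are invented for this example, appending to a list stands in for `channel.send`, and the `timeout` parameter is an addition that the plugin code above does not have (it waits indefinitely).

```python
import asyncio
import queue
import threading
import time


def start_consumer(outbox: queue.Queue, delivered: list, poll_interval: float = 0.01) -> threading.Thread:
    """Drain outbox from an asyncio loop running on a daemon thread."""

    async def drain():
        while True:
            # Yield to the event loop while there is nothing to send
            while outbox.empty():
                await asyncio.sleep(poll_interval)
            delivered.append(outbox.get())  # stands in for `await channel.send(...)`

    thread = threading.Thread(target=lambda: asyncio.run(drain()), daemon=True)
    thread.start()
    return thread


def send_blocking(outbox: queue.Queue, message: str, timeout: float = 2.0) -> bool:
    """Enqueue message and wait until the consumer has taken it, or time out."""
    outbox.put(message)
    deadline = time.monotonic() + timeout
    while not outbox.empty():
        if time.monotonic() > deadline:
            return False
        time.sleep(0.01)
    return True
```

One design point worth noting: an empty queue only tells the producer that the consumer has taken the message, not that delivery succeeded. A bounded wait like the one sketched here may also be worth considering, since a client that never becomes ready would otherwise block the caller.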
2 changes: 1 addition & 1 deletion airflow_priority/plugins/newrelic.py
@@ -22,7 +22,7 @@

@lru_cache
def get_client():
    return MetricClient(get_config_option("newrelic", "api_key", "", True))
    return MetricClient(get_config_option("newrelic", "api_key"))


def send_metric_newrelic(dag_id: str, priority: int, tag: DagStatus) -> None:
4 changes: 2 additions & 2 deletions airflow_priority/plugins/slack.py
@@ -17,12 +17,12 @@

@lru_cache
def get_client():
    return WebClient(token=get_config_option("slack", "token", ""))
    return WebClient(token=get_config_option("slack", "token"))


@lru_cache
def get_channel_id():
    channel_name = get_config_option("slack", "channel", "")
    channel_name = get_config_option("slack", "channel")
    conversations = get_client().conversations_list(types=["public_channel", "private_channel"])
    if conversations.data["ok"]:
        for channel in conversations.data["channels"]:
107 changes: 107 additions & 0 deletions airflow_priority/plugins/symphony.py
@@ -0,0 +1,107 @@
import ssl
import sys
from functools import lru_cache
from httpx import post
from logging import getLogger

from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin

from airflow_priority import DagStatus, get_config_option, has_priority_tag

__all__ = ("get_config_options", "get_headers", "get_room_id", "send_metric_symphony", "on_dag_run_failed", "SymphonyPriorityPlugin")


_log = getLogger(__name__)


@lru_cache
def get_config_options():
    return {
        "room_name": get_config_option("symphony", "room_name"),
        "message_create_url": get_config_option("symphony", "message_create_url"),
        "cert_file": get_config_option("symphony", "cert_file"),
        "key_file": get_config_option("symphony", "key_file"),
        "session_auth": get_config_option("symphony", "session_auth"),
        "key_auth": get_config_option("symphony", "key_auth"),
        "room_search_url": get_config_option("symphony", "room_search_url"),
    }


def _client_cert_post(url: str, cert_file: str, key_file: str) -> dict:
    # Authenticate with a client certificate and return the parsed JSON body
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.load_cert_chain(certfile=cert_file, keyfile=key_file)
    response = post(url=url, verify=context, headers={"Content-Type": "application/json"}, data="{}")
    if response.status_code != 200:
        raise Exception(f"Cannot connect for symphony handshake to {url}: {response.status_code}")
    return response.json()


@lru_cache
def get_headers():
    config_options = get_config_options()
    session_token = _client_cert_post(config_options["session_auth"], config_options["cert_file"], config_options["key_file"])["token"]
    key_manager_token = _client_cert_post(config_options["key_auth"], config_options["cert_file"], config_options["key_file"])["token"]
    return {
        "sessionToken": session_token,
        "keyManagerToken": key_manager_token,
        "Accept": "application/json",
    }


@lru_cache
def get_room_id():
    config_options = get_config_options()

    res = post(
        url=config_options["room_search_url"],
        json={"query": config_options["room_name"]},
        headers=get_headers(),
    )
    if res and res.status_code == 200:
        # Search results may include partial matches, so require an exact name
        for room in res.json()["rooms"]:
            name = room.get("roomAttributes", {}).get("name")
            if name and name == config_options["room_name"]:
                return room.get("roomSystemInfo", {}).get("id")
    raise Exception(f"Could not find symphony room {config_options['room_name']}")


def send_metric_symphony(dag_id: str, priority: int, tag: DagStatus) -> None:
    return post(
        url=get_config_options()["message_create_url"].replace("SID", get_room_id()),
        json={"message": f'<messageML>A P{priority} DAG "{dag_id}" has {tag}!</messageML>'},
        headers=get_headers(),
    )


# @hookimpl
# def on_dag_run_running(dag_run: DagRun, msg: str):
#     dag_id, priority = has_priority_tag(dag_run=dag_run)
#     if priority:
#         send_metric_symphony(dag_id, priority, "running")


# @hookimpl
# def on_dag_run_success(dag_run: DagRun, msg: str):
#     dag_id, priority = has_priority_tag(dag_run=dag_run)
#     if priority:
#         send_metric_symphony(dag_id, priority, "succeeded")


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    dag_id, priority = has_priority_tag(dag_run=dag_run)
    if priority:
        send_metric_symphony(dag_id, priority, "failed")


try:
    # Call once to ensure plugin will work
    get_config_options()

    class SymphonyPriorityPlugin(AirflowPlugin):
        name = "SymphonyPriorityPlugin"
        listeners = [sys.modules[__name__]]
except Exception:
    _log.exception("Plugin could not be enabled")
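
Two pure steps in the Symphony plugin can be exercised without any network access: picking the room ID out of a search response, and building the message request from the `SID` URL template. The sketch below separates them into standalone functions. The function names are invented here, the sample payload shape is inferred only from the keys the plugin reads (`rooms`, `roomAttributes.name`, `roomSystemInfo.id`), and escaping the message text with `html.escape` is an assumption of this sketch that the plugin itself does not do.

```python
from html import escape
from typing import Optional


def find_room_id(payload: dict, room_name: str) -> Optional[str]:
    """Return the ID of the room whose name matches exactly, or None."""
    for room in payload.get("rooms", []):
        name = room.get("roomAttributes", {}).get("name")
        if name == room_name:
            return room.get("roomSystemInfo", {}).get("id")
    return None


def build_message_request(url_template: str, room_id: str, dag_id: str, priority: int, status: str):
    """Return (url, json_body) for a message; 'SID' in the template is the room ID slot."""
    url = url_template.replace("SID", room_id)
    # Assumption: escape markup characters so a DAG ID cannot break the XML-like body.
    text = escape(f'A P{priority} DAG "{dag_id}" has {status}!')
    return url, {"message": f"<messageML>{text}</messageML>"}
```

Keeping these steps free of I/O makes them straightforward to unit test, with the certificate handshake and HTTP calls left to integration tests.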
22 changes: 18 additions & 4 deletions airflow_priority/tests/airflow.cfg.jinja
@@ -352,12 +352,26 @@ test = "blerg"
test="blerg"
{% endraw %}

[priority.slack]
token = {{ SLACK_TOKEN }}
channel = {{ SLACK_CHANNEL }}

[priority.datadog]
api_key = {{ DATADOG_API_KEY }}

[priority.discord]
token = {{ DISCORD_TOKEN }}
channel = {{ DISCORD_CHANNEL }}

[priority.newrelic]
api_key = {{ NEWRELIC_API_KEY }}

[priority.slack]
token = {{ SLACK_TOKEN }}
channel = {{ SLACK_CHANNEL }}

[priority.symphony]
room_name = {{ SYMPHONY_ROOM_NAME }}
message_create_url = {{ SYMPHONY_MESSAGE_CREATE_URL }}
cert_file = {{ SYMPHONY_CERT_FILE }}
key_file = {{ SYMPHONY_KEY_FILE }}
session_auth = {{ SYMPHONY_SESSION_AUTH }}
key_auth = {{ SYMPHONY_KEY_AUTH }}
room_search_url = {{ SYMPHONY_ROOM_SEARCH_URL }}

13 changes: 11 additions & 2 deletions airflow_priority/tests/conftest.py
@@ -12,11 +12,20 @@ def airflow_config():
    j2 = Environment(loader=DictLoader({"airflow.cfg": config_template}), trim_blocks=True)
    with TemporaryDirectory() as td:
        tmpl = j2.get_template("airflow.cfg").render(
            SLACK_TOKEN=os.environ.get("SLACK_TOKEN", ""),
            SLACK_CHANNEL=os.environ.get("SLACK_CHANNEL", ""),
            DATADOG_HOST=os.environ.get("DATADOG_HOST", ""),
            DATADOG_API_KEY=os.environ.get("DATADOG_API_KEY", ""),
            DISCORD_TOKEN=os.environ.get("DISCORD_TOKEN", ""),
            DISCORD_CHANNEL=os.environ.get("DISCORD_CHANNEL", ""),
            NEWRELIC_API_KEY=os.environ.get("NEWRELIC_API_KEY", ""),
            SLACK_TOKEN=os.environ.get("SLACK_TOKEN", ""),
            SLACK_CHANNEL=os.environ.get("SLACK_CHANNEL", ""),
            SYMPHONY_ROOM_NAME=os.environ.get("SYMPHONY_ROOM_NAME", ""),
            SYMPHONY_MESSAGE_CREATE_URL=os.environ.get("SYMPHONY_MESSAGE_CREATE_URL", ""),
            SYMPHONY_CERT_FILE=os.environ.get("SYMPHONY_CERT_FILE", ""),
            SYMPHONY_KEY_FILE=os.environ.get("SYMPHONY_KEY_FILE", ""),
            SYMPHONY_SESSION_AUTH=os.environ.get("SYMPHONY_SESSION_AUTH", ""),
            SYMPHONY_KEY_AUTH=os.environ.get("SYMPHONY_KEY_AUTH", ""),
            SYMPHONY_ROOM_SEARCH_URL=os.environ.get("SYMPHONY_ROOM_SEARCH_URL", ""),
        )
        (Path(td) / "airflow.cfg").write_text(tmpl)
        os.environ["AIRFLOW_HOME"] = str(Path(td))
15 changes: 15 additions & 0 deletions airflow_priority/tests/test_discord.py
@@ -0,0 +1,15 @@
from unittest.mock import patch


def test_discord_send(airflow_config, dag_run):
    from airflow_priority.plugins.discord import send_metric_discord

    send_metric_discord("UNIT TEST", 1, "BEEN TESTED")


def test_discord_priority_failed(airflow_config, dag_run):
    from airflow_priority.plugins.discord import on_dag_run_failed

    with patch("airflow_priority.plugins.discord.send_metric_discord") as p1:
        on_dag_run_failed(dag_run, "test")
        assert p1.call_count == 1
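
The second test above relies on patching the sender and counting calls. For readers unfamiliar with that pattern, here is a self-contained sketch of the same idea using only `unittest.mock`. The `Alerts` class and `on_failed` function are hypothetical stand-ins for the plugin module and its listener; they are not part of airflow-priority.

```python
from unittest.mock import patch


class Alerts:
    """Hypothetical sender; the real one would reach an external service."""

    @staticmethod
    def send(dag_id, priority, status):
        raise RuntimeError("would hit the network")


def on_failed(dag_id, priority):
    """Hypothetical listener: only DAGs with a priority trigger an alert."""
    if priority:
        Alerts.send(dag_id, priority, "failed")


def count_alerts(events):
    """Run on_failed for each (dag_id, priority) pair with the sender patched."""
    with patch.object(Alerts, "send") as fake_send:
        for dag_id, priority in events:
            on_failed(dag_id, priority)
        return fake_send.call_count
```

Patching the sender keeps the test focused on the listener's branching logic, so it passes without credentials or network access.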