Merge pull request #17 from airflow-laminar/tkp/imports
defer dependency imports
timkpaine authored Aug 12, 2024
2 parents 7c304bc + 122fd1e commit a6a4121
Showing 7 changed files with 80 additions and 51 deletions.
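
The theme of the change is the same in every plugin: third-party imports (boto3/botocore, datadog_api_client, discord, newrelic_telemetry_sdk, slack_sdk, httpx) move from module scope into the functions that use them, so importing a plugin module no longer fails when an optional dependency is missing, and the registration block can catch ImportError and simply leave the plugin disabled. A minimal sketch of the before/after shape, using a hypothetical provider name rather than any of the real dependencies:

# Before: importing this module raises ImportError if the dependency is absent.
# from some_provider import Client  # hypothetical optional dependency

from functools import lru_cache


@lru_cache
def get_client():
    # After: the import is deferred until the client is actually needed,
    # so plugin discovery works even without the optional package installed.
    from some_provider import Client  # hypothetical optional dependency

    return Client()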
15 changes: 9 additions & 6 deletions airflow_priority/common.py
@@ -32,9 +32,9 @@ class AirflowPriorityConfigurationOptionNotFound(RuntimeError): ...

def get_config_option(section, key, required=True, default=None):
try:
try:
from airflow_config import ConfigNotFoundError, Configuration
from airflow_config import ConfigNotFoundError, Configuration

try:
config = Configuration.load("config", "config", basepath=str(Path(os.environ.get("AIRFLOW_HOME", "")) / "dags"), _offset=4)
ret = getattr(getattr(config.extensions.get("priority", None), section, None), key, None)
if ret is not None:
@@ -46,12 +46,15 @@ def get_config_option(section, key, required=True, default=None):
# SKIP
pass

import airflow.configuration
try:
import airflow.configuration

config_option = airflow.configuration.conf.get(f"priority.{section}", key, default)
if not config_option and required:
config_option = airflow.configuration.conf.get(f"priority.{section}", key, default)
if not config_option and required:
raise AirflowPriorityConfigurationOptionNotFound(f"{section}.{key}")
return config_option
except Exception:
raise AirflowPriorityConfigurationOptionNotFound(f"{section}.{key}")
return config_option


def has_priority_tag(dag_run: DagRun) -> Optional[Tuple[str, int]]:
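
Taken together, the two hunks above give get_config_option a layered lookup: try the optional airflow_config package first, then fall back to Airflow's own configuration, and convert any failure in that fallback into AirflowPriorityConfigurationOptionNotFound. A rough, self-contained sketch of that flow, with indentation and exception handling inferred from the hunks rather than copied verbatim (the collapsed remainder of the file is not reproduced):

import os
from pathlib import Path


class AirflowPriorityConfigurationOptionNotFound(RuntimeError): ...


def get_config_option(section, key, required=True, default=None):
    # Preferred source: the optional airflow_config package, imported lazily.
    try:
        from airflow_config import Configuration

        config = Configuration.load("config", "config", basepath=str(Path(os.environ.get("AIRFLOW_HOME", "")) / "dags"), _offset=4)
        ret = getattr(getattr(config.extensions.get("priority", None), section, None), key, None)
        if ret is not None:
            return ret
    except Exception:
        # SKIP: airflow_config missing or option not found, fall through
        pass

    # Fallback: Airflow's built-in configuration, also imported lazily; any
    # failure is surfaced as the package's own "option not found" error.
    try:
        import airflow.configuration

        config_option = airflow.configuration.conf.get(f"priority.{section}", key, default)
        if not config_option and required:
            raise AirflowPriorityConfigurationOptionNotFound(f"{section}.{key}")
        return config_option
    except Exception:
        raise AirflowPriorityConfigurationOptionNotFound(f"{section}.{key}")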
21 changes: 12 additions & 9 deletions airflow_priority/plugins/aws.py
@@ -7,17 +7,18 @@
from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin
from boto3 import client as Boto3Client
from botocore.config import Config

from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

_log = getLogger(__name__)


@lru_cache
def get_client():
config = Config(retries=dict(max_attempts=10, mode="adaptive"))
from boto3 import client as Boto3Client
from botocore.config import Config

config = Config(region_name=get_config_option("aws", "region"), retries=dict(max_attempts=10, mode="adaptive"))
return Boto3Client("cloudwatch", config=config)


@@ -67,13 +68,15 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
send_metric_cloudwatch(dag_id, priority, "failed")


class AWSCloudWatchPriorityPlugin(AirflowPlugin):
name = "AWSCloudWatchPriorityPlugin"
listeners = []


try:
if os.environ.get("SPHINX_BUILDING", "0") != "1":
# Call once to ensure plugin will work
get_client()

class AWSCloudWatchPriorityPlugin(AirflowPlugin):
name = "AWSCloudWatchPriorityPlugin"
listeners = [sys.modules[__name__]]
except AirflowPriorityConfigurationOptionNotFound:
AWSCloudWatchPriorityPlugin.listeners.append(sys.modules[__name__])
except (ImportError, AirflowPriorityConfigurationOptionNotFound):
_log.exception("Plugin could not be enabled")
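
Each plugin file now follows the same registration shape: the AirflowPlugin subclass is defined unconditionally with an empty listeners list, and the guarded block appends the module as a listener only after the lazy import and config checks succeed, catching ImportError alongside the missing-option error. A condensed reading of the aws.py hunks above (indentation inferred from the flattened diff; the other plugins differ only in the client they build and the options they check):

import os
import sys
from functools import lru_cache
from logging import getLogger

from airflow.plugins_manager import AirflowPlugin

from airflow_priority import AirflowPriorityConfigurationOptionNotFound, get_config_option

_log = getLogger(__name__)


@lru_cache
def get_client():
    # boto3/botocore are imported lazily; a missing dependency surfaces as an
    # ImportError in the registration block below instead of at module import.
    from boto3 import client as Boto3Client
    from botocore.config import Config

    config = Config(region_name=get_config_option("aws", "region"), retries=dict(max_attempts=10, mode="adaptive"))
    return Boto3Client("cloudwatch", config=config)


class AWSCloudWatchPriorityPlugin(AirflowPlugin):
    # Always defined, so Airflow can discover the plugin; it simply has no
    # listeners when it cannot be enabled.
    name = "AWSCloudWatchPriorityPlugin"
    listeners = []


try:
    if os.environ.get("SPHINX_BUILDING", "0") != "1":
        # Call once to ensure plugin will work
        get_client()

        AWSCloudWatchPriorityPlugin.listeners.append(sys.modules[__name__])
except (ImportError, AirflowPriorityConfigurationOptionNotFound):
    _log.exception("Plugin could not be enabled")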
29 changes: 17 additions & 12 deletions airflow_priority/plugins/datadog.py
@@ -7,13 +7,6 @@
from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v2.api.metrics_api import MetricsApi
from datadog_api_client.v2.model.metric_intake_type import MetricIntakeType
from datadog_api_client.v2.model.metric_payload import MetricPayload
from datadog_api_client.v2.model.metric_point import MetricPoint
from datadog_api_client.v2.model.metric_resource import MetricResource
from datadog_api_client.v2.model.metric_series import MetricSeries

from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

@@ -30,12 +23,22 @@

@lru_cache
def get_configuration():
from datadog_api_client import Configuration

return Configuration(
api_key={"apiKeyAuth": get_config_option("datadog", "api_key")},
)


def send_metric_datadog(dag_id: str, priority: int, tag: DagStatus) -> None:
from datadog_api_client import ApiClient
from datadog_api_client.v2.api.metrics_api import MetricsApi
from datadog_api_client.v2.model.metric_intake_type import MetricIntakeType
from datadog_api_client.v2.model.metric_payload import MetricPayload
from datadog_api_client.v2.model.metric_point import MetricPoint
from datadog_api_client.v2.model.metric_resource import MetricResource
from datadog_api_client.v2.model.metric_series import MetricSeries

with ApiClient(get_configuration()) as api_client:
api_instance = MetricsApi(api_client)

@@ -86,13 +89,15 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
send_metric_datadog(dag_id, priority, "failed")


class DatadogPriorityPlugin(AirflowPlugin):
name = "DatadogPriorityPlugin"
listeners = [sys.modules[__name__]]


try:
if os.environ.get("SPHINX_BUILDING", "0") != "1":
# Call once to ensure plugin will work
get_config_option("datadog", "api_key")

class DatadogPriorityPlugin(AirflowPlugin):
name = "DatadogPriorityPlugin"
listeners = [sys.modules[__name__]]
except AirflowPriorityConfigurationOptionNotFound:
DatadogPriorityPlugin.listeners.append(sys.modules[__name__])
except (ImportError, AirflowPriorityConfigurationOptionNotFound):
_log.exception("Plugin could not be enabled")
15 changes: 9 additions & 6 deletions airflow_priority/plugins/discord.py
@@ -10,7 +10,6 @@
from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin
from discord import Client, Intents

from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

@@ -21,6 +20,8 @@

@lru_cache
def get_client():
from discord import Client, Intents

client = Client(intents=Intents.default())
client.queue = Queue()

@@ -66,14 +67,16 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
send_metric_discord(dag_id, priority, "failed")


class DiscordPriorityPlugin(AirflowPlugin):
name = "DiscordPriorityPlugin"
listeners = []


try:
if os.environ.get("SPHINX_BUILDING", "0") != "1":
# Call once to ensure plugin will work
get_config_option("discord", "channel")
get_config_option("discord", "token")

class DiscordPriorityPlugin(AirflowPlugin):
name = "DiscordPriorityPlugin"
listeners = [sys.modules[__name__]]
except AirflowPriorityConfigurationOptionNotFound:
DiscordPriorityPlugin.listeners.append(sys.modules[__name__])
except (ImportError, AirflowPriorityConfigurationOptionNotFound):
_log.exception("Plugin could not be enabled")
17 changes: 11 additions & 6 deletions airflow_priority/plugins/newrelic.py
@@ -6,7 +6,6 @@
from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin
from newrelic_telemetry_sdk import GaugeMetric, MetricClient

from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

@@ -23,10 +22,14 @@

@lru_cache
def get_client():
from newrelic_telemetry_sdk import MetricClient

return MetricClient(get_config_option("newrelic", "api_key"))


def send_metric_newrelic(dag_id: str, priority: int, tag: DagStatus) -> None:
from newrelic_telemetry_sdk import GaugeMetric

priority = GaugeMetric(
f"airflow.custom.priority.p{priority}.{tag}", 1, tags={"application": "airflow", "dag": dag_id, "priority": priority, "tag": tag}
)
@@ -54,13 +57,15 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
send_metric_newrelic(dag_id, priority, "failed")


class NewRelicPriorityPlugin(AirflowPlugin):
name = "NewRelicPriorityPlugin"
listeners = []


try:
if os.environ.get("SPHINX_BUILDING", "0") != "1":
# Call once to ensure plugin will work
get_config_option("newrelic", "api_key")

class NewRelicPriorityPlugin(AirflowPlugin):
name = "NewRelicPriorityPlugin"
listeners = [sys.modules[__name__]]
except AirflowPriorityConfigurationOptionNotFound:
NewRelicPriorityPlugin.listeners.append(sys.modules[__name__])
except (ImportError, AirflowPriorityConfigurationOptionNotFound):
_log.exception("Plugin could not be enabled")
15 changes: 9 additions & 6 deletions airflow_priority/plugins/slack.py
@@ -6,7 +6,6 @@
from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin
from slack_sdk import WebClient

from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

@@ -18,6 +17,8 @@

@lru_cache
def get_client():
from slack_sdk import WebClient

return WebClient(token=get_config_option("slack", "token"))


@@ -57,14 +58,16 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
send_metric_slack(dag_id, priority, "failed")


class SlackPriorityPlugin(AirflowPlugin):
name = "SlackPriorityPlugin"
listeners = []


try:
if os.environ.get("SPHINX_BUILDING", "0") != "1":
# Call once to ensure plugin will work
get_config_option("slack", "token")
get_config_option("slack", "channel")

class SlackPriorityPlugin(AirflowPlugin):
name = "SlackPriorityPlugin"
listeners = [sys.modules[__name__]]
except AirflowPriorityConfigurationOptionNotFound:
SlackPriorityPlugin.listeners.append(sys.modules[__name__])
except (ImportError, AirflowPriorityConfigurationOptionNotFound):
_log.exception("Plugin could not be enabled")
19 changes: 13 additions & 6 deletions airflow_priority/plugins/symphony.py
@@ -7,7 +7,6 @@
from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin
from httpx import post

from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

@@ -31,6 +30,8 @@ def get_config_options():


def _client_cert_post(url: str, cert_file: str, key_file: str) -> str:
from httpx import post

context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.load_cert_chain(certfile=cert_file, keyfile=key_file)
response = post(url=url, verify=context, headers={"Content-Type": "application/json"}, data="{}")
@@ -53,6 +54,8 @@ def get_headers():

@lru_cache
def get_room_id():
from httpx import post

config_options = get_config_options()

res = post(
@@ -69,6 +72,8 @@ def get_room_id():


def send_metric_symphony(dag_id: str, priority: int, tag: DagStatus) -> None:
from httpx import post

return post(
url=get_config_options()["message_create_url"].replace("SID", get_room_id()),
json={"message": f'<messageML>A P{priority} DAG "{dag_id}" has {tag}!</messageML>'},
@@ -97,13 +102,15 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
send_metric_symphony(dag_id, priority, "failed")


class SymphonyPriorityPlugin(AirflowPlugin):
name = "SymphonyPriorityPlugin"
listeners = []


try:
if os.environ.get("SPHINX_BUILDING", "0") != "1":
# Call once to ensure plugin will work
get_config_options()

class SymphonyPriorityPlugin(AirflowPlugin):
name = "SymphonyPriorityPlugin"
listeners = [sys.modules[__name__]]
except AirflowPriorityConfigurationOptionNotFound:
SymphonyPriorityPlugin.listeners.append(sys.modules[__name__])
except (ImportError, AirflowPriorityConfigurationOptionNotFound):
_log.exception("Plugin could not be enabled")
