From 05c3aa51e76bd96f43f85bab753424035df8e5c7 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 14 Nov 2023 01:23:17 +0530 Subject: [PATCH 1/4] Add dag to check ingestion dag health --- .../dags/monitor/monitor_ingestion_dags.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 airflow/dags/monitor/monitor_ingestion_dags.py diff --git a/airflow/dags/monitor/monitor_ingestion_dags.py b/airflow/dags/monitor/monitor_ingestion_dags.py new file mode 100644 index 00000000..191975a9 --- /dev/null +++ b/airflow/dags/monitor/monitor_ingestion_dags.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import logging +import os +from datetime import datetime +from typing import Any + +from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator +from airflow.models import DagBag +from airflow.utils.cli import get_dag, get_dags, process_subdir +from airflow.decorators import dag, task + +logger = logging.getLogger("airflow.task") + +slack_webhook_conn = os.environ.get("SLACK_WEBHOOK_CONN", "slack_webhook_default") + + +ingestion_dags = ["ask_astro_load_bulk", "ask_astro_load_blogs", "ask_astro_load_github", "ask_astro_load_registry", "ask_astro_load_slack", "ask_astro_load_stackoverflow"] + + +@task +def check_ingestion_dags(**context: Any): + airflow_home = os.environ.get("AIRFLOW_HOME") + dagbag = DagBag(process_subdir(f"{airflow_home}/dags")) + data = [] + for filename, errors in dagbag.import_errors.items(): + data.append({"filepath": filename, "error": errors}) + + if data: + logger.info("************DAG Import Error*************") + logger.error(data) + logger.info("******************************") + message = f":red_circle: Import Error in DAG" + + ingestion_dag_exist = False + if set(dagbag.dag_ids).issubset(set(ingestion_dags)): + ingestion_dag_exist = True + message = f":red_circle: Some Ingestion DAG's are missing" + + if not ingestion_dag_exist or data: + print("hello hello") + SlackWebhookOperator( + task_id="slack_alert", + slack_webhook_conn_id=slack_webhook_conn, + message=message, + ).execute(context=context) + + +@dag( + schedule_interval="@daily", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True +) +def monitor_ingestion_dags(): + check_ingestion_dags() + + +monitor_ingestion_dags() From 84da6fb6f043aaece6b9b4ef6af503cdee0089bf Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 14 Nov 2023 01:25:55 +0530 Subject: [PATCH 2/4] Add dag to check ingestion dag health --- airflow/dags/monitor/monitor_ingestion_dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/monitor/monitor_ingestion_dags.py b/airflow/dags/monitor/monitor_ingestion_dags.py index 191975a9..db8bf898 100644 --- a/airflow/dags/monitor/monitor_ingestion_dags.py +++ b/airflow/dags/monitor/monitor_ingestion_dags.py @@ -33,7 +33,7 @@ def check_ingestion_dags(**context: Any): message = f":red_circle: Import Error in DAG" ingestion_dag_exist = False - if set(dagbag.dag_ids).issubset(set(ingestion_dags)): + if set(ingestion_dags).issubset(set(dagbag.dag_ids)): ingestion_dag_exist = True message = f":red_circle: Some Ingestion DAG's are missing" From ef90bd06e2b1acb7d56f414cda08aace13156fa6 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 14 Nov 2023 01:27:18 +0530 Subject: [PATCH 3/4] Add dag to check ingestion dag health --- .../dags/monitor/monitor_ingestion_dags.py | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/airflow/dags/monitor/monitor_ingestion_dags.py b/airflow/dags/monitor/monitor_ingestion_dags.py index db8bf898..f19f03e5 100644 --- a/airflow/dags/monitor/monitor_ingestion_dags.py +++ b/airflow/dags/monitor/monitor_ingestion_dags.py @@ -5,17 +5,24 @@ from datetime import datetime from typing import Any -from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator -from airflow.models import DagBag -from airflow.utils.cli import get_dag, get_dags, process_subdir from airflow.decorators import dag, task +from airflow.models import DagBag +from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator +from airflow.utils.cli import process_subdir logger = logging.getLogger("airflow.task") slack_webhook_conn = os.environ.get("SLACK_WEBHOOK_CONN", "slack_webhook_default") -ingestion_dags = ["ask_astro_load_bulk", "ask_astro_load_blogs", "ask_astro_load_github", "ask_astro_load_registry", "ask_astro_load_slack", "ask_astro_load_stackoverflow"] +ingestion_dags = [ + "ask_astro_load_bulk", + "ask_astro_load_blogs", + "ask_astro_load_github", + "ask_astro_load_registry", + "ask_astro_load_slack", + "ask_astro_load_stackoverflow", +] @task @@ -30,12 +37,12 @@ def check_ingestion_dags(**context: Any): logger.info("************DAG Import Error*************") logger.error(data) logger.info("******************************") - message = f":red_circle: Import Error in DAG" + message = ":red_circle: Import Error in DAG" ingestion_dag_exist = False if set(ingestion_dags).issubset(set(dagbag.dag_ids)): ingestion_dag_exist = True - message = f":red_circle: Some Ingestion DAG's are missing" + message = ":red_circle: Some Ingestion DAG's are missing" if not ingestion_dag_exist or data: print("hello hello") @@ -46,9 +53,7 @@ def check_ingestion_dags(**context: Any): ).execute(context=context) -@dag( - schedule_interval="@daily", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True -) +@dag(schedule_interval="@daily", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True) def monitor_ingestion_dags(): check_ingestion_dags() From bc49069c9e647efb9e70dada7ab2e374c94f394a Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Tue, 14 Nov 2023 12:53:12 +0530 Subject: [PATCH 4/4] Update airflow/dags/monitor/monitor_ingestion_dags.py Co-authored-by: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> --- airflow/dags/monitor/monitor_ingestion_dags.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/dags/monitor/monitor_ingestion_dags.py b/airflow/dags/monitor/monitor_ingestion_dags.py index f19f03e5..58efb6ac 100644 --- a/airflow/dags/monitor/monitor_ingestion_dags.py +++ b/airflow/dags/monitor/monitor_ingestion_dags.py @@ -45,7 +45,6 @@ def check_ingestion_dags(**context: Any): message = ":red_circle: Some Ingestion DAG's are missing" if not ingestion_dag_exist or data: - print("hello hello") SlackWebhookOperator( task_id="slack_alert", slack_webhook_conn_id=slack_webhook_conn,