diff --git a/airflow/dags/monitor/external_api.py b/airflow/dags/monitor/external_api.py new file mode 100644 index 00000000..f4459fa8 --- /dev/null +++ b/airflow/dags/monitor/external_api.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +import requests + + +class APIMonitoring: + """ + A class to test API endpoints using GET or POST requests and ensure that they return HTTP 200 status codes. + """ + + def __init__(self, base_url: str) -> None: + self.base_url = base_url + if not self.base_url: + raise ValueError("ASK_ASTRO_API_BASE_URL cannot be empty.") + + def test_endpoint( + self, endpoint: str, method: str = "GET", data: dict | None = None, headers: dict | None = None + ) -> int: + """ + Test an endpoint with the specified method, data, and headers. + + :param endpoint: The endpoint to test. + :param method: The HTTP method to use. Defaults to 'GET'. + :param data: The data to send in the request. Defaults to None. + :param headers: The headers to send in the request. Defaults to None. + """ + url = f"{self.base_url}{endpoint}" + if method.upper() == "GET": + response = requests.get(url, headers=headers) + elif method.upper() == "POST": + response = requests.post(url, json=data, headers=headers) + else: + raise ValueError("Unsupported HTTP method.") + + response.raise_for_status() # Will raise an exception for non-200 responses + return response.status_code diff --git a/airflow/dags/monitor/monitor.py b/airflow/dags/monitor/monitor.py index 792c9609..7977c844 100644 --- a/airflow/dags/monitor/monitor.py +++ b/airflow/dags/monitor/monitor.py @@ -1,16 +1,21 @@ import json +import logging import os import tempfile from datetime import datetime import firebase_admin import requests +from monitor.external_api import APIMonitoring from weaviate_provider.hooks.weaviate import WeaviateHook from airflow.decorators import dag, task from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator from airflow.utils.trigger_rule import TriggerRule +# Set up the logger +logger = logging.getLogger("airflow.task") + monitoring_interval = os.environ.get("MONITORING_INTERVAL", "@daily") weaviate_conn_id = os.environ.get("WEAVIATE_CONN_ID", WeaviateHook.default_conn_name) @@ -22,6 +27,11 @@ google_service_account_json_value = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS_VALUE", None) +ASK_ASTRO_API_BASE_URL = os.environ.get("ASK_ASTRO_API_BASE_URL", None) + +ASK_ASTRO_REQUEST_ID_1 = os.environ.get("ASK_ASTRO_REQUEST_ID_1", "05d8882e-56ac-11ee-a818-4200a9fe0102") +ASK_ASTRO_REQUEST_ID_2 = os.environ.get("ASK_ASTRO_REQUEST_ID_2", "f2d6524c-56ab-11ee-a818-4200a9fe0102") + @task(trigger_rule=TriggerRule.ALL_DONE) def slack_status(**context): @@ -71,7 +81,8 @@ def check_ui_status(): endpoint = "https://ask.astronomer.io" response = requests.get(endpoint) if response.status_code != 200: - raise response + raise Exception(f"UI check failed with status code: {response.status_code}") + logger.info(f"UI check passed with status code: {response.status_code}") @task(trigger_rule=TriggerRule.ALL_DONE) @@ -88,9 +99,9 @@ def check_weaviate_status(): count = metadata.get("count") break if count == 0: - print(f"Weavaite class {weaviate_class} is empty!") + logger.error(f"Weaviate class {weaviate_class} is empty!") else: - print(f"{count} record found in Weavaite class {weaviate_class}") + logger.info(f"{count} record found in Weaviate class {weaviate_class}") @task(trigger_rule=TriggerRule.ALL_DONE) @@ -105,14 +116,53 @@ def check_firestore_status(): firebase_admin.initialize_app() app = firebase_admin.get_app(name=firestore_app_name) - print(f"{app.name} found!") + logger.info(f"{app.name} found!") + + +@task(trigger_rule=TriggerRule.ALL_DONE) +def monitor_apis(): + """ + Monitor a set of predefined API endpoints using the APIMonitoring class and report on their HTTP status codes. + This task will test each API endpoint defined in the endpoints list and will print the status code for each. + If any endpoint does not return a 200 status, an exception will be raised. + """ + headers = {"accept": "*/*", "Content-Type": "application/json"} + api_monitor = APIMonitoring(base_url=ASK_ASTRO_API_BASE_URL) + + slack_event_body = {} + request_body = {"prompt": "Example prompt"} + feedback_body = {"positive": True} + + endpoints_to_monitor = [ + ("/slack/events", "POST", headers, slack_event_body), + ("/slack/install", "GET", headers), + ("/slack/oauth_redirect", "GET", headers), + ("/requests", "POST", headers, request_body), + ("/requests", "GET", headers), + ] + + request_uuids = [ASK_ASTRO_REQUEST_ID_1, ASK_ASTRO_REQUEST_ID_2] + for request_id in request_uuids: + endpoints_to_monitor.append((f"/requests/{request_id}", "GET", headers)) + endpoints_to_monitor.append((f"/requests/{request_id}/feedback", "POST", headers, feedback_body)) + + # Monitor each API endpoint + for endpoint_info in endpoints_to_monitor: + try: + # Unpack the tuple with a default value for 'body' + endpoint, method, headers, body = (*endpoint_info, None)[:4] + status_code = api_monitor.test_endpoint(endpoint, method, headers, body) + logger.info(f"Endpoint {endpoint} returned status code {status_code}") + except requests.HTTPError as e: + logger.error(f"Endpoint {endpoint_info[0]} failed with status code: {e.response.status_code}") + raise Exception(f"Endpoint {endpoint_info[0]} failed with status code: {e.response.status_code}") @dag( schedule_interval=monitoring_interval, start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True ) def monitoring_dag(): - [check_ui_status(), check_weaviate_status(), check_firestore_status()] >> slack_status() + [check_ui_status(), check_weaviate_status(), check_firestore_status(), monitor_apis()] >> slack_status() monitoring_dag()