From 7088a26054f96407c60595f1287265dbd01b5c88 Mon Sep 17 00:00:00 2001 From: ayazabbas <30928485+ayazabbas@users.noreply.github.com> Date: Fri, 17 May 2024 12:23:13 +0100 Subject: [PATCH] Add zenduty integration (#67) * Add zenduty integration * comment * comment * bump minor version * address comments * mention new event type in readme * fix exponent operator --- Makefile | 7 ++++-- README.md | 4 +++ pyproject.toml | 2 +- pyth_observer/dispatch.py | 50 +++++++++++++++++++++++++++++++++++++ pyth_observer/event.py | 27 +++++++++++++++++++- pyth_observer/zenduty.py | 52 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 138 insertions(+), 4 deletions(-) create mode 100644 pyth_observer/zenduty.py diff --git a/Makefile b/Makefile index 2262259..2373a08 100644 --- a/Makefile +++ b/Makefile @@ -28,8 +28,11 @@ cover: ve lint: lint.python #lint: lint.yaml - argh, RHEL is too old to do this by default -lint.python: ve - . ve/bin/activate; flake8 observer.py pyth_observer/ +lint.python: + poetry run isort pyth_observer/ + poetry run black pyth_observer/ + poetry run pyright pyth_observer/ + poetry run pyflakes pyth_observer/ lint.yaml: yamllint . diff --git a/README.md b/README.md index b59d761..c60b278 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,10 @@ Event types are configured via environment variables: - `TelegramEvent` - `TELEGRAM_BOT_TOKEN` - API token for the Telegram bot +- `ZendutyEvent` + - `ZENDUTY_INTEGRATION_KEY` - Integration key for Zenduty service API integration + - `OPEN_ALERTS_FILE` - Path to local file used for persisting open alerts + ## Finding the Telegram Group Chat ID To integrate Telegram events with the Observer, you need the Telegram group chat ID. Here's how you can find it: diff --git a/pyproject.toml b/pyproject.toml index 4b23e72..c988e1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ ignore_missing_imports = true [tool.poetry] name = "pyth-observer" -version = "0.2.5" +version = "0.2.6" description = "Alerts and stuff" authors = [] readme = "README.md" diff --git a/pyth_observer/dispatch.py b/pyth_observer/dispatch.py index afae8e8..8e7926a 100644 --- a/pyth_observer/dispatch.py +++ b/pyth_observer/dispatch.py @@ -1,7 +1,11 @@ import asyncio +import json +import os from copy import deepcopy +from datetime import datetime, timedelta from typing import Any, Awaitable, Dict, List +from loguru import logger from prometheus_client import Gauge from pyth_observer.check import Check, State @@ -10,11 +14,14 @@ from pyth_observer.event import DatadogEvent # Used dynamically from pyth_observer.event import LogEvent # Used dynamically from pyth_observer.event import TelegramEvent # Used dynamically +from pyth_observer.event import ZendutyEvent # Used dynamically from pyth_observer.event import Event +from pyth_observer.zenduty import send_zenduty_alert assert DatadogEvent assert LogEvent assert TelegramEvent +assert ZendutyEvent class Dispatch: @@ -36,6 +43,16 @@ def __init__(self, config, publishers): "Publisher check failure status", ["check", "symbol", "publisher"], ) + if "ZendutyEvent" in self.config["events"]: + self.open_alerts_file = os.environ["OPEN_ALERTS_FILE"] + self.open_alerts = self.load_alerts() + + def load_alerts(self): + try: + with open(self.open_alerts_file, "r") as file: + return json.load(file) + except FileNotFoundError: + return {} # Return an empty dict if the file doesn't exist async def run(self, states: List[State]): # First, run each check and store the ones that failed @@ -62,8 +79,41 @@ async def run(self, states: List[State]): sent_events.append(event.send()) + if event_type == "ZendutyEvent": + # Add failed check to open alerts + alert_identifier = ( + f"{check.__class__.__name__}-{check.state().symbol}" + ) + state = check.state() + if isinstance(state, PublisherState): + alert_identifier += f"-{state.publisher_name}" + self.open_alerts[alert_identifier] = datetime.now().isoformat() + await asyncio.gather(*sent_events) + # Check open alerts and resolve those that are older than 2 minutes + if "ZendutyEvent" in self.config["events"]: + + to_remove = [] + current_time = datetime.now() + for identifier, last_failure in self.open_alerts.items(): + if current_time - datetime.fromisoformat(last_failure) >= timedelta( + minutes=2 + ): + logger.debug(f"Resolving Zenduty alert {identifier}") + response = await send_zenduty_alert( + alert_identifier=identifier, message=identifier, resolved=True + ) + if response and 200 <= response.status < 300: + to_remove.append(identifier) + + for identifier in to_remove: + del self.open_alerts[identifier] + + # Write open alerts to file to ensure persistence + with open(self.open_alerts_file, "w") as file: + json.dump(self.open_alerts, file) + def check_price_feed(self, state: PriceFeedState) -> List[Check]: failed_checks: List[Check] = [] diff --git a/pyth_observer/event.py b/pyth_observer/event.py index 2e93539..8bf3f1b 100644 --- a/pyth_observer/event.py +++ b/pyth_observer/event.py @@ -11,8 +11,9 @@ from loguru import logger from pyth_observer.check import Check -from pyth_observer.check.publisher import PublisherCheck +from pyth_observer.check.publisher import PublisherCheck, PublisherState from pyth_observer.models import Publisher +from pyth_observer.zenduty import send_zenduty_alert load_dotenv() @@ -151,3 +152,27 @@ async def send(self): logger.error( f"Failed to send Telegram message: {response_text}" ) + + +class ZendutyEvent(Event): + def __init__(self, check: Check, context: Context): + self.check = check + self.context = context + + async def send(self): + event_details = self.check.error_message() + summary = "" + for key, value in event_details.items(): + summary += f"{key}: {value}\n" + + alert_identifier = ( + f"{self.check.__class__.__name__}-{self.check.state().symbol}" + ) + state = self.check.state() + if isinstance(state, PublisherState): + alert_identifier += f"-{state.publisher_name}" + + logger.debug(f"Sending Zenduty alert for {alert_identifier}") + await send_zenduty_alert( + alert_identifier=alert_identifier, message=alert_identifier, summary=summary + ) diff --git a/pyth_observer/zenduty.py b/pyth_observer/zenduty.py new file mode 100644 index 0000000..83e3154 --- /dev/null +++ b/pyth_observer/zenduty.py @@ -0,0 +1,52 @@ +import asyncio +import hashlib +import os + +import aiohttp +from loguru import logger + +headers = {"Content-Type": "application/json"} + + +async def send_zenduty_alert(alert_identifier, message, resolved=False, summary=""): + url = f"https://www.zenduty.com/api/events/{os.environ['ZENDUTY_INTEGRATION_KEY']}/" + # Use a hash of the alert_identifier as a unique id for the alert. + # Take the first 32 characters due to length limit of the api. + entity_id = hashlib.sha256(alert_identifier.encode("utf-8")).hexdigest()[:32] + + alert_type = "resolved" if resolved else "critical" + + data = { + "alert_type": alert_type, + "message": message, + "summary": summary, + "entity_id": entity_id, + } + + async with aiohttp.ClientSession() as session: + max_retries = 30 + retries = 0 + while retries < max_retries: + async with session.post(url, json=data, headers=headers) as response: + if 200 <= response.status < 300: + return response # Success case, return response + elif response.status == 429: + retries += 1 + if retries < max_retries: + logger.error( + f"Received 429 Too Many Requests for {alert_identifier}. Retrying in 1 second..." + ) + await asyncio.sleep( + min(30, 2**retries) + ) # Backoff before retrying, wait upto 30s + else: + logger.error( + f"Failed to send Zenduty event message for {alert_identifier} after {max_retries} retries." + ) + return response # Return response after max retries + else: + response_text = await response.text() + logger.error( + f"{response.status} Failed to send Zenduty event message for {alert_identifier}: {response_text}" + ) + return response # Non-retryable failure