Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add telegram integration #59

Merged
merged 5 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ __pycache__/

.DS_Store
.envrc
.coverage
.coverage

.env
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pyyaml = "^6.0"
throttler = "1.2.1"
types-pyyaml = "^6.0.12"
types-pytz = "^2022.4.0.0"
python-dotenv = "^1.0.1"


[tool.poetry.group.dev.dependencies]
Expand Down
5 changes: 3 additions & 2 deletions pyth_observer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import os
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Optional, Tuple

from base58 import b58decode
from loguru import logger
Expand Down Expand Up @@ -51,9 +51,10 @@ def __init__(
config: Dict[str, Any],
publishers: Dict[str, str],
coingecko_mapping: Dict[str, Symbol],
telegram_mapping: Optional[Dict[str, str]] = None,
):
self.config = config
self.dispatch = Dispatch(config, publishers)
self.dispatch = Dispatch(config, publishers, telegram_mapping=telegram_mapping)
self.publishers = publishers
self.pyth_client = PythClient(
solana_endpoint=config["network"]["http_endpoint"],
Expand Down
15 changes: 13 additions & 2 deletions pyth_observer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,29 @@
envvar="COINGECKO_MAPPING",
required=True,
)
@click.option(
"--telegram-mapping",
help="Path to YAML/JSON file with publisher key-Telegram chat ID mappings",
envvar="TELEGRAM_MAPPING",
required=False,
)
@click.option(
"--prometheus-port",
help="Port number for Prometheus metrics endpoint",
envvar="PROMETHEUS_PORT",
default="9001",
)
def run(config, publishers, coingecko_mapping, prometheus_port):
def run(config, publishers, coingecko_mapping, telegram_mapping, prometheus_port):
config_ = yaml.safe_load(open(config, "r"))
publishers_ = yaml.safe_load(open(publishers, "r"))
publishers_inverted = {v: k for k, v in publishers_.items()}
coingecko_mapping_ = yaml.safe_load(open(coingecko_mapping, "r"))
observer = Observer(config_, publishers_inverted, coingecko_mapping_)
telegram_mapping_ = (
yaml.safe_load(open(telegram_mapping, "r")) if telegram_mapping else {}
)
observer = Observer(
config_, publishers_inverted, coingecko_mapping_, telegram_mapping_
)

start_http_server(int(prometheus_port))

Expand Down
6 changes: 5 additions & 1 deletion pyth_observer/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
from pyth_observer.check.publisher import PUBLISHER_CHECKS, PublisherState
from pyth_observer.event import DatadogEvent # Used dynamically
from pyth_observer.event import LogEvent # Used dynamically
from pyth_observer.event import TelegramEvent # Used dynamically
from pyth_observer.event import Event

assert DatadogEvent
assert LogEvent
assert TelegramEvent


class Dispatch:
Expand All @@ -21,9 +23,10 @@ class Dispatch:
notifiers for the checks that failed.
"""

def __init__(self, config, publishers):
def __init__(self, config, publishers, telegram_mapping=None):
self.config = config
self.publishers = publishers
self.telegram_mapping = telegram_mapping
self.price_feed_check_gauge = Gauge(
"price_feed_check_failed",
"Price feed check failure status",
Expand Down Expand Up @@ -52,6 +55,7 @@ async def run(self, states: List[State]):
context = {
"network": self.config["network"]["name"],
"publishers": self.publishers,
"telegram_mapping": self.telegram_mapping or {},
}

for check in failed_checks:
Expand Down
39 changes: 39 additions & 0 deletions pyth_observer/event.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import os
from typing import Dict, Literal, Protocol, TypedDict, cast

import aiohttp
from datadog_api_client.api_client import AsyncApiClient as DatadogAPI
from datadog_api_client.configuration import Configuration as DatadogConfig
from datadog_api_client.v1.api.events_api import EventsApi as DatadogEventAPI
from datadog_api_client.v1.model.event_alert_type import EventAlertType
from datadog_api_client.v1.model.event_create_request import EventCreateRequest
from dotenv import load_dotenv
from loguru import logger

from pyth_observer.check import Check
from pyth_observer.check.publisher import PublisherCheck

load_dotenv()


class Context(TypedDict):
network: str
publishers: Dict[str, str]
telegram_mapping: Dict[str, str]


class Event(Protocol):
Expand Down Expand Up @@ -94,3 +99,37 @@ async def send(self):

level = cast(LogEventLevel, os.environ.get("LOG_EVENT_LEVEL", "INFO"))
logger.log(level, text.replace("\n", ". "))


class TelegramEvent(Event):
def __init__(self, check: Check, context: Context):
self.check = check
self.context = context

async def send(self):
text = self.check.error_message()
# Extract the publisher key from the message text
publisher_key = text[text.find("(") + 1 : text.find(")")]
# Retrieve the chat ID from the telegram_mapping using the publisher key
chat_id = self.context["telegram_mapping"].get(publisher_key, None)

if chat_id is None:
logger.warning(
f"Telegram chat ID not found for publisher key {publisher_key}"
)
return

telegram_api_url = f"https://api.telegram.org/bot{os.environ['TELEGRAM_BOT_TOKEN']}/sendMessage"
message_data = {
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown",
}

async with aiohttp.ClientSession() as session:
async with session.post(telegram_api_url, json=message_data) as response:
if response.status != 200:
response_text = await response.text()
raise RuntimeError(
f"Failed to send Telegram message: {response_text}"
)
1 change: 1 addition & 0 deletions sample.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ events:
# NOTE: Uncomment to enable Datadog metrics, see README.md for datadog credential docs.
# - DatadogEvent
- LogEvent
- TelegramEvent
checks:
global:
# Price feed checks
Expand Down
4 changes: 4 additions & 0 deletions sample.telegram.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think people might make mistake here. How do you feel about merging this with publishers.yaml and have something like this:

- name: XYZ
   key: dklaajfaldj
   contract:
     telegram_id:  (optional)
     email: (optional for future)
     slack_channel_id: (optional)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah thats a good idea

"pyq7ySiH5RvKteu2vdXKC7SNyNDp9vNDkGXdHxSpPtu": "-1111111111",
"G41cdkE63eCTE2KPtFLN9AVNGkVEdnhvSDtS9Ab5XK6G": "-2222222222",
}
Loading