diff --git a/pyproject.toml b/pyproject.toml index 429ec64..ec2fe86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ ignore_missing_imports = true [tool.poetry] name = "pyth-observer" -version = "0.2.0" +version = "0.2.1" description = "Alerts and stuff" authors = [] readme = "README.md" diff --git a/pyth_observer/check/price_feed.py b/pyth_observer/check/price_feed.py index 9a8eccb..9a421f1 100644 --- a/pyth_observer/check/price_feed.py +++ b/pyth_observer/check/price_feed.py @@ -1,7 +1,6 @@ import time from dataclasses import dataclass from datetime import datetime -from textwrap import dedent from typing import Dict, Optional, Protocol, runtime_checkable from zoneinfo import ZoneInfo @@ -42,7 +41,7 @@ def state(self) -> PriceFeedState: def run(self) -> bool: ... - def error_message(self) -> str: + def error_message(self) -> dict: ... @@ -80,17 +79,15 @@ def run(self) -> bool: # Fail return False - def error_message(self) -> str: + def error_message(self) -> dict: distance = self.__state.latest_block_slot - self.__state.latest_trading_slot - return dedent( - f""" - {self.__state.symbol} is offline (either non-trading/stale). - It is not updated for {distance} slots. - - Latest trading slot: {self.__state.latest_trading_slot} - Block slot: {self.__state.latest_block_slot} - """ - ).strip() + return { + "msg": f"{self.__state.symbol} is offline (either non-trading/stale). Last update {distance} slots ago.", + "type": "PriceFeedCheck", + "symbol": self.__state.symbol, + "latest_trading_slot": self.__state.latest_trading_slot, + "block_slot": self.__state.latest_block_slot, + } class PriceFeedCoinGeckoCheck(PriceFeedCheck): @@ -127,15 +124,14 @@ def run(self) -> bool: # Fail return False - def error_message(self) -> str: - return dedent( - f""" - {self.__state.symbol} is too far from Coingecko's price. - - Pyth price: {self.__state.price_aggregate} - Coingecko price: {self.__state.coingecko_price} - """ - ).strip() + def error_message(self) -> dict: + return { + "msg": f"{self.__state.symbol} is too far from Coingecko's price.", + "type": "PriceFeedCheck", + "symbol": self.__state.symbol, + "pyth_price": self.__state.price_aggregate, + "coingecko_price": self.__state.coingecko_price, + } class PriceFeedConfidenceIntervalCheck(PriceFeedCheck): @@ -158,14 +154,13 @@ def run(self) -> bool: # Fail return False - def error_message(self) -> str: - return dedent( - f""" - {self.__state.symbol} confidence interval is too low. - - Confidence interval: {self.__state.confidence_interval_aggregate} - """ - ).strip() + def error_message(self) -> dict: + return { + "msg": f"{self.__state.symbol} confidence interval is too low.", + "type": "PriceFeedCheck", + "symbol": self.__state.symbol, + "confidence_interval": self.__state.confidence_interval_aggregate, + } class PriceFeedCrossChainOnlineCheck(PriceFeedCheck): @@ -210,19 +205,18 @@ def run(self) -> bool: # Fail return False - def error_message(self) -> str: + def error_message(self) -> dict: if self.__state.crosschain_price: publish_time = arrow.get(self.__state.crosschain_price["publish_time"]) else: publish_time = arrow.get(0) - return dedent( - f""" - {self.__state.symbol} isn't online at the price service. - - Last publish time: {publish_time.format('YYYY-MM-DD HH:mm:ss ZZ')} - """ - ).strip() + return { + "msg": f"{self.__state.symbol} isn't online at the price service.", + "type": "PriceFeedCheck", + "symbol": self.__state.symbol, + "last_publish_time": publish_time.format("YYYY-MM-DD HH:mm:ss ZZ"), + } class PriceFeedCrossChainDeviationCheck(PriceFeedCheck): @@ -270,21 +264,20 @@ def run(self) -> bool: # Fail return False - def error_message(self) -> str: + def error_message(self) -> dict: # It can never happen because of the check logic but linter could not understand it. price = ( self.__state.crosschain_price["price"] if self.__state.crosschain_price else None ) - return dedent( - f""" - {self.__state.symbol} is too far at the price service. - - Price: {self.__state.price_aggregate} - Price at price service: {price} - """ - ).strip() + return { + "msg": f"{self.__state.symbol} is too far at the price service.", + "type": "PriceFeedCheck", + "symbol": self.__state.symbol, + "price": self.__state.price_aggregate, + "price_at_price_service": price, + } PRICE_FEED_CHECKS = [ diff --git a/pyth_observer/check/publisher.py b/pyth_observer/check/publisher.py index 415676d..c85b1c6 100644 --- a/pyth_observer/check/publisher.py +++ b/pyth_observer/check/publisher.py @@ -1,5 +1,4 @@ from dataclasses import dataclass -from textwrap import dedent from typing import Dict, Protocol, runtime_checkable from pythclient.pythaccounts import PythPriceStatus @@ -38,7 +37,7 @@ def state(self) -> PublisherState: def run(self) -> bool: ... - def error_message(self) -> str: + def error_message(self) -> dict: ... @@ -78,20 +77,17 @@ def run(self) -> bool: # Fail return False - def error_message(self) -> str: + def error_message(self) -> dict: diff = self.__state.price - self.__state.price_aggregate intervals_away = abs(diff / self.__state.confidence_interval_aggregate) - - return dedent( - f""" - {self.__state.publisher_name} price not within aggregate confidence. - It is {intervals_away} times away from confidence. - - Symbol: {self.__state.symbol} - Publisher price: {self.__state.price} ± {self.__state.confidence_interval} - Aggregate price: {self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate} - """ - ).strip() + return { + "msg": f"{self.__state.publisher_name} price is {intervals_away} times away from confidence.", + "type": "PublisherCheck", + "publisher": self.__state.publisher_name, + "symbol": self.__state.symbol, + "publisher_price": f"{self.__state.price} ± {self.__state.confidence_interval}", + "aggregate_price": f"{self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}", + } class PublisherConfidenceIntervalCheck(PublisherCheck): @@ -119,16 +115,15 @@ def run(self) -> bool: # Fail return False - def error_message(self) -> str: - return dedent( - f""" - {self.__state.publisher_name} confidence interval is too tight. - - Symbol: {self.__state.symbol} - Price: {self.__state.price} - Confidence interval: {self.__state.confidence_interval} - """ - ).strip() + def error_message(self) -> dict: + return { + "msg": f"{self.__state.publisher_name} confidence interval is too tight.", + "type": "PublisherCheck", + "publisher": self.__state.publisher_name, + "symbol": self.__state.symbol, + "price": self.__state.price, + "confidence_interval": self.__state.confidence_interval, + } class PublisherOfflineCheck(PublisherCheck): @@ -154,17 +149,16 @@ def run(self) -> bool: # Fail return False - def error_message(self) -> str: + def error_message(self) -> dict: distance = self.__state.latest_block_slot - self.__state.slot - return dedent( - f""" - {self.__state.publisher_name} hasn't published recently for {distance} slots. - - Symbol: {self.__state.symbol} - Publisher slot: {self.__state.slot} - Aggregate slot: {self.__state.aggregate_slot} - """ - ).strip() + return { + "msg": f"{self.__state.publisher_name} hasn't published recently for {distance} slots.", + "type": "PublisherCheck", + "publisher": self.__state.publisher_name, + "symbol": self.__state.symbol, + "publisher_slot": self.__state.slot, + "aggregate_slot": self.__state.aggregate_slot, + } class PublisherPriceCheck(PublisherCheck): @@ -203,19 +197,17 @@ def run(self) -> bool: # Fail return False - def error_message(self) -> str: + def error_message(self) -> dict: deviation = (self.ci_adjusted_price_diff() / self.__state.price_aggregate) * 100 - - return dedent( - f""" - {self.__state.publisher_name} price is too far from aggregate price. - - Symbol: {self.__state.symbol} - Publisher price: {self.__state.price} ± {self.__state.confidence_interval} - Aggregate price: {self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate} - Deviation: {deviation}% - """ - ).strip() + return { + "msg": f"{self.__state.publisher_name} price is too far from aggregate price.", + "type": "PublisherCheck", + "publisher": self.__state.publisher_name, + "symbol": self.__state.symbol, + "publisher_price": f"{self.__state.price} ± {self.__state.confidence_interval}", + "aggregate_price": f"{self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}", + "deviation": deviation, + } # Returns the distance between the aggregate price and the closest side of the publisher's confidence interval # Returns 0 if the aggregate price is within the publisher's confidence interval. diff --git a/pyth_observer/event.py b/pyth_observer/event.py index 0cb939a..beeabee 100644 --- a/pyth_observer/event.py +++ b/pyth_observer/event.py @@ -1,5 +1,6 @@ +import json import os -from typing import Dict, Literal, Protocol, TypedDict, cast +from typing import Dict, Protocol, TypedDict, cast import aiohttp from datadog_api_client.api_client import AsyncApiClient as DatadogAPI @@ -38,7 +39,7 @@ def __init__(self, check: Check, context: Context): async def send(self): # Publisher checks expect the key -> name mapping of publishers when # generating the error title/message. - text = self.check.error_message() + event = self.check.error_message() # An example is: PriceFeedOfflineCheck-Crypto.AAVE/USD aggregation_key = f"{self.check.__class__.__name__}-{self.check.state().symbol}" @@ -50,8 +51,8 @@ async def send(self): event = EventCreateRequest( aggregation_key=aggregation_key, - title=text.split("\n")[0], - text=text, + title=event["msg"], + text=json.dumps(event), tags=[ "service:observer", f"network:{self.context['network']}", @@ -84,9 +85,6 @@ async def send(self): ) -LogEventLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"] - - class LogEvent(Event): def __init__(self, check: Check, context: Context): self.check = check @@ -95,10 +93,9 @@ def __init__(self, check: Check, context: Context): async def send(self): # Publisher checks expect the key -> name mapping of publishers when # generating the error title/message. - text = self.check.error_message() - - level = cast(LogEventLevel, os.environ.get("LOG_EVENT_LEVEL", "INFO")) - logger.log(level, text.replace("\n", ". ")) + event = self.check.error_message() + with logger.contextualize(**event): + logger.info(event["msg"]) class TelegramEvent(Event):