Skip to content

Commit

Permalink
format log events in json (#63)
Browse files Browse the repository at this point in the history
* format log events in json

* ensure the fields are included in the log record as json

* remove unused import, bump poetry

* linting

* fixes
  • Loading branch information
ayazabbas authored May 9, 2024
1 parent 0186c64 commit 74b4a9f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 104 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ignore_missing_imports = true

[tool.poetry]
name = "pyth-observer"
version = "0.2.0"
version = "0.2.1"
description = "Alerts and stuff"
authors = []
readme = "README.md"
Expand Down
85 changes: 39 additions & 46 deletions pyth_observer/check/price_feed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import time
from dataclasses import dataclass
from datetime import datetime
from textwrap import dedent
from typing import Dict, Optional, Protocol, runtime_checkable
from zoneinfo import ZoneInfo

Expand Down Expand Up @@ -42,7 +41,7 @@ def state(self) -> PriceFeedState:
def run(self) -> bool:
...

def error_message(self) -> str:
def error_message(self) -> dict:
...


Expand Down Expand Up @@ -80,17 +79,15 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
distance = self.__state.latest_block_slot - self.__state.latest_trading_slot
return dedent(
f"""
{self.__state.symbol} is offline (either non-trading/stale).
It is not updated for {distance} slots.
Latest trading slot: {self.__state.latest_trading_slot}
Block slot: {self.__state.latest_block_slot}
"""
).strip()
return {
"msg": f"{self.__state.symbol} is offline (either non-trading/stale). Last update {distance} slots ago.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"latest_trading_slot": self.__state.latest_trading_slot,
"block_slot": self.__state.latest_block_slot,
}


class PriceFeedCoinGeckoCheck(PriceFeedCheck):
Expand Down Expand Up @@ -127,15 +124,14 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
return dedent(
f"""
{self.__state.symbol} is too far from Coingecko's price.
Pyth price: {self.__state.price_aggregate}
Coingecko price: {self.__state.coingecko_price}
"""
).strip()
def error_message(self) -> dict:
return {
"msg": f"{self.__state.symbol} is too far from Coingecko's price.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"pyth_price": self.__state.price_aggregate,
"coingecko_price": self.__state.coingecko_price,
}


class PriceFeedConfidenceIntervalCheck(PriceFeedCheck):
Expand All @@ -158,14 +154,13 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
return dedent(
f"""
{self.__state.symbol} confidence interval is too low.
Confidence interval: {self.__state.confidence_interval_aggregate}
"""
).strip()
def error_message(self) -> dict:
return {
"msg": f"{self.__state.symbol} confidence interval is too low.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"confidence_interval": self.__state.confidence_interval_aggregate,
}


class PriceFeedCrossChainOnlineCheck(PriceFeedCheck):
Expand Down Expand Up @@ -210,19 +205,18 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
if self.__state.crosschain_price:
publish_time = arrow.get(self.__state.crosschain_price["publish_time"])
else:
publish_time = arrow.get(0)

return dedent(
f"""
{self.__state.symbol} isn't online at the price service.
Last publish time: {publish_time.format('YYYY-MM-DD HH:mm:ss ZZ')}
"""
).strip()
return {
"msg": f"{self.__state.symbol} isn't online at the price service.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"last_publish_time": publish_time.format("YYYY-MM-DD HH:mm:ss ZZ"),
}


class PriceFeedCrossChainDeviationCheck(PriceFeedCheck):
Expand Down Expand Up @@ -270,21 +264,20 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
# It can never happen because of the check logic but linter could not understand it.
price = (
self.__state.crosschain_price["price"]
if self.__state.crosschain_price
else None
)
return dedent(
f"""
{self.__state.symbol} is too far at the price service.
Price: {self.__state.price_aggregate}
Price at price service: {price}
"""
).strip()
return {
"msg": f"{self.__state.symbol} is too far at the price service.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"price": self.__state.price_aggregate,
"price_at_price_service": price,
}


PRICE_FEED_CHECKS = [
Expand Down
84 changes: 38 additions & 46 deletions pyth_observer/check/publisher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dataclasses import dataclass
from textwrap import dedent
from typing import Dict, Protocol, runtime_checkable

from pythclient.pythaccounts import PythPriceStatus
Expand Down Expand Up @@ -38,7 +37,7 @@ def state(self) -> PublisherState:
def run(self) -> bool:
...

def error_message(self) -> str:
def error_message(self) -> dict:
...


Expand Down Expand Up @@ -78,20 +77,17 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
diff = self.__state.price - self.__state.price_aggregate
intervals_away = abs(diff / self.__state.confidence_interval_aggregate)

return dedent(
f"""
{self.__state.publisher_name} price not within aggregate confidence.
It is {intervals_away} times away from confidence.
Symbol: {self.__state.symbol}
Publisher price: {self.__state.price} ± {self.__state.confidence_interval}
Aggregate price: {self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}
"""
).strip()
return {
"msg": f"{self.__state.publisher_name} price is {intervals_away} times away from confidence.",
"type": "PublisherCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"publisher_price": f"{self.__state.price} ± {self.__state.confidence_interval}",
"aggregate_price": f"{self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}",
}


class PublisherConfidenceIntervalCheck(PublisherCheck):
Expand Down Expand Up @@ -119,16 +115,15 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
return dedent(
f"""
{self.__state.publisher_name} confidence interval is too tight.
Symbol: {self.__state.symbol}
Price: {self.__state.price}
Confidence interval: {self.__state.confidence_interval}
"""
).strip()
def error_message(self) -> dict:
return {
"msg": f"{self.__state.publisher_name} confidence interval is too tight.",
"type": "PublisherCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"price": self.__state.price,
"confidence_interval": self.__state.confidence_interval,
}


class PublisherOfflineCheck(PublisherCheck):
Expand All @@ -154,17 +149,16 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
distance = self.__state.latest_block_slot - self.__state.slot
return dedent(
f"""
{self.__state.publisher_name} hasn't published recently for {distance} slots.
Symbol: {self.__state.symbol}
Publisher slot: {self.__state.slot}
Aggregate slot: {self.__state.aggregate_slot}
"""
).strip()
return {
"msg": f"{self.__state.publisher_name} hasn't published recently for {distance} slots.",
"type": "PublisherCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"publisher_slot": self.__state.slot,
"aggregate_slot": self.__state.aggregate_slot,
}


class PublisherPriceCheck(PublisherCheck):
Expand Down Expand Up @@ -203,19 +197,17 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
deviation = (self.ci_adjusted_price_diff() / self.__state.price_aggregate) * 100

return dedent(
f"""
{self.__state.publisher_name} price is too far from aggregate price.
Symbol: {self.__state.symbol}
Publisher price: {self.__state.price} ± {self.__state.confidence_interval}
Aggregate price: {self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}
Deviation: {deviation}%
"""
).strip()
return {
"msg": f"{self.__state.publisher_name} price is too far from aggregate price.",
"type": "PublisherCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"publisher_price": f"{self.__state.price} ± {self.__state.confidence_interval}",
"aggregate_price": f"{self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}",
"deviation": deviation,
}

# Returns the distance between the aggregate price and the closest side of the publisher's confidence interval
# Returns 0 if the aggregate price is within the publisher's confidence interval.
Expand Down
19 changes: 8 additions & 11 deletions pyth_observer/event.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
from typing import Dict, Literal, Protocol, TypedDict, cast
from typing import Dict, Protocol, TypedDict, cast

import aiohttp
from datadog_api_client.api_client import AsyncApiClient as DatadogAPI
Expand Down Expand Up @@ -38,7 +39,7 @@ def __init__(self, check: Check, context: Context):
async def send(self):
# Publisher checks expect the key -> name mapping of publishers when
# generating the error title/message.
text = self.check.error_message()
event = self.check.error_message()

# An example is: PriceFeedOfflineCheck-Crypto.AAVE/USD
aggregation_key = f"{self.check.__class__.__name__}-{self.check.state().symbol}"
Expand All @@ -50,8 +51,8 @@ async def send(self):

event = EventCreateRequest(
aggregation_key=aggregation_key,
title=text.split("\n")[0],
text=text,
title=event["msg"],
text=json.dumps(event),
tags=[
"service:observer",
f"network:{self.context['network']}",
Expand Down Expand Up @@ -84,9 +85,6 @@ async def send(self):
)


LogEventLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]


class LogEvent(Event):
def __init__(self, check: Check, context: Context):
self.check = check
Expand All @@ -95,10 +93,9 @@ def __init__(self, check: Check, context: Context):
async def send(self):
# Publisher checks expect the key -> name mapping of publishers when
# generating the error title/message.
text = self.check.error_message()

level = cast(LogEventLevel, os.environ.get("LOG_EVENT_LEVEL", "INFO"))
logger.log(level, text.replace("\n", ". "))
event = self.check.error_message()
with logger.contextualize(**event):
logger.info(event["msg"])


class TelegramEvent(Event):
Expand Down

0 comments on commit 74b4a9f

Please sign in to comment.