Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat(observer): Detect artificially noisy stalled publisher prices #81

Merged
merged 7 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ __pycache__/
.envrc
.coverage

.env
.env
.vscode/
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.10
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ Observe Pyth on-chain price feeds and run sanity checks on the data.

Container images are available at https://github.com/pyth-network/pyth-observer/pkgs/container/pyth-observer

To run Observer locally, make sure you have a recent version of [Poetry](https://python-poetry.org) installed and run:
To run Observer locally, you will need:
- Python 3.10 ([pyenv](https://github.com/pyenv/pyenv) is a nice way to manage Python installs, and once installed will automatically set the version to 3.10 for this project dir via the `.python-version` file).
- [Poetry](https://python-poetry.org), which handles package and virtualenv management.

Install dependencies and run the service:
```sh
$ poetry env use $(which python) # point Poetry to the pyenv python shim
$ poetry install
$ poetry run pyth-observer
```
Expand Down
69 changes: 67 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ignore_missing_imports = true

[tool.poetry]
name = "pyth-observer"
version = "0.2.15"
version = "0.3.0"
description = "Alerts and stuff"
authors = []
readme = "README.md"
Expand All @@ -27,6 +27,7 @@ throttler = "1.2.1"
types-pyyaml = "^6.0.12"
types-pytz = "^2022.4.0.0"
python-dotenv = "^1.0.1"
numpy = "^2.1.3"


[tool.poetry.group.dev.dependencies]
Expand Down
75 changes: 58 additions & 17 deletions pyth_observer/check/publisher.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
import time
from dataclasses import dataclass
from collections import defaultdict, deque
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Dict, Protocol, runtime_checkable
from zoneinfo import ZoneInfo

from loguru import logger
from pythclient.calendar import is_market_open
from pythclient.pythaccounts import PythPriceStatus
from pythclient.solana import SolanaPublicKey


@dataclass
class PriceUpdate:
    """Represents a single price with its timestamp (epoch seconds)."""

    # Observation time in epoch seconds.
    timestamp: int
    # Price value observed at `timestamp`.
    price: float


PUBLISHER_EXCLUSION_DISTANCE = 25
PUBLISHER_CACHE_MAX_LEN = 30
"""Roughly 30 mins of updates, since the check runs about once a minute"""

PUBLISHER_CACHE = {}
PUBLISHER_CACHE = defaultdict(lambda: deque(maxlen=PUBLISHER_CACHE_MAX_LEN))
"""
Cache that holds `PriceUpdate` entries (timestamp, price) for publisher/feed combos as they stream in.
Each publisher/feed deque keeps at most `PUBLISHER_CACHE_MAX_LEN` entries; the oldest are dropped automatically.
Used by the PublisherStalledCheck to detect stalls in prices.
"""


@dataclass
Expand Down Expand Up @@ -240,6 +258,16 @@ def __init__(self, state: PublisherState, config: PublisherCheckConfig):
self.__abandoned_time_limit: int = int(config["abandoned_time_limit"])
self.__max_slot_distance: int = int(config["max_slot_distance"])

from pyth_observer.check.stall_detection import ( # noqa: deferred import to avoid circular import
StallDetector,
)

self.__detector = StallDetector(
stall_time_limit=self.__stall_time_limit,
noise_threshold=float(config["noise_threshold"]),
min_noise_samples=int(config["min_noise_samples"]),
)

def state(self) -> PublisherState:
    """Return the publisher state held by this check."""
    return self.__state

Expand All @@ -254,36 +282,49 @@ def run(self) -> bool:

distance = self.__state.latest_block_slot - self.__state.slot

# Pass for redemption rates because they are expected to be static for long periods
if self.__state.asset_type == "Crypto Redemption Rate":
return True

# Pass when publisher is offline because PublisherOfflineCheck will be triggered
if distance >= self.__max_slot_distance:
return True

publisher_key = (self.__state.publisher_name, self.__state.symbol)
current_time = int(time.time())
previous_price, last_change_time = PUBLISHER_CACHE.get(
publisher_key, (None, None)
)

if previous_price is None or self.__state.price != previous_price:
PUBLISHER_CACHE[publisher_key] = (self.__state.price, current_time)
return True
publisher_key = (self.__state.publisher_name, self.__state.symbol)
updates = PUBLISHER_CACHE[publisher_key]

# Only cache new prices, let repeated prices grow stale.
# These will be caught as an exact stall in the detector.
is_repeated_price = updates and updates[-1].price == self.__state.price
cur_update = PriceUpdate(current_time, self.__state.price)
if not is_repeated_price:
PUBLISHER_CACHE[publisher_key].append(cur_update)

# Analyze for stalls
result = self.__detector.analyze_updates(list(updates), cur_update)
logger.debug(f"Stall detection result: {result}")

time_since_last_change = current_time - last_change_time
if time_since_last_change > self.__stall_time_limit:
if time_since_last_change > self.__abandoned_time_limit:
return True # Abandon this check after the abandoned time limit
return False
self.__last_analysis = result # For error logging

# If we've been stalled for too long, abandon this check
if result.is_stalled and result.duration > self.__abandoned_time_limit:
return True

return True
return not result.is_stalled

def error_message(self) -> dict:
stall_duration = f"{self.__last_analysis.duration:.1f} seconds"
return {
"msg": f"{self.__state.publisher_name} has been publishing the same price for too long.",
"msg": f"{self.__state.publisher_name} has been publishing the same price of {self.__state.symbol} for {stall_duration}",
"type": "PublisherStalledCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"price": self.__state.price,
"stall_duration": f"{int(time.time()) - PUBLISHER_CACHE[(self.__state.publisher_name, self.__state.symbol)][1]} seconds",
"stall_type": self.__last_analysis.stall_type,
"stall_duration": stall_duration,
"analysis": asdict(self.__last_analysis),
}


Expand Down
Loading
Loading