Feat(observer): Detect artificially noisy stalled publisher prices #81

Merged: 7 commits, Nov 15, 2024
Changes from 5 commits
3 changes: 2 additions & 1 deletion .gitignore
@@ -6,4 +6,5 @@ __pycache__/
 .envrc
 .coverage

-.env
+.env
+.vscode/
1 change: 1 addition & 0 deletions .python-version
@@ -0,0 +1 @@
+3.10
6 changes: 5 additions & 1 deletion README.md
@@ -6,9 +6,13 @@ Observe Pyth on-chain price feeds and run sanity checks on the data.

 Container images are available at https://github.com/pyth-network/pyth-observer/pkgs/container/pyth-observer

-To run Observer locally, make sure you have a recent version of [Poetry](https://python-poetry.org) installed and run:
+To run Observer locally, you will need:
+- Python 3.10 ([pyenv](https://github.com/pyenv/pyenv) is a nice way to manage Python installs, and once installed will automatically set the version to 3.10 for this project dir via the `.python-version` file).
+- [Poetry](https://python-poetry.org), which handles package and virtualenv management.

+Install dependencies and run the service:
 ```sh
+$ poetry env use $(which python) # point Poetry to the pyenv python shim
 $ poetry install
 $ poetry run pyth-observer
 ```
69 changes: 67 additions & 2 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -27,6 +27,7 @@ throttler = "1.2.1"
 types-pyyaml = "^6.0.12"
 types-pytz = "^2022.4.0.0"
 python-dotenv = "^1.0.1"
+numpy = "^2.1.3"


 [tool.poetry.group.dev.dependencies]
69 changes: 53 additions & 16 deletions pyth_observer/check/publisher.py
@@ -1,16 +1,34 @@
 import time
-from dataclasses import dataclass
+from collections import defaultdict, deque
+from dataclasses import asdict, dataclass
 from datetime import datetime
 from typing import Dict, Protocol, runtime_checkable
 from zoneinfo import ZoneInfo

+from loguru import logger
 from pythclient.calendar import is_market_open
 from pythclient.pythaccounts import PythPriceStatus
 from pythclient.solana import SolanaPublicKey

+
+@dataclass
+class PriceUpdate:
+    """Represents a single price with its timestamp (epoch seconds)."""
+
+    timestamp: int
+    price: float
+
+
 PUBLISHER_EXCLUSION_DISTANCE = 25
+PUBLISHER_CACHE_MAX_LEN = 30
+"""Roughly 30 mins of updates, since the check runs about once a minute"""

-PUBLISHER_CACHE = {}
+PUBLISHER_CACHE = defaultdict(lambda: deque(maxlen=PUBLISHER_CACHE_MAX_LEN))
+"""
+Cache that holds tuples of (price, timestamp) for publisher/feed combos as they stream in.
+Entries longer than `PUBLISHER_CACHE_MAX_LEN` are automatically pruned.
+Used by the PublisherStalledCheck to detect stalls in prices.
+"""


@dataclass
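
The bounded per-key cache introduced in this hunk can be illustrated in isolation. The following is a minimal sketch, not code from the PR: `cache`, `MAX_LEN`, and the publisher/symbol key are hypothetical stand-ins for `PUBLISHER_CACHE`, `PUBLISHER_CACHE_MAX_LEN`, and real feed keys. It shows that a `deque` created with `maxlen` drops its oldest entry once full, which is the pruning behavior the cache relies on.

```python
from collections import defaultdict, deque
from dataclasses import dataclass

MAX_LEN = 30  # hypothetical stand-in for PUBLISHER_CACHE_MAX_LEN


@dataclass
class PriceUpdate:
    timestamp: int
    price: float


# Each missing key gets its own bounded deque on first access.
cache = defaultdict(lambda: deque(maxlen=MAX_LEN))

key = ("example_publisher", "Crypto.BTC/USD")  # hypothetical key
for minute in range(35):
    cache[key].append(PriceUpdate(timestamp=minute * 60, price=100.0 + minute))

# Only the 30 most recent updates remain; the first five were pruned.
oldest = cache[key][0]
newest = cache[key][-1]
```

Because the `defaultdict` factory builds a fresh deque per key, no explicit initialization or pruning code is needed at the call site.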
@@ -240,6 +258,16 @@ def __init__(self, state: PublisherState, config: PublisherCheckConfig):
         self.__abandoned_time_limit: int = int(config["abandoned_time_limit"])
         self.__max_slot_distance: int = int(config["max_slot_distance"])

+        from pyth_observer.check.stall_detection import (  # noqa: deferred import to avoid circular import
+            StallDetector,
+        )
+
+        self.__detector = StallDetector(
+            stall_time_limit=self.__stall_time_limit,
+            noise_threshold=float(config["noise_threshold"]),
+            min_noise_samples=int(config["min_noise_samples"]),
+        )
+
     def state(self) -> PublisherState:
         return self.__state
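
The `noise_threshold` and `min_noise_samples` parameters suggest a check that flags prices wobbling by a negligible amount around a fixed value. The `stall_detection` module is not shown in this diff, so the sketch below is an assumption about one way such a check could work, not the PR's implementation. It uses the coefficient of variation (standard deviation divided by mean) as the noise measure, relies on the standard library instead of NumPy, and the function name and sample values are hypothetical.

```python
import statistics
from typing import Sequence


def looks_like_noisy_stall(
    prices: Sequence[float], noise_threshold: float, min_noise_samples: int
) -> bool:
    """Return True if prices vary so little that they look artificially noisy."""
    if len(prices) < min_noise_samples:
        return False  # too few samples to judge
    mean = statistics.fmean(prices)
    if mean == 0:
        return False  # coefficient of variation is undefined
    coefficient_of_variation = statistics.pstdev(prices) / abs(mean)
    return coefficient_of_variation < noise_threshold


# A price stuck near 100.0 with tiny jitter versus a price that really moves.
jittery = [100.0, 100.0001, 99.9999, 100.0002, 99.9998]
moving = [100.0, 101.5, 99.2, 102.3, 98.7]

jittery_flagged = looks_like_noisy_stall(jittery, noise_threshold=1e-4, min_noise_samples=5)
moving_flagged = looks_like_noisy_stall(moving, noise_threshold=1e-4, min_noise_samples=5)
```

A relative measure such as this one keeps a single threshold meaningful across feeds with very different price scales, which is one plausible reason to prefer it over an absolute spread.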

@@ -254,36 +282,45 @@ def run(self) -> bool:

         distance = self.__state.latest_block_slot - self.__state.slot

+        # Pass for redemption rates because they are expected to be static for long periods
+        if self.__state.asset_type == "Crypto Redemption Rate":
+            return True
+
         # Pass when publisher is offline because PublisherOfflineCheck will be triggered
         if distance >= self.__max_slot_distance:
             return True

-        publisher_key = (self.__state.publisher_name, self.__state.symbol)
         current_time = int(time.time())
-        previous_price, last_change_time = PUBLISHER_CACHE.get(
-            publisher_key, (None, None)
+
+        publisher_key = (self.__state.publisher_name, self.__state.symbol)
+        PUBLISHER_CACHE[publisher_key].append(
+            PriceUpdate(current_time, self.__state.price)
Contributor

Based on the logic in your StallDetector, I think you need to append a price only if it differs from the latest stored price; otherwise the exact-stall check might never fire.

Contributor Author

Ah yeah, you're right. I was considering a few different ways to do this, and it looks like this slipped through the cracks during refactoring. The test suite should have caught this; let me fix it and cover it with a test case.

         )
+        updates = PUBLISHER_CACHE[publisher_key]

-        if previous_price is None or self.__state.price != previous_price:
-            PUBLISHER_CACHE[publisher_key] = (self.__state.price, current_time)
-            return True
+        # Analyze for stalls
+        result = self.__detector.analyze_updates(list(updates))
+        logger.debug(f"Stall detection result: {result}")

-        time_since_last_change = current_time - last_change_time
-        if time_since_last_change > self.__stall_time_limit:
-            if time_since_last_change > self.__abandoned_time_limit:
-                return True  # Abandon this check after the abandoned time limit
-            return False
+        self.__last_analysis = result  # For error logging
+
+        # If we've been stalled for too long, abandon this check
+        if result.is_stalled and result.duration > self.__abandoned_time_limit:
+            return True

-        return True
+        return not result.is_stalled

     def error_message(self) -> dict:
+        stall_duration = f"{self.__last_analysis.duration:.1f} seconds"
         return {
-            "msg": f"{self.__state.publisher_name} has been publishing the same price for too long.",
+            "msg": f"{self.__state.publisher_name} has been publishing the same price of {self.__state.symbol} for {stall_duration}",
             "type": "PublisherStalledCheck",
             "publisher": self.__state.publisher_name,
             "symbol": self.__state.symbol,
             "price": self.__state.price,
-            "stall_duration": f"{int(time.time()) - PUBLISHER_CACHE[(self.__state.publisher_name, self.__state.symbol)][1]} seconds",
+            "stall_type": self.__last_analysis.stall_type,
+            "stall_duration": stall_duration,
+            "analysis": asdict(self.__last_analysis),
         }
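
The review thread in this hunk asks that a price be appended only when it differs from the latest stored price, since otherwise the exact-stall check might never fire. A minimal sketch of that suggestion follows. It is an assumption about how the fix could look, not the code that was merged: `record_price` and `seconds_since_last_change` are hypothetical helpers, and a plain `deque` stands in for one `PUBLISHER_CACHE` entry.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque


@dataclass
class PriceUpdate:
    timestamp: int
    price: float


def record_price(updates: Deque[PriceUpdate], timestamp: int, price: float) -> bool:
    """Append only when the price differs from the latest stored one."""
    if updates and updates[-1].price == price:
        return False  # unchanged: keep the timestamp of the last change
    updates.append(PriceUpdate(timestamp, price))
    return True


def seconds_since_last_change(updates: Deque[PriceUpdate], now: int) -> int:
    """Time elapsed since the most recently recorded price change."""
    return now - updates[-1].timestamp if updates else 0


# Hypothetical test case: the same price observed once a minute for five minutes.
updates: Deque[PriceUpdate] = deque(maxlen=30)
for minute in range(5):
    record_price(updates, timestamp=minute * 60, price=42.0)

stalled_for = seconds_since_last_change(updates, now=300)
```

If every observation were appended instead, the newest entry would always carry the current run's timestamp, so elapsed time measured from the newest entry would stay near zero regardless of how long the price had been frozen.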

