Skip to content

Commit

Permalink
feat: add PublisherStalledCheck (#68)
Browse files Browse the repository at this point in the history
* add PublisherStalledCheck

* update sample.config.yaml with per symbol config sample

* update comment
  • Loading branch information
cctdaniel authored May 17, 2024
1 parent c125d78 commit 19c5063
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ignore_missing_imports = true

[tool.poetry]
name = "pyth-observer"
version = "0.2.4"
version = "0.2.5"
description = "Alerts and stuff"
authors = []
readme = "README.md"
Expand Down
42 changes: 42 additions & 0 deletions pyth_observer/check/publisher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from dataclasses import dataclass
from typing import Dict, Protocol, runtime_checkable

Expand All @@ -6,6 +7,8 @@

PUBLISHER_EXCLUSION_DISTANCE = 25

PUBLISHER_CACHE = {}


@dataclass
class PublisherState:
Expand Down Expand Up @@ -216,9 +219,48 @@ def ci_adjusted_price_diff(self) -> float:
return max(price_only_diff - self.__state.confidence_interval, 0)


class PublisherStalledCheck(PublisherCheck):
def __init__(self, state: PublisherState, config: PublisherCheckConfig):
self.__state = state
self.__stall_time_limit: int = int(
config["stall_time_limit"]
) # Time in seconds

def state(self) -> PublisherState:
return self.__state

def run(self) -> bool:
publisher_key = (self.__state.publisher_name, self.__state.symbol)
current_time = time.time()
previous_price, last_change_time = PUBLISHER_CACHE.get(
publisher_key, (None, None)
)

if previous_price is None or self.__state.price != previous_price:
PUBLISHER_CACHE[publisher_key] = (self.__state.price, current_time)
return True

if (current_time - last_change_time) > self.__stall_time_limit:
return False

return True

def error_message(self) -> dict:
return {
"msg": f"{self.__state.publisher_name} has been publishing the same price for too long.",
"type": "PublisherStalledCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"price": self.__state.price,
"stall_duration": time.time()
- PUBLISHER_CACHE[(self.__state.publisher_name, self.__state.symbol)][1],
}


PUBLISHER_CHECKS = [
PublisherWithinAggregateConfidenceCheck,
PublisherConfidenceIntervalCheck,
PublisherOfflineCheck,
PublisherPriceCheck,
PublisherStalledCheck,
]
6 changes: 6 additions & 0 deletions sample.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,16 @@ checks:
enable: true
max_slot_distance: 25
max_aggregate_distance: 6
PublisherStalledCheck:
enable: true
stall_time_limit: 60
# Per-symbol config
Crypto.MNGO/USD:
PriceFeedOfflineCheck:
max_slot_distance: 10000
FX.USD/HKD:
PriceFeedOfflineCheck:
max_slot_distance: 10000
Crypto.BTC/USD:
PublisherStalledCheck:
stall_time_limit: 10 # This will override the global stall_time_limit for Crypto.BTC/USD
53 changes: 52 additions & 1 deletion tests/test_checks_publisher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import time
from unittest.mock import patch

from pythclient.pythaccounts import PythPriceStatus
from pythclient.solana import SolanaPublicKey

from pyth_observer.check.publisher import PublisherPriceCheck, PublisherState
from pyth_observer.check.publisher import (
PUBLISHER_CACHE,
PublisherPriceCheck,
PublisherStalledCheck,
PublisherState,
)


def make_state(
Expand Down Expand Up @@ -44,3 +52,46 @@ def check_is_ok(
state1 = make_state(1, 100.0, 2.0, 1, 110.0, 1.0)
assert check_is_ok(state1, 10, 25)
assert not check_is_ok(state1, 6, 25)


def test_publisher_stalled_check():
current_time = time.time()

def simulate_time_pass(seconds):
nonlocal current_time
current_time += seconds
return current_time

def setup_check(state, stall_time_limit):
check = PublisherStalledCheck(state, {"stall_time_limit": stall_time_limit})
PUBLISHER_CACHE[(state.publisher_name, state.symbol)] = (
state.price,
current_time,
)
return check

def run_check(check, seconds, expected):
with patch("time.time", new=lambda: simulate_time_pass(seconds)):
assert check.run() == expected

PUBLISHER_CACHE.clear()
state_a = make_state(1, 100.0, 2.0, 1, 100.0, 1.0)
check_a = setup_check(state_a, 5)
run_check(check_a, 5, True) # Should pass as it hits the limit exactly

PUBLISHER_CACHE.clear()
state_b = make_state(1, 100.0, 2.0, 1, 100.0, 1.0)
check_b = setup_check(state_b, 5)
run_check(check_b, 6, False) # Should fail as it exceeds the limit

PUBLISHER_CACHE.clear()
state_c = make_state(1, 100.0, 2.0, 1, 100.0, 1.0)
check_c = setup_check(state_c, 5)
run_check(check_c, 2, True) # Initial check should pass
state_c.price = 105.0 # Change the price
run_check(check_c, 3, True) # Should pass as price changes
state_c.price = 100.0 # Change back to original price
run_check(check_c, 4, True) # Should pass as price changes
run_check(
check_c, 8, False
) # Should fail as price stalls for too long after last change

0 comments on commit 19c5063

Please sign in to comment.