Skip to content

Commit

Permalink
get ws working for token vault monitor python
Browse files Browse the repository at this point in the history
  • Loading branch information
Anirudh Suresh authored and --systemdf committed Feb 19, 2024
1 parent faee9b9 commit f88672b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 18 deletions.
64 changes: 51 additions & 13 deletions per_sdk/protocols/token_vault_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(
chain_id: str,
include_price_updates: bool,
mock_pyth: bool,
feed_ids_ws: list[str] = [],
):
self.rpc_url = rpc_url
self.contract_address = contract_address
Expand All @@ -63,7 +64,10 @@ def __init__(
self.token_vault = self.w3.eth.contract(
address=contract_address, abi=get_vault_abi()
)
self.price_feed_client = PriceFeedClient([])

self.price_feed_client = PriceFeedClient(feed_ids_ws)
ws_call = self.price_feed_client.ws_pyth_prices()
asyncio.create_task(ws_call)

async def get_accounts(self) -> list[ProtocolAccount]:
"""
Expand Down Expand Up @@ -210,16 +214,42 @@ async def get_liquidation_opportunities(self) -> list[LiquidationOpportunity]:
liquidatable = []
accounts = await self.get_accounts()
for account in accounts:
# vault is already liquidated
# vault is already liquidated--can skip
if account["amount_collateral"] == 0 and account["amount_debt"] == 0:
continue
# TODO: optimize this to only query for the price feeds that are needed and only query once
(
price_collateral,
price_debt,
) = await self.price_feed_client.get_pyth_prices_latest(
[account["token_id_collateral"], account["token_id_debt"]]

token_id_collateral = account["token_id_collateral"]
token_id_debt = account["token_id_debt"]

if token_id_collateral not in (
self.price_feed_client.feed_ids
+ self.price_feed_client.pending_feed_ids
):
self.price_feed_client.add_feed_ids([token_id_collateral])
if token_id_debt not in (
self.price_feed_client.feed_ids
+ self.price_feed_client.pending_feed_ids
):
self.price_feed_client.add_feed_ids([token_id_debt])

price_collateral_feed = self.price_feed_client.prices_dict.get(
token_id_collateral
)
# get price of collateral asset from http request if doesn't return from ws
if price_collateral_feed is None:
(price_collateral_feed,) = (
await self.price_feed_client.get_pyth_prices_latest([token_id_debt])
)
price_collateral = price_collateral_feed.get("price")

price_debt_feed = self.price_feed_client.prices_dict.get(token_id_debt)
# get price of debt asset from http request if doesn't return from ws
if price_debt_feed is None:
(price_debt_feed,) = (
await self.price_feed_client.get_pyth_prices_latest([token_id_debt])
)
price_debt = price_debt_feed.get("price")

if price_collateral is None:
raise Exception(
f"Price for collateral token {account['token_id_collateral']} not found"
Expand All @@ -231,16 +261,21 @@ async def get_liquidation_opportunities(self) -> list[LiquidationOpportunity]:
)

value_collateral = (
int(price_collateral["price"]["price"]) * account["amount_collateral"]
int(price_collateral["price"]) * account["amount_collateral"]
)
value_debt = int(price_debt["price"]) * account["amount_debt"]
logger.debug(
f"Account {account['account_number']} health: {value_collateral / value_debt}"
)
value_debt = int(price_debt["price"]["price"]) * account["amount_debt"]
health = value_collateral / value_debt
logger.debug(f"Account {account['account_number']} health: {health}")
if (
value_debt * int(account["min_health_ratio"])
> value_collateral * 10**18
):
price_updates = [price_collateral, price_debt]
price_updates = [
self.price_feed_client.extract_price_feed(price_collateral_feed),
self.price_feed_client.extract_price_feed(price_debt_feed),
]
self.price_feed_client.extract_price_feed(price_collateral_feed)
liquidatable.append(self.create_liquidation_opp(account, price_updates))

return liquidatable
Expand Down Expand Up @@ -312,13 +347,16 @@ async def main():
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)

feed_ids_ws = []

monitor = VaultMonitor(
args.rpc_url,
args.vault_contract,
args.weth_contract,
args.chain_id,
args.include_price_updates,
args.mock_pyth,
feed_ids_ws,
)

while True:
Expand Down
9 changes: 4 additions & 5 deletions per_sdk/utils/pyth_prices.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ async def ws_pyth_prices(self):

async with websockets.connect(HERMES_ENDPOINT_WSS) as ws:
while True:
# add new price feed ids to the ws subscription
if len(self.pending_feed_ids) > 0:
json_subscribe = {
"ids": self.pending_feed_ids,
Expand All @@ -137,8 +138,8 @@ async def ws_pyth_prices(self):
self.pending_feed_ids = []

msg = json.loads(await ws.recv())
if msg["type"] == "response":
if msg["status"] != "success":
if msg.get("type") == "response":
if msg.get("status") != "success":
raise Exception("Error in subscribing to websocket")
try:
if msg["type"] != "price_update":
Expand All @@ -155,9 +156,7 @@ async def ws_pyth_prices(self):

async def main():
feed_ids = await get_price_feed_ids()
# TODO: remove this line, once rate limits are figured out
feed_ids = feed_ids[:1]
price_feed_client = PriceFeedClient(feed_ids)
price_feed_client = PriceFeedClient(feed_ids[:50])

print("Starting web socket...")
ws_call = price_feed_client.ws_pyth_prices()
Expand Down

0 comments on commit f88672b

Please sign in to comment.