diff --git a/per_sdk/protocols/README.md b/per_sdk/protocols/README.md index ced3b8e0..a7507f50 100644 --- a/per_sdk/protocols/README.md +++ b/per_sdk/protocols/README.md @@ -4,6 +4,6 @@ The monitor is the off-chain service that exposes liquidation opportunities on i The LiquidationAdapter contract that is part of the Express Relay on-chain stack allows searchers to perform liquidations across different protocols without needing to deploy their own contracts or perform bespoke integration work. The monitor service is important in enabling this, as it publishes the all the necessary information that searchers need for signing their intent on executing the liquidations. -Each protocol that integrates with Express Relay and the LiquidationAdapter workflow must provide code that publishes liquidation opportunities; the example file for the TokenVault dummy contract is found in `/protocols`. Some common types are defined in `utils/types_liquidation_adapter.py`, and standard functions for accessing Pyth prices can be found in `utils/pyth_prices.py`. The exact interface of the methods in the protocol's file is not important, but it should have a similar entrypoint with the same command line arguments and general behavior of sending liquidation opportunities to the liquidation server when specified. +Each protocol that integrates with Express Relay and the LiquidationAdapter workflow must provide code that publishes liquidation opportunities; the example file for the TokenVault dummy contract is found in `/protocols`. Some common types are defined in `utils/types_liquidation_adapter.py`, and standard functions for accessing Pyth prices can be found in the [Pyth python client](https://github.com/pyth-network/pyth-client-py/tree/main). The exact interface of the methods in the protocol's file is not important, but it should have a similar entrypoint with the same command line arguments and general behavior of sending liquidation opportunities to the liquidation server when specified. The party that runs the monitor can run the protocol-provided file to surface liquidation opportunities to the liquidation server. diff --git a/per_sdk/protocols/token_vault_monitor.py b/per_sdk/protocols/token_vault_monitor.py index 2f80e284..dffa5865 100644 --- a/per_sdk/protocols/token_vault_monitor.py +++ b/per_sdk/protocols/token_vault_monitor.py @@ -9,13 +9,23 @@ import httpx import web3 from eth_abi import encode +from pythclient.hermes import HermesClient, PriceFeed +from pythclient.price_feeds import Price -from per_sdk.utils.pyth_prices import PriceFeed, PriceFeedClient, price_to_tuple from per_sdk.utils.types_liquidation_adapter import LiquidationOpportunity logger = logging.getLogger(__name__) +def price_to_tuple(price: Price): + return ( + int(price.price), + int(price.conf), + int(price.expo), + int(price.publish_time), + ) + + class ProtocolAccount(TypedDict): """ ProtocolAccount is a TypedDict that represents an account/vault in the protocol. @@ -63,7 +73,10 @@ def __init__( self.token_vault = self.w3.eth.contract( address=contract_address, abi=get_vault_abi() ) - self.price_feed_client = PriceFeedClient([]) + + self.price_feed_client = HermesClient([]) + ws_call = self.price_feed_client.ws_pyth_prices() + asyncio.create_task(ws_call) async def get_accounts(self) -> list[ProtocolAccount]: """ @@ -132,7 +145,7 @@ def create_liquidation_opp( for update in prices: feed_id = bytes.fromhex(update["feed_id"]) price = price_to_tuple(update["price"]) - price_ema = price_to_tuple(update["price_ema"]) + price_ema = price_to_tuple(update["ema_price"]) prev_publish_time = 0 price_updates.append( encode( @@ -196,13 +209,14 @@ def create_liquidation_opp( return opp - async def get_liquidation_opportunities(self) -> list[LiquidationOpportunity]: + async def get_liquidation_opportunities( + self, use_ws: bool = False + ) -> list[LiquidationOpportunity]: """ Filters list of ProtocolAccount types to return a list of LiquidationOpportunity types. Args: - accounts: A list of ProtocolAccount objects, representing all the open accounts in the protocol. - prices: A dictionary of Pyth price feeds, where the keys are Pyth feed IDs and the values are PriceFeed objects. + ws: A boolean indicating whether to use the websocket to get price updates. Returns: A list of LiquidationOpportunity objects, one per account that is eligible for liquidation. """ @@ -210,16 +224,49 @@ async def get_liquidation_opportunities(self) -> list[LiquidationOpportunity]: liquidatable = [] accounts = await self.get_accounts() for account in accounts: - # vault is already liquidated + # vault is already liquidated--can skip if account["amount_collateral"] == 0 and account["amount_debt"] == 0: continue - # TODO: optimize this to only query for the price feeds that are needed and only query once - ( - price_collateral, - price_debt, - ) = await self.price_feed_client.get_pyth_prices_latest( - [account["token_id_collateral"], account["token_id_debt"]] - ) + + token_id_collateral = account["token_id_collateral"] + token_id_debt = account["token_id_debt"] + + price_collateral_feed = None + price_debt_feed = None + + if use_ws: + if token_id_collateral not in ( + self.price_feed_client.feed_ids + + self.price_feed_client.pending_feed_ids + ): + self.price_feed_client.add_feed_ids([token_id_collateral]) + if token_id_debt not in ( + self.price_feed_client.feed_ids + + self.price_feed_client.pending_feed_ids + ): + self.price_feed_client.add_feed_ids([token_id_debt]) + + price_collateral_feed = self.price_feed_client.prices_dict.get( + token_id_collateral + ) + price_debt_feed = self.price_feed_client.prices_dict.get(token_id_debt) + + # get price of collateral asset from http request if doesn't return from ws or ws is not used + if price_collateral_feed is None: + (price_collateral_feed,) = ( + await self.price_feed_client.get_pyth_prices_latest( + [token_id_collateral] + ) + ) + price_collateral = price_collateral_feed.get("price") + + # get price of debt asset from http request if doesn't return from ws or ws is not used + if price_debt_feed is None: + (price_debt_feed,) = ( + await self.price_feed_client.get_pyth_prices_latest([token_id_debt]) + ) + price_debt = price_debt_feed.get("price") + if price_collateral is None: raise Exception( f"Price for collateral token {account['token_id_collateral']} not found" @@ -231,16 +278,20 @@ async def get_liquidation_opportunities(self) -> list[LiquidationOpportunity]: ) value_collateral = ( - int(price_collateral["price"]["price"]) * account["amount_collateral"] + int(price_collateral.price) * account["amount_collateral"] + ) + value_debt = int(price_debt.price) * account["amount_debt"] + logger.debug( + f"Account {account['account_number']} health: {value_collateral / value_debt}" ) - value_debt = int(price_debt["price"]["price"]) * account["amount_debt"] - health = value_collateral / value_debt - logger.debug(f"Account {account['account_number']} health: {health}") if ( value_debt * int(account["min_health_ratio"]) > value_collateral * 10**18 ): - price_updates = [price_collateral, price_debt] + price_updates = [ + price_collateral_feed, + price_debt_feed, + ] liquidatable.append(self.create_liquidation_opp(account, price_updates)) return liquidatable @@ -301,6 +352,13 @@ async def main(): type=str, help="Liquidation server endpoint; if provided, will send liquidation opportunities to this endpoint", ) + parser.add_argument( + "--use-ws", + action="store_true", + dest="use_ws", + default=False, + help="If provided, will use the websocket to get price updates", + ) args = parser.parse_args() logger.setLevel(logging.INFO if args.verbose == 0 else logging.DEBUG) @@ -322,7 +380,7 @@ async def main(): ) while True: - opportunities = await monitor.get_liquidation_opportunities() + opportunities = await monitor.get_liquidation_opportunities(use_ws=args.use_ws) if args.broadcast: client = httpx.AsyncClient() diff --git a/per_sdk/utils/pyth_prices.py b/per_sdk/utils/pyth_prices.py deleted file mode 100644 index d3727eb2..00000000 --- a/per_sdk/utils/pyth_prices.py +++ /dev/null @@ -1,171 +0,0 @@ -import asyncio -from typing import TypedDict - -import httpx - -HERMES_ENDPOINT_HTTPS = "https://hermes.pyth.network/api/" -HERMES_ENDPOINT_WSS = "wss://hermes.pyth.network/ws" - - -class Price(TypedDict): - price: str - conf: str - expo: int - publish_time: int - - -class PriceFeed(TypedDict): - feed_id: str - price: Price - ema_price: Price - vaa: str - - -def price_to_tuple(price: Price): - return ( - int(price["price"]), - int(price["conf"]), - int(price["expo"]), - int(price["publish_time"]), - ) - - -async def get_price_feed_ids() -> list[str]: - """ - Queries the Hermes https endpoint for a list of the IDs of all Pyth price feeds. - """ - - url = HERMES_ENDPOINT_HTTPS + "price_feed_ids" - client = httpx.AsyncClient() - - data = (await client.get(url)).json() - - return data - - -class PriceFeedClient: - def __init__(self, feed_ids: list[str]): - self.feed_ids = feed_ids - self.pending_feed_ids = feed_ids - self.prices_dict: dict[str, PriceFeed] = {} - self.client = httpx.AsyncClient() - - def add_feed_ids(self, feed_ids: list[str]): - self.feed_ids += feed_ids - self.feed_ids = list(set(self.feed_ids)) - self.pending_feed_ids += feed_ids - - def extract_price_feed(self, data: dict) -> PriceFeed: - """ - Extracts a PriceFeed object from the JSON response from Hermes. - """ - price = data["price"] - price_ema = data["ema_price"] - vaa = data["vaa"] - price_feed = { - "feed_id": data["id"], - "price": price, - "price_ema": price_ema, - "vaa": vaa, - } - return price_feed - - async def get_pyth_prices_latest(self, feedIds: list[str]) -> list[PriceFeed]: - """ - Queries the Hermes https endpoint for the latest price feeds for a list of Pyth feed IDs. - """ - url = HERMES_ENDPOINT_HTTPS + "latest_price_feeds?" - params = {"ids[]": feedIds, "binary": "true"} - - data = (await self.client.get(url, params=params)).json() - - results = [] - for res in data: - price_feed = self.extract_price_feed(res) - results.append(price_feed) - - return results - - async def get_pyth_price_at_time(self, feed_id: str, timestamp: int) -> PriceFeed: - """ - Queries the Hermes https endpoint for the price feed for a Pyth feed ID at a given timestamp. - """ - url = HERMES_ENDPOINT_HTTPS + "get_price_feed" - params = {"id": feed_id, "publish_time": timestamp, "binary": "true"} - - data = (await self.client.get(url, params=params)).json() - - price_feed = self.extract_price_feed(data) - - return price_feed - - async def get_all_prices(self) -> dict[str, PriceFeed]: - """ - Queries the Hermes http endpoint for the latest price feeds for all feed IDs in the class object. - - There are limitations on the number of feed IDs that can be queried at once, so this function queries the feed IDs in batches. - """ - pyth_prices_latest = [] - i = 0 - batch_size = 100 - while len(self.feed_ids[i : i + batch_size]) > 0: - pyth_prices_latest += await self.get_pyth_prices_latest( - self.feed_ids[i : i + batch_size] - ) - i += batch_size - - return dict(pyth_prices_latest) - - async def ws_pyth_prices(self): - """ - Opens a websocket connection to Hermes for latest prices for all feed IDs in the class object. - """ - import json - - import websockets - - async with websockets.connect(HERMES_ENDPOINT_WSS) as ws: - while True: - if len(self.pending_feed_ids) > 0: - json_subscribe = { - "ids": self.pending_feed_ids, - "type": "subscribe", - "verbose": True, - "binary": True, - } - await ws.send(json.dumps(json_subscribe)) - self.pending_feed_ids = [] - - msg = json.loads(await ws.recv()) - if msg["type"] == "response": - if msg["status"] != "success": - raise Exception("Error in subscribing to websocket") - try: - if msg["type"] != "price_update": - continue - - feed_id = msg["price_feed"]["id"] - new_feed = msg["price_feed"] - - self.prices_dict[feed_id] = new_feed - - except: - raise Exception("Error in price_update message", msg) - - -async def main(): - feed_ids = await get_price_feed_ids() - # TODO: remove this line, once rate limits are figured out - feed_ids = feed_ids[:1] - price_feed_client = PriceFeedClient(feed_ids) - - print("Starting web socket...") - ws_call = price_feed_client.ws_pyth_prices() - asyncio.create_task(ws_call) - - while True: - await asyncio.sleep(1) - - -if __name__ == "__main__": - asyncio.run(main())