From 1919aa721d88ed90bccfc9ca4bac113a200e5d3d Mon Sep 17 00:00:00 2001 From: Anirudh Suresh Date: Wed, 17 Jan 2024 14:23:32 +0000 Subject: [PATCH] cli, ws, doc --- auction-server/src/api/rest.rs | 29 ++-- beacon/README.md | 2 +- beacon/protocols/beacon_TokenVault.py | 46 ++++-- beacon/protocols/beacon_template.py | 49 +++--- beacon/searcher/README.md | 2 +- beacon/searcher/searcherA.py | 47 ++++-- beacon/searcher/searcher_template.py | 2 +- beacon/surface_opportunities.py | 50 ------ beacon/utils/pyth_prices.py | 209 ++++++++++++++------------ 9 files changed, 223 insertions(+), 213 deletions(-) delete mode 100644 beacon/surface_opportunities.py diff --git a/auction-server/src/api/rest.rs b/auction-server/src/api/rest.rs index 6861d30f..bac296da 100644 --- a/auction-server/src/api/rest.rs +++ b/auction-server/src/api/rest.rs @@ -180,7 +180,7 @@ pub async fn surface( /// Get liquidation opportunities /// // #[axum_macros::debug_handler] -#[utoipa::path(get, path = "/getOpps", +#[utoipa::path(get, path = "/getOpps", params( ("chain_id" = String, Query, description = "Chain ID to retrieve opportunities for"), ("contract" = Option, Query, description = "Contract address to filter by") @@ -192,11 +192,11 @@ pub async fn surface( ,)] pub async fn get_opps( State(store): State>, - Query(params): Query, -) -> Result>, RestError> { + Query(params): Query +) -> Result>, RestError> { let chain_id = params.chain_id; let contract = params.contract; - + let chain_store = store .chains .get(&chain_id) @@ -210,28 +210,17 @@ pub async fn get_opps( .parse::
() .map_err(|_| RestError::BadParameters("Invalid contract address".to_string()))?; - opps = chain_store - .opps - .write() - .await - .entry(key) - .or_default() - .to_vec(); - } + opps = chain_store.opps.write().await.entry(key).or_default().to_vec(); + }, None => { + // TODO: fix this double write access, to make this work let opps_dict = chain_store.opps.write().await; for key in opps_dict.keys() { - let opps_key = chain_store - .opps - .write() - .await - .entry(key.clone()) - .or_default() - .clone(); + let opps_key = chain_store.opps.write().await.entry(key.clone()).or_default().clone(); opps.extend(opps_key); } } } Ok(Json(opps)) -} +} \ No newline at end of file diff --git a/beacon/README.md b/beacon/README.md index b65919c9..822f373b 100644 --- a/beacon/README.md +++ b/beacon/README.md @@ -6,4 +6,4 @@ The LiquidationAdapter contract that is part of the Express Relay on-chain stack Each protocol that integrates with Express Relay and the LiquidationAdapter workflow must provide code that handles getting liquidatable accounts; the template and example files for this are found in `/protocols`. Some common types are defined in `utils/types_liquidation_adapter.py`, and standard functions for accessing Pyth Hermes prices can be found in `utils/pyth_prices.py`. -The party that runs the beacon can add the protocol-provided modules for getting liquidatable accounts to `surface_opportunities.py`. Running that file should surface opportunities to the beacon web server. \ No newline at end of file +The party that runs the beacon can run the protocol-provided file to get and surface liquidatable accounts to the Beacon server. diff --git a/beacon/protocols/beacon_TokenVault.py b/beacon/protocols/beacon_TokenVault.py index a0e2bee1..d54871f4 100644 --- a/beacon/protocols/beacon_TokenVault.py +++ b/beacon/protocols/beacon_TokenVault.py @@ -2,6 +2,7 @@ from eth_abi import encode import json from typing import TypedDict +import argparse from beacon.utils.pyth_prices import * from beacon.utils.types_liquidation_adapter import * @@ -143,23 +144,38 @@ def get_liquidatable(accounts: list[LiquidationAccount], async def main(): - # get all accounts - accounts = await get_accounts() + parser = argparse.ArgumentParser() + parser.add_argument("--operator_api_key", type=str, required=True, help="Operator API key, used to authenticate the surface post request") + parser.add_argument("--rpc_url", type=str, required=True, help="Chain RPC endpoint, used to fetch on-chain data via get_accounts") + parser.add_argument("--beacon_server_url", type=str, help="Beacon server endpoint; if provided, will send liquidation opportunities to the beacon server; otherwise, will just print them out") + args = parser.parse_args() # get prices - pyth_price_feed_ids = await get_price_feed_ids() - pyth_prices_latest = [] - i = 0 - cntr = 100 - while len(pyth_price_feed_ids[i:i + cntr]) > 0: - pyth_prices_latest += await get_pyth_prices_latest(pyth_price_feed_ids[i:i + cntr]) - i += cntr - pyth_prices_latest = dict(pyth_prices_latest) - - # get liquidatable accounts - liquidatable = get_liquidatable(accounts, pyth_prices_latest) - - print(liquidatable) + feed_ids = ["ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace", "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"] # TODO: should this be automated rather than hardcoded? + price_feed_client = PriceFeedClient(feed_ids) + + ws_call = price_feed_client.ws_pyth_prices() + task = asyncio.create_task(ws_call) + + client = httpx.AsyncClient() + + await asyncio.sleep(2) + + while True: + # get all accounts + accounts = await get_accounts(args.rpc_url) + + liquidatable = get_liquidatable(accounts, price_feed_client.prices_dict) + + if args.beacon_server_url: + resp = await client.post( + args.beacon_server_url, + json=liquidatable + ) + print(f"Response, post to beacon: {resp.text}") + else: + print(liquidatable) + await asyncio.sleep(2) if __name__ == "__main__": asyncio.run(main()) diff --git a/beacon/protocols/beacon_template.py b/beacon/protocols/beacon_template.py index 8b860719..2d89df27 100644 --- a/beacon/protocols/beacon_template.py +++ b/beacon/protocols/beacon_template.py @@ -2,6 +2,7 @@ from eth_abi import encode import json from typing import TypedDict +import argparse from beacon.utils.pyth_prices import * from beacon.utils.types_liquidation_adapter import * @@ -78,25 +79,39 @@ async def main(): """ main is a good mechanism to check if your implementations of the functions above are working properly. """ - - # get all accounts - accounts = await get_accounts() + parser = argparse.ArgumentParser() + parser.add_argument("--operator_api_key", type=str, required=True, help="Operator API key, used to authenticate the surface post request") + parser.add_argument("--rpc_url", type=str, required=True, help="Chain RPC endpoint, used to fetch on-chain data via get_accounts") + parser.add_argument("--beacon_server_url", type=str, help="Beacon server endpoint; if provided, will send liquidation opportunities to the beacon server; otherwise, will just print them out") + args = parser.parse_args() # get prices - pyth_price_feed_ids = await get_price_feed_ids() - pyth_prices_latest = [] - i = 0 - cntr = 100 - while len(pyth_price_feed_ids[i:i + cntr]) > 0: - pyth_prices_latest += await get_pyth_prices_latest(pyth_price_feed_ids[i:i + cntr]) - i += cntr - pyth_prices_latest = dict(pyth_prices_latest) - - # get liquidatable accounts - liquidatable = get_liquidatable( - accounts, pyth_prices_latest) - - print(liquidatable) + feed_ids = [] # TODO: specify initial price feeds to subscribe to + price_feed_client = PriceFeedClient(feed_ids) + + ws_call = price_feed_client.ws_pyth_prices() + task = asyncio.create_task(ws_call) + + client = httpx.AsyncClient() + + await asyncio.sleep(2) + + while True: + # get all accounts + accounts = await get_accounts(args.rpc_url) + + liquidatable = get_liquidatable(accounts, price_feed_client.prices_dict) + + if args.beacon_server_url: + # this post request will not work without an operator API key; however, this should work fine if get_liquidatable returns the correct type + resp = await client.post( + args.beacon_server_url, + json=liquidatable + ) + print(f"Response, post to beacon: {resp.text}") + else: + print(liquidatable) + await asyncio.sleep(2) if __name__ == "__main__": asyncio.run(main()) diff --git a/beacon/searcher/README.md b/beacon/searcher/README.md index 1f782152..124ca118 100644 --- a/beacon/searcher/README.md +++ b/beacon/searcher/README.md @@ -7,4 +7,4 @@ Searchers can integrate with Express Relay by one of two means: Option 2 requires bespoke work to handle individual protocol interfaces and smart contract risk, and it is similar in nature to how many searchers currently do liquidations via their own deployed contracts--searchers can now call into their smart contracts via the Express Relay workflow. This option allows for greater customization by the searcher, but requires bespoke work per protocol that the searcher wants to integrate with. -Meanwhile, option 1 requires much less bespoke work and does not require contract deployment by the searcher. For option 1, the searcher submits liquidation transactions to the LiquidationAdapter contract, which handles routing the liquidation logic to the protocol and also performs some basic safety checks to ensure that the searcher is paying and receiving the appropriate amounts. The searcher can submit transactions signed by their EOA that has custody of the tokens they wish to repay with. Searchers can listen to liquidation opportunities at the Beacon server, and if they wish to submit a liquidation transaction through Express Relay, they can submit it to the auction server endpoint. `searcher_template.py` contains a template for the actions that a searcher may wish to perform, namely getting and assessing opportunities at the Beacon server and constructing and sending a liquidation. Helper functions related to constructing the signature for the LiquidationAdapter contract are in `searcher_utils.py`. A sample workflow is in `searcherA.py` (note: this example lacks any serious evaluation of opportunities, and it simply carries out a liquidation if the opportunity is available). \ No newline at end of file +Meanwhile, option 1 requires much less bespoke work and does not require contract deployment by the searcher. For option 1, the searcher submits liquidation transactions to the LiquidationAdapter contract, which handles routing the liquidation logic to the protocol and also performs some basic safety checks to ensure that the searcher is paying and receiving the appropriate amounts. The searcher can submit transactions signed by their EOA that has custody of the tokens they wish to repay with. Searchers can listen to liquidation opportunities at the Beacon server, and if they wish to submit a liquidation transaction through Express Relay, they can submit it to the auction server endpoint. `searcher_template.py` contains a template for the actions that a searcher may wish to perform, namely getting and assessing opportunities at the Beacon server and constructing and sending a liquidation. Helper functions related to constructing the signature for the LiquidationAdapter contract are in `searcher_utils.py`. A sample workflow is in `searcherA.py` (note: this example lacks any serious evaluation of opportunities, and it simply carries out a liquidation if the opportunity is available). diff --git a/beacon/searcher/searcherA.py b/beacon/searcher/searcherA.py index fb9a1fe9..ba0cec73 100644 --- a/beacon/searcher/searcherA.py +++ b/beacon/searcher/searcherA.py @@ -12,7 +12,20 @@ BID = 10 VALID_UNTIL = 1_000_000_000_000 +CONTRACT_ADDRESS = "0x72A22FfcAfa6684d4EE449620270ac05afE963d0" +class UserLiquidationParams(TypedDict): + bid: int + valid_until: int + +def assess_liquidation_opportunity( + opp: LiquidationOpportunity +) -> UserLiquidationParams | None: + user_liquidation_params: UserLiquidationParams = { + "bid": BID, + "valid_until": VALID_UNTIL + } + return user_liquidation_params def create_liquidation_transaction( opp: LiquidationOpportunity, @@ -59,26 +72,34 @@ def create_liquidation_transaction( async def main(): - CLIENT = httpx.AsyncClient() + client = httpx.AsyncClient() - params = {"chain_id": "development"} - - liquidatable = (await CLIENT.get(BEACON_SERVER_ENDPOINT_GETOPPS, params=params)).json() + params = {"chain_id": "development", "contract": CONTRACT_ADDRESS} # this is hardcoded to the searcher A SK sk_liquidator = "0x5b1efe5da513271c0d30cde7a2ad1d29456d68abd592efdaa7d2302e913b783f" - tx = create_liquidation_transaction( - liquidatable[0], sk_liquidator, VALID_UNTIL, BID) - resp = await CLIENT.post( - AUCTION_SERVER_ENDPOINT, - json=tx - ) + while True: + liquidatable = (await client.get(BEACON_SERVER_ENDPOINT_GETOPPS, params=params)).json() + + for liquidation_opp in liquidatable: + user_liquidation_params = assess_liquidation_opportunity(liquidation_opp) + + if user_liquidation_params is not None: + bid, valid_until = user_liquidation_params["bid"], user_liquidation_params["valid_until"] + + tx = create_liquidation_transaction( + liquidation_opp, sk_liquidator, valid_until, bid) + + resp = await client.post( + AUCTION_SERVER_ENDPOINT, + json=tx + ) - print(resp.text) + print(resp.text) - import pdb - pdb.set_trace() + import pdb + pdb.set_trace() if __name__ == "__main__": asyncio.run(main()) diff --git a/beacon/searcher/searcher_template.py b/beacon/searcher/searcher_template.py index 68a4e190..3049589c 100644 --- a/beacon/searcher/searcher_template.py +++ b/beacon/searcher/searcher_template.py @@ -1,7 +1,7 @@ from beacon.utils.types_liquidation_adapter import * def assess_liquidation_opportunity( - account: LiquidationOpportunity + opp: LiquidationOpportunity ) -> (int, int) | None: """ Assesses whether a LiquidationOpportunity is worth liquidating; if so, returns a tuple of (bid, valid_until) diff --git a/beacon/surface_opportunities.py b/beacon/surface_opportunities.py deleted file mode 100644 index 2dbb42f7..00000000 --- a/beacon/surface_opportunities.py +++ /dev/null @@ -1,50 +0,0 @@ -import httpx -import asyncio -import argparse -import os - -from beacon.protocols import beacon_TokenVault -from beacon.utils.pyth_prices import * -from beacon.utils.endpoints import * - -PROTOCOLS = [beacon_TokenVault] - - -# TODO: turn on authorization in the surface post requests -async def main(operator_api_key: str, rpc_url: str): - # get prices - pyth_price_feed_ids = await get_price_feed_ids() - pyth_prices_latest = [] - i = 0 - cntr = 100 - while len(pyth_price_feed_ids[i:i + cntr]) > 0: - pyth_prices_latest += await get_pyth_prices_latest(pyth_price_feed_ids[i:i + cntr]) - i += cntr - pyth_prices_latest = dict(pyth_prices_latest) - - liquidatable = [] - - for protocol in PROTOCOLS: - accounts = await protocol.get_accounts(rpc_url) - - liquidatable_protocol = protocol.get_liquidatable( - accounts, pyth_prices_latest) - - liquidatable += liquidatable_protocol - - CLIENT = httpx.AsyncClient() - - resp = await CLIENT.post( - f"{BEACON_SERVER_ENDPOINT_SURFACE}", - json=liquidatable - ) - - print(f"Response PER post: {resp.text}") - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--operator_api_key", type=str, required=True, help="Operator API key, used to authenticate the surface post request") - parser.add_argument("--rpc_url", type=str, required=True, help="Chain RPC endpoint, used to fetch on-chain data via get_accounts") - args = parser.parse_args() - - asyncio.run(main(args.operator_api_key, args.rpc_url)) diff --git a/beacon/utils/pyth_prices.py b/beacon/utils/pyth_prices.py index 81901ace..ce7726a2 100644 --- a/beacon/utils/pyth_prices.py +++ b/beacon/utils/pyth_prices.py @@ -9,120 +9,139 @@ class Price(TypedDict): - price: int - conf: int + price: str + conf: str expo: int publish_time: int class PriceFeed(TypedDict): - feed_id: str + id: str price: Price - price_ema: Price + ema_price: Price vaa: str -CLIENT = httpx.AsyncClient() - - -def extract_price_feed(data: dict) -> PriceFeed: - price: Price = data['price'] - price_ema: Price = data['ema_price'] - vaa = data['vaa'] - price_feed: PriceFeed = { - "feed_id": data['id'], - "price": price, - "price_ema": price_ema, - "vaa": vaa - } - return price_feed - async def get_price_feed_ids() -> list[str]: url = HERMES_ENDPOINT + "price_feed_ids" - - data = (await CLIENT.get(url)).json() + client = httpx.AsyncClient() + + data = (await client.get(url)).json() return data -async def get_pyth_prices_latest( - feedIds: list[str] -) -> list[tuple[str, PriceFeed]]: - url = HERMES_ENDPOINT + "latest_price_feeds?" - params = {"ids[]": feedIds, "binary": "true"} - - data = (await CLIENT.get(url, params=params)).json() - - results = [] - for res in data: - price_feed = extract_price_feed(res) - results.append((res['id'], price_feed)) - - return results - - -async def get_pyth_price_at_time( - feed_id: str, - timestamp: int -) -> tuple[str, PriceFeed]: - url = HERMES_ENDPOINT + f"get_price_feed" - params = {"id": feed_id, "publish_time": timestamp, "binary": "true"} - - data = (await CLIENT.get(url, params=params)).json() - - price_feed = extract_price_feed(data) - - return (feed_id, price_feed) - - -async def get_all_prices() -> dict[str, PriceFeed]: - pyth_price_feed_ids = await get_price_feed_ids() - - pyth_prices_latest = [] - i = 0 - cntr = 100 - while len(pyth_price_feed_ids[i:i + cntr]) > 0: - pyth_prices_latest += await get_pyth_prices_latest(pyth_price_feed_ids[i:i + cntr]) - i += cntr - - return dict(pyth_prices_latest) - - -async def ws_pyth_prices(feed_ids: list[str]): - url_ws = "wss://hermes.pyth.network/ws" - - json_subscribe = { - "ids": feed_ids, - "type": "subscribe", - "verbose": True, - "binary": True - } - - async with websockets.connect(url_ws) as ws: - await ws.send(json.dumps(json_subscribe)) - while True: - msg = json.loads(await ws.recv()) - try: - print("0x"+msg["price_feed"]["id"], msg["price_feed"]["price"]["price"], msg["price_feed"]["price"]["publish_time"]) - except: - print(f"couldn't parse msg, {msg}") - +class PriceFeedClient: + def __init__(self, feed_ids: list[str]): + self.feed_ids = feed_ids + self.pending_feed_ids = feed_ids + self.prices_dict: dict[str, PriceFeed] = {} + self.client = httpx.AsyncClient() + + def add_feed_ids(self, feed_ids: list[str]): + self.feed_ids += feed_ids + self.feed_ids = list(set(self.feed_ids)) + self.pending_feed_ids += feed_ids + + def extract_price_feed(self, data: dict) -> PriceFeed: + price: Price = data['price'] + price_ema: Price = data['ema_price'] + vaa = data['vaa'] + price_feed: PriceFeed = { + "feed_id": data['id'], + "price": price, + "price_ema": price_ema, + "vaa": vaa + } + return price_feed + + async def get_pyth_prices_latest( + self, + feedIds: list[str] + ) -> list[tuple[str, PriceFeed]]: + url = HERMES_ENDPOINT + "latest_price_feeds?" + params = {"ids[]": feedIds, "binary": "true"} + + data = (await self.client.get(url, params=params)).json() + + results = [] + for res in data: + price_feed = self.extract_price_feed(res) + results.append((res['id'], price_feed)) + + return results + + + async def get_pyth_price_at_time( + self, + feed_id: str, + timestamp: int + ) -> tuple[str, PriceFeed]: + url = HERMES_ENDPOINT + f"get_price_feed" + params = {"id": feed_id, "publish_time": timestamp, "binary": "true"} + + data = (await self.client.get(url, params=params)).json() + + price_feed = self.extract_price_feed(data) + + return (feed_id, price_feed) + + + async def get_all_prices(self) -> dict[str, PriceFeed]: + pyth_prices_latest = [] + i = 0 + cntr = 100 + while len(self.feed_ids[i:i + cntr]) > 0: + pyth_prices_latest += await self.get_pyth_prices_latest(self.feed_ids[i:i + cntr]) + i += cntr + + return dict(pyth_prices_latest) + + async def ws_pyth_prices(self): + url_ws = "wss://hermes.pyth.network/ws" + + async with websockets.connect(url_ws) as ws: + while True: + if len(self.pending_feed_ids) > 0: + json_subscribe = { + "ids": self.pending_feed_ids, + "type": "subscribe", + "verbose": True, + "binary": True + } + await ws.send(json.dumps(json_subscribe)) + self.pending_feed_ids = [] + + msg = json.loads(await ws.recv()) + try: + if msg["type"] != "price_update": + continue + + feed_id = msg["price_feed"]["id"] + new_feed = msg["price_feed"] + + self.prices_dict[feed_id] = new_feed + + except: + raise Exception("Error in price_update message", msg) + + + + async def main(): - pyth_price = await get_pyth_price_at_time("0xff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace", 1703016621) + feed_ids = await get_price_feed_ids() + feed_ids = feed_ids[:1] + price_feed_client = PriceFeedClient(feed_ids) - data = await get_all_prices() + print("Starting...") + ws_call = price_feed_client.ws_pyth_prices() + task = asyncio.create_task(ws_call) - return pyth_price, data + # Can insert continuous loop to check vaults + while True: + await asyncio.sleep(1) if __name__ == "__main__": - pyth_price, data = asyncio.run(main()) - - # feedIds = asyncio.run(get_price_feed_ids()) - # loop = asyncio.new_event_loop() - # asyncio.set_event_loop(loop) - # asyncio.get_event_loop().run_until_complete(ws_pyth_prices(feedIds)) - - import pdb - pdb.set_trace() + asyncio.run(main()) \ No newline at end of file