From 6181113760c6011d9e644873e9fa554e6dacd7fd Mon Sep 17 00:00:00 2001 From: Amin Moghaddam Date: Thu, 15 Feb 2024 15:24:29 +0100 Subject: [PATCH 1/6] First version of websocket support --- auction-server/Cargo.lock | 14 ++ auction-server/Cargo.toml | 1 + auction-server/src/api.rs | 14 +- auction-server/src/api/liquidation.rs | 46 ++-- auction-server/src/api/ws.rs | 331 ++++++++++++++++++++++++++ auction-server/src/state.rs | 10 +- 6 files changed, 395 insertions(+), 21 deletions(-) create mode 100644 auction-server/src/api/ws.rs diff --git a/auction-server/Cargo.lock b/auction-server/Cargo.lock index a814317c..103dc091 100644 --- a/auction-server/Cargo.lock +++ b/auction-server/Cargo.lock @@ -170,6 +170,7 @@ dependencies = [ "axum-macros", "axum-streams", "clap", + "dashmap", "ethers", "futures", "serde", @@ -739,6 +740,19 @@ dependencies = [ "cipher", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.5.0" diff --git a/auction-server/Cargo.toml b/auction-server/Cargo.toml index ffd304f8..3aa89106 100644 --- a/auction-server/Cargo.toml +++ b/auction-server/Cargo.toml @@ -25,3 +25,4 @@ utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] } serde_yaml = "0.9.25" ethers = "2.0.10" axum-macros = "0.4.0" +dashmap = { version = "5.4.0" } diff --git a/auction-server/src/api.rs b/auction-server/src/api.rs index b3a5f26d..48c79217 100644 --- a/auction-server/src/api.rs +++ b/auction-server/src/api.rs @@ -7,7 +7,7 @@ use { }, liquidation::{ OpportunityBid, - OpportunityParamsWithId, + OpportunityParamsWithMetadata, }, }, auction::run_submission_loop, @@ -44,6 +44,7 @@ use { Router, }, clap::crate_version, + dashmap::DashMap, ethers::{ providers::{ Http, @@ -63,6 +64,7 @@ use { sync::{ atomic::{ AtomicBool, + AtomicUsize, Ordering, }, Arc, @@ -92,6 +94,7 @@ async fn root() -> String { mod bid; pub(crate) mod liquidation; +pub(crate) mod ws; pub enum RestError { /// The request contained invalid parameters @@ -177,12 +180,12 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { schemas(OpportunityParamsV1), schemas(OpportunityBid), schemas(OpportunityParams), - schemas(OpportunityParamsWithId), + schemas(OpportunityParamsWithMetadata), schemas(TokenQty), schemas(BidResult), schemas(ErrorBodyResponse), responses(ErrorBodyResponse), - responses(OpportunityParamsWithId), + responses(OpportunityParamsWithMetadata), responses(BidResult) ), tags( @@ -235,6 +238,10 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { chains: chain_store?, liquidation_store: LiquidationStore::default(), per_operator: wallet, + ws: ws::WsState { + subscriber_counter: AtomicUsize::new(0), + subscribers: DashMap::new(), + }, }); let server_store = store.clone(); @@ -258,6 +265,7 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { "/v1/liquidation/opportunities/:opportunity_id/bids", post(liquidation::post_bid), ) + .route("/v1/ws", get(ws::ws_route_handler)) .route("/live", get(live)) .layer(CorsLayer::permissive()) .with_state(server_store); diff --git a/auction-server/src/api/liquidation.rs b/auction-server/src/api/liquidation.rs index f8945e29..b3e0fe1e 100644 --- a/auction-server/src/api/liquidation.rs +++ b/auction-server/src/api/liquidation.rs @@ -5,6 +5,10 @@ use { 
handle_bid, BidResult, }, + ws::{ + notify_updates, + UpdateEvent::NewOpportunity, + }, ErrorBodyResponse, RestError, }, @@ -60,19 +64,31 @@ use { /// Similar to OpportunityParams, but with the opportunity id included. #[derive(Serialize, Deserialize, ToSchema, Clone, ToResponse)] -pub struct OpportunityParamsWithId { +pub struct OpportunityParamsWithMetadata { /// The opportunity unique id #[schema(example = "f47ac10b-58cc-4372-a567-0e02b2c3d479", value_type=String)] opportunity_id: Uuid, + /// Creation time of the opportunity + #[schema(example = "1700000000")] + creation_time: UnixTimestamp, /// opportunity data #[serde(flatten)] params: OpportunityParams, } -impl Into for LiquidationOpportunity { - fn into(self) -> OpportunityParamsWithId { - OpportunityParamsWithId { +impl OpportunityParamsWithMetadata { + pub fn get_chain_id(&self) -> &ChainId { + match &self.params { + OpportunityParams::V1(params) => ¶ms.chain_id, + } + } +} + +impl Into for LiquidationOpportunity { + fn into(self) -> OpportunityParamsWithMetadata { + OpportunityParamsWithMetadata { opportunity_id: self.id, + creation_time: self.creation_time, params: self.params, } } @@ -90,7 +106,7 @@ impl Into for LiquidationOpportunity { pub async fn post_opportunity( State(store): State>, Json(versioned_params): Json, -) -> Result, RestError> { +) -> Result, RestError> { let params = match versioned_params.clone() { OpportunityParams::V1(params) => params, }; @@ -127,22 +143,22 @@ pub async fn post_opportunity( } } - opportunities_existing.push(opportunity); + opportunities_existing.push(opportunity.clone()); } else { - write_lock.insert(params.permission_key.clone(), vec![opportunity]); + write_lock.insert(params.permission_key.clone(), vec![opportunity.clone()]); } + notify_updates(&store.ws, NewOpportunity(opportunity.clone().into())).await; + tracing::debug!("number of permission keys: {}", write_lock.len()); tracing::debug!( "number of opportunities for key: {}", write_lock[¶ms.permission_key].len() ); - Ok(OpportunityParamsWithId { - opportunity_id: id, - params: versioned_params, - } - .into()) + let opportunity_with_metadata: OpportunityParamsWithMetadata = opportunity.into(); + + Ok(opportunity_with_metadata.into()) } @@ -162,8 +178,8 @@ params(ChainIdQueryParams))] pub async fn get_opportunities( State(store): State>, query_params: Query, -) -> Result>, RestError> { - let opportunities: Vec = store +) -> Result>, RestError> { + let opportunities: Vec = store .liquidation_store .opportunities .read() @@ -177,7 +193,7 @@ pub async fn get_opportunities( .clone() .into() }) - .filter(|params_with_id: &OpportunityParamsWithId| { + .filter(|params_with_id: &OpportunityParamsWithMetadata| { let params = match ¶ms_with_id.params { OpportunityParams::V1(params) => params, }; diff --git a/auction-server/src/api/ws.rs b/auction-server/src/api/ws.rs new file mode 100644 index 00000000..b1f57e77 --- /dev/null +++ b/auction-server/src/api/ws.rs @@ -0,0 +1,331 @@ +use { + crate::{ + api::{ + liquidation::OpportunityParamsWithMetadata, + SHOULD_EXIT, + }, + config::ChainId, + state::{ + LiquidationOpportunity, + Store, + }, + }, + anyhow::{ + anyhow, + Result, + }, + axum::{ + extract::{ + ws::{ + Message, + WebSocket, + }, + State, + WebSocketUpgrade, + }, + http::HeaderMap, + response::IntoResponse, + }, + dashmap::DashMap, + ethers::types::Chain, + futures::{ + future::join_all, + stream::{ + SplitSink, + SplitStream, + }, + SinkExt, + StreamExt, + }, + serde::{ + Deserialize, + Serialize, + }, + std::{ + collections::HashSet, + 
sync::{ + atomic::{ + AtomicUsize, + Ordering, + }, + Arc, + }, + time::Duration, + }, + tokio::sync::mpsc, +}; + +pub struct WsState { + pub subscriber_counter: AtomicUsize, + pub subscribers: DashMap>, +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(tag = "type")] +enum ClientMessage { + #[serde(rename = "subscribe")] + Subscribe { chain_ids: Vec }, + #[serde(rename = "unsubscribe")] + Unsubscribe { chain_ids: Vec }, +} + +#[derive(Serialize, Clone)] +#[serde(tag = "type")] +enum ServerMessage { + #[serde(rename = "response")] + Response(ServerResponseMessage), + #[serde(rename = "new_opportunity")] + NewOpportunity { + opportunity: OpportunityParamsWithMetadata, + }, +} + +#[derive(Serialize, Debug, Clone)] +#[serde(tag = "status")] +enum ServerResponseMessage { + #[serde(rename = "success")] + Success, + #[serde(rename = "error")] + Err { error: String }, +} + +pub async fn ws_route_handler( + ws: WebSocketUpgrade, + State(store): State>, +) -> impl IntoResponse { + ws.on_upgrade(move |socket| websocket_handler(socket, store)) +} + +async fn websocket_handler(stream: WebSocket, state: Arc) { + let ws_state = &state.ws; + let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst); + let (notify_sender, notify_receiver) = mpsc::channel(NOTIFICATIONS_CHAN_LEN); + let (sender, receiver) = stream.split(); + ws_state.subscribers.insert(id, notify_sender); + let mut subscriber = Subscriber::new(id, state, notify_receiver, receiver, sender); + subscriber.run().await; +} + +#[derive(Clone)] +pub enum UpdateEvent { + NewOpportunity(OpportunityParamsWithMetadata), +} + +pub type SubscriberId = usize; + +/// Subscriber is an actor that handles a single websocket connection. +/// It listens to the store for updates and sends them to the client. +pub struct Subscriber { + id: SubscriberId, + closed: bool, + store: Arc, + notify_receiver: mpsc::Receiver, + receiver: SplitStream, + sender: SplitSink, + chain_ids: HashSet, + ping_interval: tokio::time::Interval, + exit_check_interval: tokio::time::Interval, + responded_to_ping: bool, +} + +const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30); +const NOTIFICATIONS_CHAN_LEN: usize = 1000; + +impl Subscriber { + pub fn new( + id: SubscriberId, + store: Arc, + notify_receiver: mpsc::Receiver, + receiver: SplitStream, + sender: SplitSink, + ) -> Self { + Self { + id, + closed: false, + store, + notify_receiver, + receiver, + sender, + chain_ids: HashSet::new(), + ping_interval: tokio::time::interval(PING_INTERVAL_DURATION), + exit_check_interval: tokio::time::interval(Duration::from_secs(5)), + responded_to_ping: true, // We start with true so we don't close the connection immediately + } + } + + #[tracing::instrument(skip(self))] + pub async fn run(&mut self) { + while !self.closed { + if let Err(e) = self.handle_next().await { + tracing::debug!(subscriber = self.id, error = ?e, "Error Handling Subscriber Message."); + break; + } + } + } + + async fn handle_next(&mut self) -> Result<()> { + tokio::select! { + maybe_update_event = self.notify_receiver.recv() => { + match maybe_update_event { + Some(event) => self.handle_update(event).await, + None => Err(anyhow!("Update channel closed. This should never happen. Closing connection.")) + } + }, + maybe_message_or_err = self.receiver.next() => { + self.handle_client_message( + maybe_message_or_err.ok_or(anyhow!("Client channel is closed"))?? + ).await + }, + _ = self.ping_interval.tick() => { + if !self.responded_to_ping { + return Err(anyhow!("Subscriber did not respond to ping. 
Closing connection.")); + } + self.responded_to_ping = false; + self.sender.send(Message::Ping(vec![])).await?; + Ok(()) + }, + _ = self.exit_check_interval.tick() => { + if SHOULD_EXIT.load(Ordering::Acquire) { + self.sender.close().await?; + self.closed = true; + return Err(anyhow!("Application is shutting down. Closing connection.")); + } + Ok(()) + } + } + } + + async fn handle_update(&mut self, event: UpdateEvent) -> Result<()> { + match event.clone() { + UpdateEvent::NewOpportunity(opportunity) => { + if !self.chain_ids.contains(opportunity.get_chain_id()) { + // Irrelevant update + return Ok(()); + } + let message = + serde_json::to_string(&ServerMessage::NewOpportunity { opportunity })?; + self.sender.send(message.into()).await?; + } + } + + Ok(()) + } + + #[tracing::instrument(skip(self, message))] + async fn handle_client_message(&mut self, message: Message) -> Result<()> { + let maybe_client_message = match message { + Message::Close(_) => { + // Closing the connection. We don't remove it from the subscribers + // list, instead when the Subscriber struct is dropped the channel + // to subscribers list will be closed and it will eventually get + // removed. + tracing::trace!(id = self.id, "Subscriber Closed Connection."); + + // Send the close message to gracefully shut down the connection + // Otherwise the client might get an abnormal Websocket closure + // error. + self.sender.close().await?; + self.closed = true; + return Ok(()); + } + Message::Text(text) => serde_json::from_str::(&text), + Message::Binary(data) => serde_json::from_slice::(&data), + Message::Ping(_) => { + // Axum will send Pong automatically + return Ok(()); + } + Message::Pong(_) => { + self.responded_to_ping = true; + return Ok(()); + } + }; + + match maybe_client_message { + Err(e) => { + self.sender + .send( + serde_json::to_string(&ServerMessage::Response( + ServerResponseMessage::Err { + error: e.to_string(), + }, + ))? + .into(), + ) + .await?; + return Ok(()); + } + + Ok(ClientMessage::Subscribe { chain_ids }) => { + let available_chain_ids: Vec<&ChainId> = self.store.chains.keys().collect(); + + let not_found_chain_ids: Vec<&ChainId> = chain_ids + .iter() + .filter(|chain_id| !available_chain_ids.contains(chain_id)) + .collect(); + + // If there is a single chain id that is not found, we don't subscribe to any of the + // asked correct chain ids and return an error to be more explicit and clear. + if !not_found_chain_ids.is_empty() { + self.sender + .send( + serde_json::to_string(&ServerMessage::Response( + ServerResponseMessage::Err { + error: format!( + "Chain id(s) with id(s) {:?} not found", + not_found_chain_ids + ), + }, + ))? + .into(), + ) + .await?; + return Ok(()); + } else { + self.chain_ids.extend(chain_ids.into_iter()); + } + } + Ok(ClientMessage::Unsubscribe { chain_ids }) => { + self.chain_ids + .retain(|chain_id| !chain_ids.contains(chain_id)); + } + } + + + self.sender + .send( + serde_json::to_string(&ServerMessage::Response(ServerResponseMessage::Success))? + .into(), + ) + .await?; + + Ok(()) + } +} + + +pub async fn notify_updates(ws_state: &WsState, event: UpdateEvent) { + let closed_subscribers: Vec> = + join_all(ws_state.subscribers.iter_mut().map(|subscriber| { + let event = event.clone(); + async move { + match subscriber.send(event).await { + Ok(_) => None, + Err(_) => { + // An error here indicates the channel is closed (which may happen either when the + // client has sent Message::Close or some other abrupt disconnection). 
We remove + // subscribers only when send fails so we can handle closure only once when we are + // able to see send() fail. + Some(*subscriber.key()) + } + } + } + })) + .await; + + // Remove closed_subscribers from ws_state + closed_subscribers.into_iter().for_each(|id| { + if let Some(id) = id { + ws_state.subscribers.remove(&id); + } + }); +} diff --git a/auction-server/src/state.rs b/auction-server/src/state.rs index 90ca2a20..475b1153 100644 --- a/auction-server/src/state.rs +++ b/auction-server/src/state.rs @@ -1,7 +1,10 @@ use { - crate::config::{ - ChainId, - EthereumConfig, + crate::{ + api::ws::WsState, + config::{ + ChainId, + EthereumConfig, + }, }, ethers::{ providers::{ @@ -119,4 +122,5 @@ pub struct Store { pub chains: HashMap, pub liquidation_store: LiquidationStore, pub per_operator: LocalWallet, + pub ws: WsState, } From fc6b8711b083b5f92fd78092c2e4fb882ea853f9 Mon Sep 17 00:00:00 2001 From: Amin Moghaddam Date: Thu, 15 Feb 2024 17:55:17 +0100 Subject: [PATCH 2/6] Decouple opportunity updates from api and organize long running processes --- auction-server/src/api.rs | 105 +++++++++-------- auction-server/src/api/liquidation.rs | 16 ++- auction-server/src/api/ws.rs | 41 ++++++- auction-server/src/auction.rs | 131 ++++++++++++---------- auction-server/src/liquidation_adapter.rs | 62 +++++----- 5 files changed, 214 insertions(+), 141 deletions(-) diff --git a/auction-server/src/api.rs b/auction-server/src/api.rs index 48c79217..ee3a1791 100644 --- a/auction-server/src/api.rs +++ b/auction-server/src/api.rs @@ -9,6 +9,7 @@ use { OpportunityBid, OpportunityParamsWithMetadata, }, + ws::run_subscription_loop, }, auction::run_submission_loop, config::{ @@ -87,6 +88,7 @@ use { // shutdown signal to all running tasks. However, this is a bit more complicated to implement and // we don't rely on global state for anything else. pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); +pub const EXIT_CHECK_INTERVAL: Duration = Duration::from_secs(1); async fn root() -> String { format!("PER Auction Server API {}", crate_version!()) @@ -107,8 +109,7 @@ pub enum RestError { SimulationError { result: Bytes, reason: String }, /// The order was not found OpportunityNotFound, - /// The server cannot currently communicate with the blockchain, so is not able to verify - /// which random values have been requested. + /// Internal error occurred during processing the request TemporarilyUnavailable, /// A catch-all error for all other types of errors that could occur during processing. 
Unknown, @@ -159,14 +160,8 @@ pub async fn live() -> Response { (StatusCode::OK, "OK").into_response() } -pub async fn start_server(run_options: RunOptions) -> Result<()> { - tokio::spawn(async move { - tracing::info!("Registered shutdown signal handler..."); - tokio::signal::ctrl_c().await.unwrap(); - tracing::info!("Shut down signal received, waiting for tasks..."); - SHOULD_EXIT.store(true, Ordering::Release); - }); +pub async fn start_api(run_options: RunOptions, store: Arc) -> Result<()> { #[derive(OpenApi)] #[openapi( paths( @@ -194,6 +189,48 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { )] struct ApiDoc; + let app: Router<()> = Router::new() + .merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi())) + .route("/", get(root)) + .route("/v1/bids", post(bid::bid)) + .route( + "/v1/liquidation/opportunities", + post(liquidation::post_opportunity), + ) + .route( + "/v1/liquidation/opportunities", + get(liquidation::get_opportunities), + ) + .route( + "/v1/liquidation/opportunities/:opportunity_id/bids", + post(liquidation::post_bid), + ) + .route("/v1/ws", get(ws::ws_route_handler)) + .route("/live", get(live)) + .layer(CorsLayer::permissive()) + .with_state(store); + + axum::Server::bind(&run_options.server.listen_addr) + .serve(app.into_make_service()) + .with_graceful_shutdown(async { + while !SHOULD_EXIT.load(Ordering::Acquire) { + tokio::time::sleep(EXIT_CHECK_INTERVAL).await; + } + tracing::info!("Shutting down RPC server..."); + }) + .await?; + Ok(()) +} + +pub async fn start_server(run_options: RunOptions) -> Result<()> { + tokio::spawn(async move { + tracing::info!("Registered shutdown signal handler..."); + tokio::signal::ctrl_c().await.unwrap(); + tracing::info!("Shut down signal received, waiting for tasks..."); + SHOULD_EXIT.store(true, Ordering::Release); + }); + + let config = Config::load(&run_options.config.config).map_err(|err| { anyhow!( "Failed to load config from file({path}): {:?}", @@ -234,51 +271,29 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { .into_iter() .collect(); + let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000); let store = Arc::new(Store { chains: chain_store?, liquidation_store: LiquidationStore::default(), per_operator: wallet, ws: ws::WsState { subscriber_counter: AtomicUsize::new(0), - subscribers: DashMap::new(), + subscribers: DashMap::new(), + update_tx, }, }); - let server_store = store.clone(); - - tokio::spawn(run_submission_loop(store.clone())); - tokio::spawn(run_verification_loop(store.clone())); - - let app: Router<()> = Router::new() - .merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi())) - .route("/", get(root)) - .route("/v1/bids", post(bid::bid)) - .route( - "/v1/liquidation/opportunities", - post(liquidation::post_opportunity), - ) - .route( - "/v1/liquidation/opportunities", - get(liquidation::get_opportunities), - ) - .route( - "/v1/liquidation/opportunities/:opportunity_id/bids", - post(liquidation::post_bid), - ) - .route("/v1/ws", get(ws::ws_route_handler)) - .route("/live", get(live)) - .layer(CorsLayer::permissive()) - .with_state(server_store); - - axum::Server::bind(&run_options.server.listen_addr) - .serve(app.into_make_service()) - .with_graceful_shutdown(async { - while !SHOULD_EXIT.load(Ordering::Acquire) { - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - tracing::info!("Shutting down RPC server..."); - }) - .await?; + let submission_loop = tokio::spawn(run_submission_loop(store.clone())); + let 
verification_loop = tokio::spawn(run_verification_loop(store.clone())); + let subscription_loop = tokio::spawn(run_subscription_loop(store.clone(), update_rx)); + let server_loop = tokio::spawn(start_api(run_options, store.clone())); + join_all(vec![ + submission_loop, + verification_loop, + subscription_loop, + server_loop, + ]) + .await; Ok(()) } diff --git a/auction-server/src/api/liquidation.rs b/auction-server/src/api/liquidation.rs index b3e0fe1e..3592e45f 100644 --- a/auction-server/src/api/liquidation.rs +++ b/auction-server/src/api/liquidation.rs @@ -5,10 +5,7 @@ use { handle_bid, BidResult, }, - ws::{ - notify_updates, - UpdateEvent::NewOpportunity, - }, + ws::UpdateEvent::NewOpportunity, ErrorBodyResponse, RestError, }, @@ -61,7 +58,6 @@ use { uuid::Uuid, }; - /// Similar to OpportunityParams, but with the opportunity id included. #[derive(Serialize, Deserialize, ToSchema, Clone, ToResponse)] pub struct OpportunityParamsWithMetadata { @@ -148,7 +144,15 @@ pub async fn post_opportunity( write_lock.insert(params.permission_key.clone(), vec![opportunity.clone()]); } - notify_updates(&store.ws, NewOpportunity(opportunity.clone().into())).await; + store + .ws + .update_tx + .send(NewOpportunity(opportunity.clone().into())) + .await + .map_err(|e| { + tracing::error!("Failed to send update: {}", e); + RestError::TemporarilyUnavailable + })?; tracing::debug!("number of permission keys: {}", write_lock.len()); tracing::debug!( diff --git a/auction-server/src/api/ws.rs b/auction-server/src/api/ws.rs index b1f57e77..7029a75c 100644 --- a/auction-server/src/api/ws.rs +++ b/auction-server/src/api/ws.rs @@ -2,13 +2,11 @@ use { crate::{ api::{ liquidation::OpportunityParamsWithMetadata, + EXIT_CHECK_INTERVAL, SHOULD_EXIT, }, config::ChainId, - state::{ - LiquidationOpportunity, - Store, - }, + state::Store, }, anyhow::{ anyhow, @@ -23,11 +21,9 @@ use { State, WebSocketUpgrade, }, - http::HeaderMap, response::IntoResponse, }, dashmap::DashMap, - ethers::types::Chain, futures::{ future::join_all, stream::{ @@ -58,6 +54,7 @@ use { pub struct WsState { pub subscriber_counter: AtomicUsize, pub subscribers: DashMap>, + pub update_tx: mpsc::Sender, } #[derive(Deserialize, Debug, Clone)] @@ -329,3 +326,35 @@ pub async fn notify_updates(ws_state: &WsState, event: UpdateEvent) { } }); } + + +pub async fn run_subscription_loop( + store: Arc, + mut receiver: mpsc::Receiver, +) -> Result<()> { + let mut interval = tokio::time::interval(EXIT_CHECK_INTERVAL); + + while !SHOULD_EXIT.load(Ordering::Acquire) { + tokio::select! { + update = receiver.recv() => { + match update { + None => { + // When the received message is None it means the channel has been closed. This + // should never happen as the channel is never closed. As we can't recover from + // this we shut down the application. 
+ tracing::error!("Failed to receive update from store."); + SHOULD_EXIT.store(true, Ordering::Release); + break; + } + Some(event) => { + notify_updates(&store.ws, event).await; + }, + } + }, + _ = interval.tick() => {} + } + } + + tracing::info!("Shutting down Websocket notifier..."); + Ok(()) +} diff --git a/auction-server/src/auction.rs b/auction-server/src/auction.rs index a590ede4..c8bb0d35 100644 --- a/auction-server/src/auction.rs +++ b/auction-server/src/auction.rs @@ -1,10 +1,16 @@ use { crate::{ - api::SHOULD_EXIT, + api::{ + EXIT_CHECK_INTERVAL, + SHOULD_EXIT, + }, config::EthereumConfig, state::Store, }, - anyhow::anyhow, + anyhow::{ + anyhow, + Result, + }, ethers::{ contract::{ abigen, @@ -192,75 +198,84 @@ pub async fn submit_bids( res } -pub async fn run_submission_loop(store: Arc) { +pub async fn run_submission_loop(store: Arc) -> Result<()> { tracing::info!("Starting transaction submitter..."); + let mut exit_check_interval = tokio::time::interval(EXIT_CHECK_INTERVAL); + + // this should be replaced by a subscription to the chain and trigger on new blocks + let mut submission_interval = tokio::time::interval(Duration::from_secs(5)); while !SHOULD_EXIT.load(Ordering::Acquire) { - for (chain_id, chain_store) in &store.chains { - let permission_bids = chain_store.bids.read().await.clone(); - // release lock asap - tracing::info!( - "Chain: {chain_id} Auctions to process {auction_len}", - chain_id = chain_id, - auction_len = permission_bids.len() - ); - for (permission_key, bids) in permission_bids.iter() { - let mut cloned_bids = bids.clone(); - let thread_store = store.clone(); - let chain_id = chain_id.clone(); - let permission_key = permission_key.clone(); - { - cloned_bids.sort_by(|a, b| b.bid.cmp(&a.bid)); + tokio::select! { + _ = submission_interval.tick() => { + for (chain_id, chain_store) in &store.chains { + let permission_bids = chain_store.bids.read().await.clone(); + // release lock asap + tracing::info!( + "Chain: {chain_id} Auctions to process {auction_len}", + chain_id = chain_id, + auction_len = permission_bids.len() + ); + for (permission_key, bids) in permission_bids.iter() { + let mut cloned_bids = bids.clone(); + let thread_store = store.clone(); + let chain_id = chain_id.clone(); + let permission_key = permission_key.clone(); + { + cloned_bids.sort_by(|a, b| b.bid.cmp(&a.bid)); - // TODO: simulate all bids together and keep the successful ones - // let call = simulate_bids( - // store.per_operator.address(), - // chain_store.contract_addr, - // chain_store.provider.clone(), - // permission_key.clone(), - // cloned_bids.iter().map(|b| b.contract).collect(), - // cloned_bids.iter().map(|b| b.calldata.clone()).collect(), - // cloned_bids.iter().map(|b| b.bid.into()).collect(), - // ); + // TODO: simulate all bids together and keep the successful ones + // let call = simulate_bids( + // store.per_operator.address(), + // chain_store.contract_addr, + // chain_store.provider.clone(), + // permission_key.clone(), + // cloned_bids.iter().map(|b| b.contract).collect(), + // cloned_bids.iter().map(|b| b.calldata.clone()).collect(), + // cloned_bids.iter().map(|b| b.bid.into()).collect(), + // ); - // keep the highest bid for now - cloned_bids.truncate(1); + // keep the highest bid for now + cloned_bids.truncate(1); - match thread_store.chains.get(&chain_id) { - Some(chain_store) => { - let submission = submit_bids( - thread_store.per_operator.clone(), - chain_store.provider.clone(), - chain_store.config.clone(), - chain_store.network_id, - permission_key.clone(), 
- cloned_bids.iter().map(|b| b.contract).collect(), - cloned_bids.iter().map(|b| b.calldata.clone()).collect(), - cloned_bids.iter().map(|b| b.bid).collect(), - ) - .await; - match submission { - Ok(receipt) => match receipt { - Some(receipt) => { - tracing::debug!("Submitted transaction: {:?}", receipt); - chain_store.bids.write().await.remove(&permission_key); - } - None => { - tracing::error!("Failed to receive transaction receipt"); + match thread_store.chains.get(&chain_id) { + Some(chain_store) => { + let submission = submit_bids( + thread_store.per_operator.clone(), + chain_store.provider.clone(), + chain_store.config.clone(), + chain_store.network_id, + permission_key.clone(), + cloned_bids.iter().map(|b| b.contract).collect(), + cloned_bids.iter().map(|b| b.calldata.clone()).collect(), + cloned_bids.iter().map(|b| b.bid).collect(), + ) + .await; + match submission { + Ok(receipt) => match receipt { + Some(receipt) => { + tracing::debug!("Submitted transaction: {:?}", receipt); + chain_store.bids.write().await.remove(&permission_key); + } + None => { + tracing::error!("Failed to receive transaction receipt"); + } + }, + Err(err) => { + tracing::error!("Transaction failed to submit: {:?}", err); + } } - }, - Err(err) => { - tracing::error!("Transaction failed to submit: {:?}", err); + } + None => { + tracing::error!("Chain not found: {}", chain_id); } } } - None => { - tracing::error!("Chain not found: {}", chain_id); - } } } } + _ = exit_check_interval.tick() => {} } - tokio::time::sleep(Duration::from_secs(5)).await; // this should be replaced by a subscription to the chain and trigger on new blocks } tracing::info!("Shutting down transaction submitter..."); + Ok(()) } diff --git a/auction-server/src/liquidation_adapter.rs b/auction-server/src/liquidation_adapter.rs index 71a9d6c4..e40c6518 100644 --- a/auction-server/src/liquidation_adapter.rs +++ b/auction-server/src/liquidation_adapter.rs @@ -2,6 +2,7 @@ use { crate::{ api::{ liquidation::OpportunityBid, + EXIT_CHECK_INTERVAL, SHOULD_EXIT, }, auction::{ @@ -335,39 +336,48 @@ async fn verify_with_store(opportunity: LiquidationOpportunity, store: &Store) - /// * `store`: server store pub async fn run_verification_loop(store: Arc) -> Result<()> { tracing::info!("Starting opportunity verifier..."); + let mut exit_check_interval = tokio::time::interval(EXIT_CHECK_INTERVAL); + + // this should be replaced by a subscription to the chain and trigger on new blocks + let mut submission_interval = tokio::time::interval(Duration::from_secs(5)); while !SHOULD_EXIT.load(Ordering::Acquire) { - let all_opportunities = store.liquidation_store.opportunities.read().await.clone(); - for (permission_key, opportunities) in all_opportunities.iter() { - // check each of the opportunities for this permission key for validity - let mut opps_to_remove = vec![]; - for opportunity in opportunities.iter() { - match verify_with_store(opportunity.clone(), &store).await { - Ok(_) => {} - Err(e) => { - opps_to_remove.push(opportunity.id); - tracing::info!( - "Removing Opportunity {} with failed verification: {}", - opportunity.id, - e - ); + tokio::select! 
{ + _ = submission_interval.tick() => { + let all_opportunities = store.liquidation_store.opportunities.read().await.clone(); + for (permission_key, opportunities) in all_opportunities.iter() { + // check each of the opportunities for this permission key for validity + let mut opps_to_remove = vec![]; + for opportunity in opportunities.iter() { + match verify_with_store(opportunity.clone(), &store).await { + Ok(_) => {} + Err(e) => { + opps_to_remove.push(opportunity.id); + tracing::info!( + "Removing Opportunity {} with failed verification: {}", + opportunity.id, + e + ); + } + } } - } - } - // set write lock to remove all these opportunities - let mut write_lock = store.liquidation_store.opportunities.write().await; + // set write lock to remove all these opportunities + let mut write_lock = store.liquidation_store.opportunities.write().await; - if let Some(opportunities) = write_lock.get_mut(permission_key) { - opportunities.retain(|x| !opps_to_remove.contains(&x.id)); - if opportunities.is_empty() { - write_lock.remove(permission_key); + if let Some(opportunities) = write_lock.get_mut(permission_key) { + opportunities.retain(|x| !opps_to_remove.contains(&x.id)); + if opportunities.is_empty() { + write_lock.remove(permission_key); + } + } + + // release the write lock + drop(write_lock); } } - - // release the write lock - drop(write_lock); + _ = exit_check_interval.tick() => { + } } - tokio::time::sleep(Duration::from_secs(5)).await; // this should be replaced by a subscription to the chain and trigger on new blocks } tracing::info!("Shutting down opportunity verifier..."); Ok(()) From 105f9a61c44b7f49b3ec9aec6b07e379f02617d6 Mon Sep 17 00:00:00 2001 From: Amin Moghaddam Date: Thu, 15 Feb 2024 18:49:55 +0100 Subject: [PATCH 3/6] Change schema of requests and responses to support id --- auction-server/src/api/ws.rs | 126 ++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 55 deletions(-) diff --git a/auction-server/src/api/ws.rs b/auction-server/src/api/ws.rs index 7029a75c..334a4b14 100644 --- a/auction-server/src/api/ws.rs +++ b/auction-server/src/api/ws.rs @@ -58,7 +58,7 @@ pub struct WsState { } #[derive(Deserialize, Debug, Clone)] -#[serde(tag = "type")] +#[serde(tag = "method", content = "params")] enum ClientMessage { #[serde(rename = "subscribe")] Subscribe { chain_ids: Vec }, @@ -66,11 +66,17 @@ enum ClientMessage { Unsubscribe { chain_ids: Vec }, } +#[derive(Deserialize, Debug, Clone)] +struct ClientRequest { + id: String, + #[serde(flatten)] + msg: ClientMessage, +} + +/// This enum is used to send an update to the client for any subscriptions made #[derive(Serialize, Clone)] #[serde(tag = "type")] -enum ServerMessage { - #[serde(rename = "response")] - Response(ServerResponseMessage), +enum ServerUpdateResponse { #[serde(rename = "new_opportunity")] NewOpportunity { opportunity: OpportunityParamsWithMetadata, @@ -78,12 +84,21 @@ enum ServerMessage { } #[derive(Serialize, Debug, Clone)] -#[serde(tag = "status")] -enum ServerResponseMessage { +#[serde(tag = "status", content = "result")] +enum ServerResultMessage { #[serde(rename = "success")] Success, #[serde(rename = "error")] - Err { error: String }, + Err(String), +} + +/// This enum is used to send the result for a specific client request with the same id +/// id is only None when the client message is invalid +#[derive(Serialize, Debug, Clone)] +struct ServerResultResponse { + id: Option, + #[serde(flatten)] + result: ServerResultMessage, } pub async fn ws_route_handler( @@ -145,12 +160,11 @@ impl 
Subscriber { sender, chain_ids: HashSet::new(), ping_interval: tokio::time::interval(PING_INTERVAL_DURATION), - exit_check_interval: tokio::time::interval(Duration::from_secs(5)), + exit_check_interval: tokio::time::interval(EXIT_CHECK_INTERVAL), responded_to_ping: true, // We start with true so we don't close the connection immediately } } - #[tracing::instrument(skip(self))] pub async fn run(&mut self) { while !self.closed { if let Err(e) = self.handle_next().await { @@ -200,7 +214,7 @@ impl Subscriber { return Ok(()); } let message = - serde_json::to_string(&ServerMessage::NewOpportunity { opportunity })?; + serde_json::to_string(&ServerUpdateResponse::NewOpportunity { opportunity })?; self.sender.send(message.into()).await?; } } @@ -208,7 +222,6 @@ impl Subscriber { Ok(()) } - #[tracing::instrument(skip(self, message))] async fn handle_client_message(&mut self, message: Message) -> Result<()> { let maybe_client_message = match message { Message::Close(_) => { @@ -216,8 +229,6 @@ impl Subscriber { // list, instead when the Subscriber struct is dropped the channel // to subscribers list will be closed and it will eventually get // removed. - tracing::trace!(id = self.id, "Subscriber Closed Connection."); - // Send the close message to gracefully shut down the connection // Otherwise the client might get an abnormal Websocket closure // error. @@ -225,8 +236,8 @@ impl Subscriber { self.closed = true; return Ok(()); } - Message::Text(text) => serde_json::from_str::(&text), - Message::Binary(data) => serde_json::from_slice::(&data), + Message::Text(text) => serde_json::from_str::(&text), + Message::Binary(data) => serde_json::from_slice::(&data), Message::Ping(_) => { // Axum will send Pong automatically return Ok(()); @@ -237,64 +248,69 @@ impl Subscriber { } }; - match maybe_client_message { + let request_id = match maybe_client_message { Err(e) => { self.sender .send( - serde_json::to_string(&ServerMessage::Response( - ServerResponseMessage::Err { - error: e.to_string(), - }, - ))? + serde_json::to_string(&ServerResultResponse { + id: None, + result: ServerResultMessage::Err(e.to_string()), + })? .into(), ) .await?; return Ok(()); } - Ok(ClientMessage::Subscribe { chain_ids }) => { - let available_chain_ids: Vec<&ChainId> = self.store.chains.keys().collect(); + Ok(ClientRequest { msg, id }) => { + match msg { + ClientMessage::Subscribe { chain_ids } => { + let available_chain_ids: Vec<&ChainId> = self.store.chains.keys().collect(); - let not_found_chain_ids: Vec<&ChainId> = chain_ids - .iter() - .filter(|chain_id| !available_chain_ids.contains(chain_id)) - .collect(); + let not_found_chain_ids: Vec<&ChainId> = chain_ids + .iter() + .filter(|chain_id| !available_chain_ids.contains(chain_id)) + .collect(); - // If there is a single chain id that is not found, we don't subscribe to any of the - // asked correct chain ids and return an error to be more explicit and clear. - if !not_found_chain_ids.is_empty() { - self.sender - .send( - serde_json::to_string(&ServerMessage::Response( - ServerResponseMessage::Err { - error: format!( - "Chain id(s) with id(s) {:?} not found", - not_found_chain_ids - ), - }, - ))? - .into(), - ) - .await?; - return Ok(()); - } else { - self.chain_ids.extend(chain_ids.into_iter()); + // If there is a single chain id that is not found, we don't subscribe to any of the + // asked correct chain ids and return an error to be more explicit and clear. 
+ if !not_found_chain_ids.is_empty() { + self.sender + .send( + serde_json::to_string(&ServerResultResponse { + id: Some(id), + result: ServerResultMessage::Err(format!( + "Chain id(s) with id(s) {:?} not found", + not_found_chain_ids + )), + })? + .into(), + ) + .await?; + return Ok(()); + } else { + self.chain_ids.extend(chain_ids.into_iter()); + } + } + ClientMessage::Unsubscribe { chain_ids } => { + self.chain_ids + .retain(|chain_id| !chain_ids.contains(chain_id)); + } } + id } - Ok(ClientMessage::Unsubscribe { chain_ids }) => { - self.chain_ids - .retain(|chain_id| !chain_ids.contains(chain_id)); - } - } - - + }; self.sender .send( - serde_json::to_string(&ServerMessage::Response(ServerResponseMessage::Success))? - .into(), + serde_json::to_string(&ServerResultResponse { + id: Some(request_id), + result: ServerResultMessage::Success, + })? + .into(), ) .await?; + Ok(()) } } From c94169bdc1bd943291fc48a32fe94a04af29f997 Mon Sep 17 00:00:00 2001 From: Amin Moghaddam Date: Mon, 19 Feb 2024 14:18:07 +0100 Subject: [PATCH 4/6] Simplify broadcast --- auction-server/src/api.rs | 19 ++---- auction-server/src/api/liquidation.rs | 3 +- auction-server/src/api/ws.rs | 83 ++++----------------------- 3 files changed, 16 insertions(+), 89 deletions(-) diff --git a/auction-server/src/api.rs b/auction-server/src/api.rs index ee3a1791..e88cd5c5 100644 --- a/auction-server/src/api.rs +++ b/auction-server/src/api.rs @@ -9,7 +9,6 @@ use { OpportunityBid, OpportunityParamsWithMetadata, }, - ws::run_subscription_loop, }, auction::run_submission_loop, config::{ @@ -45,7 +44,6 @@ use { Router, }, clap::crate_version, - dashmap::DashMap, ethers::{ providers::{ Http, @@ -89,7 +87,7 @@ use { // we don't rely on global state for anything else. pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); pub const EXIT_CHECK_INTERVAL: Duration = Duration::from_secs(1); - +const NOTIFICATIONS_CHAN_LEN: usize = 1000; async fn root() -> String { format!("PER Auction Server API {}", crate_version!()) } @@ -271,29 +269,22 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { .into_iter() .collect(); - let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000); + let (update_tx, update_rx) = tokio::sync::broadcast::channel(NOTIFICATIONS_CHAN_LEN); let store = Arc::new(Store { chains: chain_store?, liquidation_store: LiquidationStore::default(), per_operator: wallet, ws: ws::WsState { subscriber_counter: AtomicUsize::new(0), - subscribers: DashMap::new(), - update_tx, + broadcast_sender: update_tx, + broadcast_receiver: update_rx, }, }); let submission_loop = tokio::spawn(run_submission_loop(store.clone())); let verification_loop = tokio::spawn(run_verification_loop(store.clone())); - let subscription_loop = tokio::spawn(run_subscription_loop(store.clone(), update_rx)); let server_loop = tokio::spawn(start_api(run_options, store.clone())); - join_all(vec![ - submission_loop, - verification_loop, - subscription_loop, - server_loop, - ]) - .await; + join_all(vec![submission_loop, verification_loop, server_loop]).await; Ok(()) } diff --git a/auction-server/src/api/liquidation.rs b/auction-server/src/api/liquidation.rs index 3592e45f..ae80c42a 100644 --- a/auction-server/src/api/liquidation.rs +++ b/auction-server/src/api/liquidation.rs @@ -146,9 +146,8 @@ pub async fn post_opportunity( store .ws - .update_tx + .broadcast_sender .send(NewOpportunity(opportunity.clone().into())) - .await .map_err(|e| { tracing::error!("Failed to send update: {}", e); RestError::TemporarilyUnavailable diff --git 
a/auction-server/src/api/ws.rs b/auction-server/src/api/ws.rs index 334a4b14..a707d4db 100644 --- a/auction-server/src/api/ws.rs +++ b/auction-server/src/api/ws.rs @@ -23,9 +23,7 @@ use { }, response::IntoResponse, }, - dashmap::DashMap, futures::{ - future::join_all, stream::{ SplitSink, SplitStream, @@ -48,13 +46,13 @@ use { }, time::Duration, }, - tokio::sync::mpsc, + tokio::sync::broadcast, }; pub struct WsState { pub subscriber_counter: AtomicUsize, - pub subscribers: DashMap>, - pub update_tx: mpsc::Sender, + pub broadcast_sender: broadcast::Sender, + pub broadcast_receiver: broadcast::Receiver, } #[derive(Deserialize, Debug, Clone)] @@ -111,10 +109,9 @@ pub async fn ws_route_handler( async fn websocket_handler(stream: WebSocket, state: Arc) { let ws_state = &state.ws; let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst); - let (notify_sender, notify_receiver) = mpsc::channel(NOTIFICATIONS_CHAN_LEN); let (sender, receiver) = stream.split(); - ws_state.subscribers.insert(id, notify_sender); - let mut subscriber = Subscriber::new(id, state, notify_receiver, receiver, sender); + let new_receiver = ws_state.broadcast_receiver.resubscribe(); + let mut subscriber = Subscriber::new(id, state, new_receiver, receiver, sender); subscriber.run().await; } @@ -131,7 +128,7 @@ pub struct Subscriber { id: SubscriberId, closed: bool, store: Arc, - notify_receiver: mpsc::Receiver, + notify_receiver: broadcast::Receiver, receiver: SplitStream, sender: SplitSink, chain_ids: HashSet, @@ -141,13 +138,12 @@ pub struct Subscriber { } const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30); -const NOTIFICATIONS_CHAN_LEN: usize = 1000; impl Subscriber { pub fn new( id: SubscriberId, store: Arc, - notify_receiver: mpsc::Receiver, + notify_receiver: broadcast::Receiver, receiver: SplitStream, sender: SplitSink, ) -> Self { @@ -178,8 +174,9 @@ impl Subscriber { tokio::select! { maybe_update_event = self.notify_receiver.recv() => { match maybe_update_event { - Some(event) => self.handle_update(event).await, - None => Err(anyhow!("Update channel closed. This should never happen. Closing connection.")) + + Ok(event) => self.handle_update(event).await, + Err(e) => Err(anyhow!("Error receiving update event: {:?}", e)), } }, maybe_message_or_err = self.receiver.next() => { @@ -314,63 +311,3 @@ impl Subscriber { Ok(()) } } - - -pub async fn notify_updates(ws_state: &WsState, event: UpdateEvent) { - let closed_subscribers: Vec> = - join_all(ws_state.subscribers.iter_mut().map(|subscriber| { - let event = event.clone(); - async move { - match subscriber.send(event).await { - Ok(_) => None, - Err(_) => { - // An error here indicates the channel is closed (which may happen either when the - // client has sent Message::Close or some other abrupt disconnection). We remove - // subscribers only when send fails so we can handle closure only once when we are - // able to see send() fail. - Some(*subscriber.key()) - } - } - } - })) - .await; - - // Remove closed_subscribers from ws_state - closed_subscribers.into_iter().for_each(|id| { - if let Some(id) = id { - ws_state.subscribers.remove(&id); - } - }); -} - - -pub async fn run_subscription_loop( - store: Arc, - mut receiver: mpsc::Receiver, -) -> Result<()> { - let mut interval = tokio::time::interval(EXIT_CHECK_INTERVAL); - - while !SHOULD_EXIT.load(Ordering::Acquire) { - tokio::select! { - update = receiver.recv() => { - match update { - None => { - // When the received message is None it means the channel has been closed. 
This - // should never happen as the channel is never closed. As we can't recover from - // this we shut down the application. - tracing::error!("Failed to receive update from store."); - SHOULD_EXIT.store(true, Ordering::Release); - break; - } - Some(event) => { - notify_updates(&store.ws, event).await; - }, - } - }, - _ = interval.tick() => {} - } - } - - tracing::info!("Shutting down Websocket notifier..."); - Ok(()) -} From 89f1747a60441c4ee115ebe7195e3eda48970f93 Mon Sep 17 00:00:00 2001 From: Amin Moghaddam Date: Mon, 19 Feb 2024 14:37:57 +0100 Subject: [PATCH 5/6] Address PR comments --- auction-server/Cargo.lock | 2 +- auction-server/Cargo.toml | 2 +- auction-server/src/api.rs | 123 ++------------------- auction-server/src/api/liquidation.rs | 4 +- auction-server/src/api/ws.rs | 6 +- auction-server/src/auction.rs | 4 +- auction-server/src/liquidation_adapter.rs | 10 +- auction-server/src/main.rs | 3 +- auction-server/src/server.rs | 125 ++++++++++++++++++++++ 9 files changed, 150 insertions(+), 129 deletions(-) create mode 100644 auction-server/src/server.rs diff --git a/auction-server/Cargo.lock b/auction-server/Cargo.lock index 103dc091..5f8d6e99 100644 --- a/auction-server/Cargo.lock +++ b/auction-server/Cargo.lock @@ -162,7 +162,7 @@ dependencies = [ [[package]] name = "auction-server" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "async-stream", diff --git a/auction-server/Cargo.toml b/auction-server/Cargo.toml index 3aa89106..bdc9c11d 100644 --- a/auction-server/Cargo.toml +++ b/auction-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "auction-server" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/auction-server/src/api.rs b/auction-server/src/api.rs index e88cd5c5..3e6c0a17 100644 --- a/auction-server/src/api.rs +++ b/auction-server/src/api.rs @@ -10,26 +10,19 @@ use { OpportunityParamsWithMetadata, }, }, - auction::run_submission_loop, - config::{ - ChainId, - Config, - RunOptions, + config::RunOptions, + server::{ + EXIT_CHECK_INTERVAL, + SHOULD_EXIT, }, - liquidation_adapter::run_verification_loop, state::{ - ChainStore, - LiquidationStore, OpportunityParams, OpportunityParamsV1, Store, TokenQty, }, }, - anyhow::{ - anyhow, - Result, - }, + anyhow::Result, axum::{ http::StatusCode, response::{ @@ -44,31 +37,11 @@ use { Router, }, clap::crate_version, - ethers::{ - providers::{ - Http, - Middleware, - Provider, - }, - signers::{ - LocalWallet, - Signer, - }, - types::Bytes, - }, - futures::future::join_all, + ethers::types::Bytes, serde::Serialize, - std::{ - collections::HashMap, - sync::{ - atomic::{ - AtomicBool, - AtomicUsize, - Ordering, - }, - Arc, - }, - time::Duration, + std::sync::{ + atomic::Ordering, + Arc, }, tower_http::cors::CorsLayer, utoipa::{ @@ -79,15 +52,6 @@ use { utoipa_swagger_ui::SwaggerUi, }; -// A static exit flag to indicate to running threads that we're shutting down. This is used to -// gracefully shutdown the application. -// -// NOTE: A more idiomatic approach would be to use a tokio::sync::broadcast channel, and to send a -// shutdown signal to all running tasks. However, this is a bit more complicated to implement and -// we don't rely on global state for anything else. 
-pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); -pub const EXIT_CHECK_INTERVAL: Duration = Duration::from_secs(1); -const NOTIFICATIONS_CHAN_LEN: usize = 1000; async fn root() -> String { format!("PER Auction Server API {}", crate_version!()) } @@ -219,72 +183,3 @@ pub async fn start_api(run_options: RunOptions, store: Arc) -> Result<()> .await?; Ok(()) } - -pub async fn start_server(run_options: RunOptions) -> Result<()> { - tokio::spawn(async move { - tracing::info!("Registered shutdown signal handler..."); - tokio::signal::ctrl_c().await.unwrap(); - tracing::info!("Shut down signal received, waiting for tasks..."); - SHOULD_EXIT.store(true, Ordering::Release); - }); - - - let config = Config::load(&run_options.config.config).map_err(|err| { - anyhow!( - "Failed to load config from file({path}): {:?}", - err, - path = run_options.config.config - ) - })?; - - let wallet = run_options.per_private_key.parse::()?; - tracing::info!("Using wallet address: {}", wallet.address().to_string()); - - let chain_store: Result> = join_all(config.chains.iter().map( - |(chain_id, chain_config)| async move { - let mut provider = Provider::::try_from(chain_config.geth_rpc_addr.clone()) - .map_err(|err| { - anyhow!( - "Failed to connect to chain({chain_id}) at {rpc_addr}: {:?}", - err, - chain_id = chain_id, - rpc_addr = chain_config.geth_rpc_addr - ) - })?; - provider.set_interval(Duration::from_secs(chain_config.poll_interval)); - let id = provider.get_chainid().await?.as_u64(); - Ok(( - chain_id.clone(), - ChainStore { - provider, - network_id: id, - bids: Default::default(), - token_spoof_info: Default::default(), - config: chain_config.clone(), - }, - )) - }, - )) - .await - .into_iter() - .collect(); - - let (update_tx, update_rx) = tokio::sync::broadcast::channel(NOTIFICATIONS_CHAN_LEN); - let store = Arc::new(Store { - chains: chain_store?, - liquidation_store: LiquidationStore::default(), - per_operator: wallet, - ws: ws::WsState { - subscriber_counter: AtomicUsize::new(0), - broadcast_sender: update_tx, - broadcast_receiver: update_rx, - }, - }); - - - let submission_loop = tokio::spawn(run_submission_loop(store.clone())); - let verification_loop = tokio::spawn(run_verification_loop(store.clone())); - let server_loop = tokio::spawn(start_api(run_options, store.clone())); - join_all(vec![submission_loop, verification_loop, server_loop]).await; - Ok(()) -} diff --git a/auction-server/src/api/liquidation.rs b/auction-server/src/api/liquidation.rs index ae80c42a..09ac23ed 100644 --- a/auction-server/src/api/liquidation.rs +++ b/auction-server/src/api/liquidation.rs @@ -131,8 +131,8 @@ pub async fn post_opportunity( if let Some(opportunities_existing) = write_lock.get_mut(¶ms.permission_key) { // check if same opportunity exists in the vector - for opportunity_existing in opportunities_existing.clone() { - if opportunity_existing == opportunity { + for opportunity_existing in opportunities_existing.iter() { + if opportunity_existing == &opportunity { return Err(RestError::BadParameters( "Duplicate opportunity submission".to_string(), )); diff --git a/auction-server/src/api/ws.rs b/auction-server/src/api/ws.rs index a707d4db..9e292c35 100644 --- a/auction-server/src/api/ws.rs +++ b/auction-server/src/api/ws.rs @@ -1,11 +1,11 @@ use { crate::{ - api::{ - liquidation::OpportunityParamsWithMetadata, + api::liquidation::OpportunityParamsWithMetadata, + config::ChainId, + server::{ EXIT_CHECK_INTERVAL, SHOULD_EXIT, }, - config::ChainId, state::Store, }, anyhow::{ diff --git 
a/auction-server/src/auction.rs b/auction-server/src/auction.rs index c8bb0d35..ca642141 100644 --- a/auction-server/src/auction.rs +++ b/auction-server/src/auction.rs @@ -1,10 +1,10 @@ use { crate::{ - api::{ + config::EthereumConfig, + server::{ EXIT_CHECK_INTERVAL, SHOULD_EXIT, }, - config::EthereumConfig, state::Store, }, anyhow::{ diff --git a/auction-server/src/liquidation_adapter.rs b/auction-server/src/liquidation_adapter.rs index e40c6518..89812e68 100644 --- a/auction-server/src/liquidation_adapter.rs +++ b/auction-server/src/liquidation_adapter.rs @@ -1,15 +1,15 @@ use { crate::{ - api::{ - liquidation::OpportunityBid, - EXIT_CHECK_INTERVAL, - SHOULD_EXIT, - }, + api::liquidation::OpportunityBid, auction::{ evaluate_simulation_results, get_simulation_call, MulticallReturn, }, + server::{ + EXIT_CHECK_INTERVAL, + SHOULD_EXIT, + }, state::{ ChainStore, LiquidationOpportunity, diff --git a/auction-server/src/main.rs b/auction-server/src/main.rs index 696c4291..8db99783 100644 --- a/auction-server/src/main.rs +++ b/auction-server/src/main.rs @@ -1,7 +1,7 @@ use { - crate::api::start_server, anyhow::Result, clap::Parser, + server::start_server, std::io::IsTerminal, tracing_subscriber::filter::LevelFilter, }; @@ -11,6 +11,7 @@ mod auction; mod config; mod liquidation_adapter; mod serde; +mod server; mod state; mod token_spoof; diff --git a/auction-server/src/server.rs b/auction-server/src/server.rs new file mode 100644 index 00000000..bd52f0aa --- /dev/null +++ b/auction-server/src/server.rs @@ -0,0 +1,125 @@ +use { + crate::{ + api, + api::ws, + auction::run_submission_loop, + config::{ + ChainId, + Config, + RunOptions, + }, + liquidation_adapter::run_verification_loop, + state::{ + ChainStore, + LiquidationStore, + Store, + }, + }, + anyhow::anyhow, + ethers::{ + prelude::{ + LocalWallet, + Provider, + }, + providers::{ + Http, + Middleware, + }, + signers::Signer, + }, + futures::future::join_all, + std::{ + collections::HashMap, + sync::{ + atomic::{ + AtomicBool, + AtomicUsize, + Ordering, + }, + Arc, + }, + time::Duration, + }, +}; + +const NOTIFICATIONS_CHAN_LEN: usize = 1000; +pub async fn start_server(run_options: RunOptions) -> anyhow::Result<()> { + tokio::spawn(async move { + tracing::info!("Registered shutdown signal handler..."); + tokio::signal::ctrl_c().await.unwrap(); + tracing::info!("Shut down signal received, waiting for tasks..."); + SHOULD_EXIT.store(true, Ordering::Release); + }); + + + let config = Config::load(&run_options.config.config).map_err(|err| { + anyhow!( + "Failed to load config from file({path}): {:?}", + err, + path = run_options.config.config + ) + })?; + + let wallet = run_options.per_private_key.parse::()?; + tracing::info!("Using wallet address: {}", wallet.address().to_string()); + + let chain_store: anyhow::Result> = join_all( + config + .chains + .iter() + .map(|(chain_id, chain_config)| async move { + let mut provider = Provider::::try_from(chain_config.geth_rpc_addr.clone()) + .map_err(|err| { + anyhow!( + "Failed to connect to chain({chain_id}) at {rpc_addr}: {:?}", + err, + chain_id = chain_id, + rpc_addr = chain_config.geth_rpc_addr + ) + })?; + provider.set_interval(Duration::from_secs(chain_config.poll_interval)); + let id = provider.get_chainid().await?.as_u64(); + Ok(( + chain_id.clone(), + ChainStore { + provider, + network_id: id, + bids: Default::default(), + token_spoof_info: Default::default(), + config: chain_config.clone(), + }, + )) + }), + ) + .await + .into_iter() + .collect(); + + let (update_tx, update_rx) = 
tokio::sync::broadcast::channel(NOTIFICATIONS_CHAN_LEN); + let store = Arc::new(Store { + chains: chain_store?, + liquidation_store: LiquidationStore::default(), + per_operator: wallet, + ws: ws::WsState { + subscriber_counter: AtomicUsize::new(0), + broadcast_sender: update_tx, + broadcast_receiver: update_rx, + }, + }); + + + let submission_loop = tokio::spawn(run_submission_loop(store.clone())); + let verification_loop = tokio::spawn(run_verification_loop(store.clone())); + let server_loop = tokio::spawn(api::start_api(run_options, store.clone())); + join_all(vec![submission_loop, verification_loop, server_loop]).await; + Ok(()) +} + +// A static exit flag to indicate to running threads that we're shutting down. This is used to +// gracefully shutdown the application. +// +// NOTE: A more idiomatic approach would be to use a tokio::sync::broadcast channel, and to send a +// shutdown signal to all running tasks. However, this is a bit more complicated to implement and +// we don't rely on global state for anything else. +pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); +pub const EXIT_CHECK_INTERVAL: Duration = Duration::from_secs(1); From 20fbd4bd98268ead6ec00bdf5c588ceb631850fb Mon Sep 17 00:00:00 2001 From: Amin Moghaddam Date: Mon, 19 Feb 2024 14:45:03 +0100 Subject: [PATCH 6/6] Expose websocket messages schemas --- auction-server/src/api.rs | 12 ++++++++++++ auction-server/src/api/ws.rs | 21 +++++++++++---------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/auction-server/src/api.rs b/auction-server/src/api.rs index 3e6c0a17..0f8f1be7 100644 --- a/auction-server/src/api.rs +++ b/auction-server/src/api.rs @@ -9,6 +9,13 @@ use { OpportunityBid, OpportunityParamsWithMetadata, }, + ws::{ + ClientMessage, + ClientRequest, + ServerResultMessage, + ServerResultResponse, + ServerUpdateResponse, + }, }, config::RunOptions, server::{ @@ -141,6 +148,11 @@ pub async fn start_api(run_options: RunOptions, store: Arc) -> Result<()> schemas(TokenQty), schemas(BidResult), schemas(ErrorBodyResponse), + schemas(ClientRequest), + schemas(ClientMessage), + schemas(ServerResultMessage), + schemas(ServerUpdateResponse), + schemas(ServerResultResponse), responses(ErrorBodyResponse), responses(OpportunityParamsWithMetadata), responses(BidResult) diff --git a/auction-server/src/api/ws.rs b/auction-server/src/api/ws.rs index 9e292c35..02e8d8df 100644 --- a/auction-server/src/api/ws.rs +++ b/auction-server/src/api/ws.rs @@ -47,6 +47,7 @@ use { time::Duration, }, tokio::sync::broadcast, + utoipa::ToSchema, }; pub struct WsState { @@ -55,35 +56,35 @@ pub struct WsState { pub broadcast_receiver: broadcast::Receiver, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone, ToSchema)] #[serde(tag = "method", content = "params")] -enum ClientMessage { +pub enum ClientMessage { #[serde(rename = "subscribe")] Subscribe { chain_ids: Vec }, #[serde(rename = "unsubscribe")] Unsubscribe { chain_ids: Vec }, } -#[derive(Deserialize, Debug, Clone)] -struct ClientRequest { +#[derive(Deserialize, Debug, Clone, ToSchema)] +pub struct ClientRequest { id: String, #[serde(flatten)] msg: ClientMessage, } /// This enum is used to send an update to the client for any subscriptions made -#[derive(Serialize, Clone)] +#[derive(Serialize, Clone, ToSchema)] #[serde(tag = "type")] -enum ServerUpdateResponse { +pub enum ServerUpdateResponse { #[serde(rename = "new_opportunity")] NewOpportunity { opportunity: OpportunityParamsWithMetadata, }, } -#[derive(Serialize, Debug, Clone)] 
+#[derive(Serialize, Debug, Clone, ToSchema)] #[serde(tag = "status", content = "result")] -enum ServerResultMessage { +pub enum ServerResultMessage { #[serde(rename = "success")] Success, #[serde(rename = "error")] @@ -92,8 +93,8 @@ enum ServerResultMessage { /// This enum is used to send the result for a specific client request with the same id /// id is only None when the client message is invalid -#[derive(Serialize, Debug, Clone)] -struct ServerResultResponse { +#[derive(Serialize, Debug, Clone, ToSchema)] +pub struct ServerResultResponse { id: Option, #[serde(flatten)] result: ServerResultMessage,
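For reference, a minimal sketch of the JSON wire format implied by the serde attributes above, assuming serde's default handling of #[serde(flatten)] and of internally/adjacently tagged enums; the id, chain id, and elided opportunity fields below are illustrative placeholders rather than values taken from the patch.

Client request (ClientRequest with a flattened ClientMessage::Subscribe):
  {"id": "1", "method": "subscribe", "params": {"chain_ids": ["<chain_id>"]}}

Per-request result (ServerResultResponse; the unit Success variant carries no "result" field):
  {"id": "1", "status": "success"}
  {"id": "1", "status": "error", "result": "<error message>"}

Pushed update for a subscribed chain (ServerUpdateResponse, with the opportunity body following OpportunityParamsWithMetadata):
  {"type": "new_opportunity", "opportunity": {"opportunity_id": "f47ac10b-58cc-4372-a567-0e02b2c3d479", "creation_time": 1700000000, ...}}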