diff --git a/auction-server/src/api/bid.rs b/auction-server/src/api/bid.rs index 6c813595..460c5450 100644 --- a/auction-server/src/api/bid.rs +++ b/auction-server/src/api/bid.rs @@ -80,9 +80,8 @@ pub async fn bid_status( State(store): State>, Path(bid_id): Path, ) -> Result, RestError> { - let status = store.bid_status_store.get_status(&bid_id).await; - match status { - Some(status) => Ok(status.into()), + match store.bids.read().await.get(&bid_id) { + Some(bid) => Ok(bid.status.clone().into()), None => Err(RestError::BidNotFound), } } diff --git a/auction-server/src/auction.rs b/auction-server/src/auction.rs index 7b9e81ba..c0e6bfb5 100644 --- a/auction-server/src/auction.rs +++ b/auction-server/src/auction.rs @@ -60,6 +60,7 @@ use { Serialize, }, std::{ + collections::HashMap, result, sync::{ atomic::Ordering, @@ -131,6 +132,7 @@ pub fn evaluate_simulation_results(results: Vec) -> Result<(), } Ok(()) } + pub async fn simulate_bids( relayer: Address, provider: Provider, @@ -235,14 +237,18 @@ pub async fn run_submission_loop(store: Arc) -> Result<()> { tokio::select! { _ = submission_interval.tick() => { for (chain_id, chain_store) in &store.chains { - let permission_bids = chain_store.bids.read().await.clone(); - // release lock asap + let all_bids = store.get_bids_by_chain_id(chain_id).await; + let bid_by_permission_key = all_bids.into_iter().fold(HashMap::new(), |mut acc, bid| { + acc.entry(bid.permission_key.clone()).or_insert_with(Vec::new).push(bid); + acc + }); + tracing::info!( "Chain: {chain_id} Auctions to process {auction_len}", chain_id = chain_id, - auction_len = permission_bids.len() + auction_len = bid_by_permission_key.len() ); - for (permission_key, bids) in permission_bids.iter() { + for (permission_key, bids) in bid_by_permission_key.iter() { let mut cloned_bids = bids.clone(); let permission_key = permission_key.clone(); cloned_bids.sort_by(|a, b| b.bid_amount.cmp(&a.bid_amount)); @@ -271,9 +277,9 @@ pub async fn run_submission_loop(store: Arc) -> Result<()> { true => BidStatus::Submitted(receipt.transaction_hash), false => BidStatus::Lost }; - store.bid_status_store.set_and_broadcast(BidStatusWithId { id: bid.id, bid_status }).await; + store.set_bid_status_and_broadcast(BidStatusWithId { id: bid.id, bid_status }).await?; + store.remove_bid(&bid.id).await?; } - chain_store.bids.write().await.remove(&permission_key); } None => { tracing::error!("Failed to receive transaction receipt"); @@ -297,19 +303,19 @@ pub async fn run_submission_loop(store: Arc) -> Result<()> { #[derive(Serialize, Deserialize, ToSchema, Clone, Debug)] pub struct Bid { /// The permission key to bid on. - #[schema(example = "0xdeadbeef", value_type=String)] + #[schema(example = "0xdeadbeef", value_type = String)] pub permission_key: Bytes, /// The chain id to bid on. - #[schema(example = "sepolia", value_type=String)] + #[schema(example = "sepolia", value_type = String)] pub chain_id: ChainId, /// The contract address to call. - #[schema(example = "0xcA11bde05977b3631167028862bE2a173976CA11",value_type = String)] + #[schema(example = "0xcA11bde05977b3631167028862bE2a173976CA11", value_type = String)] pub target_contract: abi::Address, /// Calldata for the contract call. - #[schema(example = "0xdeadbeef", value_type=String)] + #[schema(example = "0xdeadbeef", value_type = String)] pub target_calldata: Bytes, /// Amount of bid in wei. - #[schema(example = "10", value_type=String)] + #[schema(example = "10", value_type = String)] #[serde(with = "crate::serde::u256")] pub amount: BidAmount, } @@ -353,6 +359,9 @@ pub async fn handle_bid(store: Arc, bid: Bid) -> result::Result anyhow::Result<()> { ChainStore { provider, network_id: id, - bids: Default::default(), token_spoof_info: Default::default(), config: chain_config.clone(), }, @@ -108,12 +106,10 @@ pub async fn start_server(run_options: RunOptions) -> anyhow::Result<()> { .expect("Failed to connect to database"); let store = Arc::new(Store { db: pool, + bids: Default::default(), chains: chain_store?, opportunity_store: OpportunityStore::default(), - bid_status_store: BidStatusStore { - bids_status: Default::default(), - event_sender: broadcast_sender.clone(), - }, + event_sender: broadcast_sender.clone(), relayer: wallet, ws: ws::WsState { subscriber_counter: AtomicUsize::new(0), diff --git a/auction-server/src/state.rs b/auction-server/src/state.rs index 1db3b6ed..b861f8c8 100644 --- a/auction-server/src/state.rs +++ b/auction-server/src/state.rs @@ -60,6 +60,9 @@ pub struct SimulatedBid { pub target_contract: Address, pub target_calldata: Bytes, pub bid_amount: BidAmount, + pub permission_key: PermissionKey, + pub chain_id: ChainId, + pub status: BidStatus, // simulation_time: } @@ -133,7 +136,6 @@ pub struct ChainStore { pub network_id: u64, pub config: EthereumConfig, pub token_spoof_info: RwLock>, - pub bids: RwLock>>, } #[derive(Default)] @@ -169,6 +171,16 @@ pub enum BidStatus { Lost, } +impl BidStatus { + pub fn status_name(&self) -> String { + match self { + BidStatus::Pending => "pending".to_string(), + BidStatus::Submitted(_) => "submitted".to_string(), + BidStatus::Lost => "lost".to_string(), + } + } +} + #[derive(Serialize, Clone, ToSchema, ToResponse)] pub struct BidStatusWithId { #[schema(value_type = String)] @@ -176,31 +188,10 @@ pub struct BidStatusWithId { pub bid_status: BidStatus, } -pub struct BidStatusStore { - pub bids_status: RwLock>, - pub event_sender: broadcast::Sender, -} - -impl BidStatusStore { - pub async fn get_status(&self, id: &BidId) -> Option { - self.bids_status.read().await.get(id).cloned() - } - - pub async fn set_and_broadcast(&self, update: BidStatusWithId) { - self.bids_status - .write() - .await - .insert(update.id, update.bid_status.clone()); - match self.event_sender.send(UpdateEvent::BidStatusUpdate(update)) { - Ok(_) => (), - Err(e) => tracing::error!("Failed to send bid status update: {}", e), - }; - } -} - pub struct Store { pub chains: HashMap, - pub bid_status_store: BidStatusStore, + pub bids: RwLock>, + pub event_sender: broadcast::Sender, pub opportunity_store: OpportunityStore, pub relayer: LocalWallet, pub ws: WsState, @@ -280,35 +271,77 @@ impl Store { ) -> Result<(), RestError> { let bid_id = bid.id; let now = OffsetDateTime::now_utc(); - sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, target_contract, target_calldata, bid_amount, status) VALUES ($1, $2, $3, $4, $5, $6, $7, 'pending')", + sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, target_contract, target_calldata, bid_amount, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", bid.id, PrimitiveDateTime::new(now.date(), now.time()), permission_key.to_vec(), chain_id, &bid.target_contract.to_fixed_bytes(), bid.target_calldata.to_vec(), - BigDecimal::from_str(&bid.bid_amount.to_string()).unwrap()) + BigDecimal::from_str(&bid.bid_amount.to_string()).unwrap(), + bid.status.status_name(), + ) .execute(&self.db) .await.map_err(|e| { tracing::error!("DB: Failed to insert bid: {}", e); RestError::TemporarilyUnavailable })?; - self.chains - .get(chain_id) - .expect("chain exists") - .bids - .write() + self.bids.write().await.insert(bid_id, bid.clone()); + self.broadcast_status_update(BidStatusWithId { + id: bid_id, + bid_status: bid.status.clone(), + }); + Ok(()) + } + + pub async fn set_bid_status_and_broadcast( + &self, + update: BidStatusWithId, + ) -> anyhow::Result<()> { + sqlx::query!( + "UPDATE bid SET status = $1 WHERE id = $2", + update.bid_status.status_name(), + update.id + ) + .execute(&self.db) + .await?; + + + self.bids.write().await.get_mut(&update.id).map(|bid| { + bid.status = update.bid_status.clone(); + }); + self.broadcast_status_update(update); + Ok(()) + } + + fn broadcast_status_update(&self, update: BidStatusWithId) { + match self.event_sender.send(UpdateEvent::BidStatusUpdate(update)) { + Ok(_) => (), + Err(e) => tracing::error!("Failed to send bid status update: {}", e), + }; + } + + pub async fn get_bids_by_chain_id(&self, chain_id: &ChainId) -> Vec { + self.bids + .read() .await - .entry(permission_key) - .or_default() - .push(bid); - self.bid_status_store - .set_and_broadcast(BidStatusWithId { - id: bid_id, - bid_status: BidStatus::Pending, - }) - .await; + .values() + .filter(|bid| bid.chain_id.eq(chain_id)) + .cloned() + .collect() + } + + pub async fn remove_bid(&self, bid_id: &BidId) -> anyhow::Result<()> { + let now = OffsetDateTime::now_utc(); + sqlx::query!( + "UPDATE bid SET removal_time = $1 WHERE id = $2 AND removal_time IS NULL", + PrimitiveDateTime::new(now.date(), now.time()), + bid_id + ) + .execute(&self.db) + .await?; + self.bids.write().await.remove(bid_id); Ok(()) } }