Skip to content

Commit

Permalink
Refactor bidstore
Browse files Browse the repository at this point in the history
  • Loading branch information
m30m committed Mar 26, 2024
1 parent 60e1cad commit 540397e
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 60 deletions.
5 changes: 2 additions & 3 deletions auction-server/src/api/bid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ pub async fn bid_status(
State(store): State<Arc<Store>>,
Path(bid_id): Path<BidId>,
) -> Result<Json<BidStatus>, RestError> {
let status = store.bid_status_store.get_status(&bid_id).await;
match status {
Some(status) => Ok(status.into()),
match store.bids.read().await.get(&bid_id) {
Some(bid) => Ok(bid.status.clone().into()),
None => Err(RestError::BidNotFound),
}
}
31 changes: 20 additions & 11 deletions auction-server/src/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use {
Serialize,
},
std::{
collections::HashMap,
result,
sync::{
atomic::Ordering,
Expand Down Expand Up @@ -131,6 +132,7 @@ pub fn evaluate_simulation_results(results: Vec<MulticallStatus>) -> Result<(),
}
Ok(())
}

pub async fn simulate_bids(
relayer: Address,
provider: Provider<Http>,
Expand Down Expand Up @@ -235,14 +237,18 @@ pub async fn run_submission_loop(store: Arc<Store>) -> Result<()> {
tokio::select! {
_ = submission_interval.tick() => {
for (chain_id, chain_store) in &store.chains {
let permission_bids = chain_store.bids.read().await.clone();
// release lock asap
let all_bids = store.get_bids_by_chain_id(chain_id).await;
let bid_by_permission_key = all_bids.into_iter().fold(HashMap::new(), |mut acc, bid| {
acc.entry(bid.permission_key.clone()).or_insert_with(Vec::new).push(bid);
acc
});

tracing::info!(
"Chain: {chain_id} Auctions to process {auction_len}",
chain_id = chain_id,
auction_len = permission_bids.len()
auction_len = bid_by_permission_key.len()
);
for (permission_key, bids) in permission_bids.iter() {
for (permission_key, bids) in bid_by_permission_key.iter() {
let mut cloned_bids = bids.clone();
let permission_key = permission_key.clone();
cloned_bids.sort_by(|a, b| b.bid_amount.cmp(&a.bid_amount));
Expand Down Expand Up @@ -271,9 +277,9 @@ pub async fn run_submission_loop(store: Arc<Store>) -> Result<()> {
true => BidStatus::Submitted(receipt.transaction_hash),
false => BidStatus::Lost
};
store.bid_status_store.set_and_broadcast(BidStatusWithId { id: bid.id, bid_status }).await;
store.set_bid_status_and_broadcast(BidStatusWithId { id: bid.id, bid_status }).await?;
store.remove_bid(&bid.id).await?;
}
chain_store.bids.write().await.remove(&permission_key);
}
None => {
tracing::error!("Failed to receive transaction receipt");
Expand All @@ -297,19 +303,19 @@ pub async fn run_submission_loop(store: Arc<Store>) -> Result<()> {
#[derive(Serialize, Deserialize, ToSchema, Clone, Debug)]
pub struct Bid {
/// The permission key to bid on.
#[schema(example = "0xdeadbeef", value_type=String)]
#[schema(example = "0xdeadbeef", value_type = String)]
pub permission_key: Bytes,
/// The chain id to bid on.
#[schema(example = "sepolia", value_type=String)]
#[schema(example = "sepolia", value_type = String)]
pub chain_id: ChainId,
/// The contract address to call.
#[schema(example = "0xcA11bde05977b3631167028862bE2a173976CA11",value_type = String)]
#[schema(example = "0xcA11bde05977b3631167028862bE2a173976CA11", value_type = String)]
pub target_contract: abi::Address,
/// Calldata for the contract call.
#[schema(example = "0xdeadbeef", value_type=String)]
#[schema(example = "0xdeadbeef", value_type = String)]
pub target_calldata: Bytes,
/// Amount of bid in wei.
#[schema(example = "10", value_type=String)]
#[schema(example = "10", value_type = String)]
#[serde(with = "crate::serde::u256")]
pub amount: BidAmount,
}
Expand Down Expand Up @@ -353,6 +359,9 @@ pub async fn handle_bid(store: Arc<Store>, bid: Bid) -> result::Result<Uuid, Res
target_calldata: bid.target_calldata.clone(),
bid_amount: bid.amount,
id: bid_id,
permission_key: bid.permission_key.clone(),
chain_id: bid.chain_id.clone(),
status: BidStatus::Pending,
};
store
.add_bid(&bid.chain_id, bid.permission_key.clone(), simulated_bid)
Expand Down
8 changes: 2 additions & 6 deletions auction-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use {
},
opportunity_adapter::run_verification_loop,
state::{
BidStatusStore,
ChainStore,
OpportunityStore,
Store,
Expand Down Expand Up @@ -86,7 +85,6 @@ pub async fn start_server(run_options: RunOptions) -> anyhow::Result<()> {
ChainStore {
provider,
network_id: id,
bids: Default::default(),
token_spoof_info: Default::default(),
config: chain_config.clone(),
},
Expand All @@ -108,12 +106,10 @@ pub async fn start_server(run_options: RunOptions) -> anyhow::Result<()> {
.expect("Failed to connect to database");
let store = Arc::new(Store {
db: pool,
bids: Default::default(),
chains: chain_store?,
opportunity_store: OpportunityStore::default(),
bid_status_store: BidStatusStore {
bids_status: Default::default(),
event_sender: broadcast_sender.clone(),
},
event_sender: broadcast_sender.clone(),
relayer: wallet,
ws: ws::WsState {
subscriber_counter: AtomicUsize::new(0),
Expand Down
113 changes: 73 additions & 40 deletions auction-server/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub struct SimulatedBid {
pub target_contract: Address,
pub target_calldata: Bytes,
pub bid_amount: BidAmount,
pub permission_key: PermissionKey,
pub chain_id: ChainId,
pub status: BidStatus,
// simulation_time:
}

Expand Down Expand Up @@ -133,7 +136,6 @@ pub struct ChainStore {
pub network_id: u64,
pub config: EthereumConfig,
pub token_spoof_info: RwLock<HashMap<Address, SpoofInfo>>,
pub bids: RwLock<HashMap<PermissionKey, Vec<SimulatedBid>>>,
}

#[derive(Default)]
Expand Down Expand Up @@ -169,38 +171,27 @@ pub enum BidStatus {
Lost,
}

impl BidStatus {
pub fn status_name(&self) -> String {
match self {
BidStatus::Pending => "pending".to_string(),
BidStatus::Submitted(_) => "submitted".to_string(),
BidStatus::Lost => "lost".to_string(),
}
}
}

#[derive(Serialize, Clone, ToSchema, ToResponse)]
pub struct BidStatusWithId {
#[schema(value_type = String)]
pub id: BidId,
pub bid_status: BidStatus,
}

pub struct BidStatusStore {
pub bids_status: RwLock<HashMap<BidId, BidStatus>>,
pub event_sender: broadcast::Sender<UpdateEvent>,
}

impl BidStatusStore {
pub async fn get_status(&self, id: &BidId) -> Option<BidStatus> {
self.bids_status.read().await.get(id).cloned()
}

pub async fn set_and_broadcast(&self, update: BidStatusWithId) {
self.bids_status
.write()
.await
.insert(update.id, update.bid_status.clone());
match self.event_sender.send(UpdateEvent::BidStatusUpdate(update)) {
Ok(_) => (),
Err(e) => tracing::error!("Failed to send bid status update: {}", e),
};
}
}

pub struct Store {
pub chains: HashMap<ChainId, ChainStore>,
pub bid_status_store: BidStatusStore,
pub bids: RwLock<HashMap<BidId, SimulatedBid>>,
pub event_sender: broadcast::Sender<UpdateEvent>,
pub opportunity_store: OpportunityStore,
pub relayer: LocalWallet,
pub ws: WsState,
Expand Down Expand Up @@ -280,35 +271,77 @@ impl Store {
) -> Result<(), RestError> {
let bid_id = bid.id;
let now = OffsetDateTime::now_utc();
sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, target_contract, target_calldata, bid_amount, status) VALUES ($1, $2, $3, $4, $5, $6, $7, 'pending')",
sqlx::query!("INSERT INTO bid (id, creation_time, permission_key, chain_id, target_contract, target_calldata, bid_amount, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
bid.id,
PrimitiveDateTime::new(now.date(), now.time()),
permission_key.to_vec(),
chain_id,
&bid.target_contract.to_fixed_bytes(),
bid.target_calldata.to_vec(),
BigDecimal::from_str(&bid.bid_amount.to_string()).unwrap())
BigDecimal::from_str(&bid.bid_amount.to_string()).unwrap(),
bid.status.status_name(),
)
.execute(&self.db)
.await.map_err(|e| {
tracing::error!("DB: Failed to insert bid: {}", e);
RestError::TemporarilyUnavailable
})?;

self.chains
.get(chain_id)
.expect("chain exists")
.bids
.write()
self.bids.write().await.insert(bid_id, bid.clone());
self.broadcast_status_update(BidStatusWithId {
id: bid_id,
bid_status: bid.status.clone(),
});
Ok(())
}

pub async fn set_bid_status_and_broadcast(
&self,
update: BidStatusWithId,
) -> anyhow::Result<()> {
sqlx::query!(
"UPDATE bid SET status = $1 WHERE id = $2",
update.bid_status.status_name(),
update.id
)
.execute(&self.db)
.await?;


self.bids.write().await.get_mut(&update.id).map(|bid| {
bid.status = update.bid_status.clone();
});
self.broadcast_status_update(update);
Ok(())
}

fn broadcast_status_update(&self, update: BidStatusWithId) {
match self.event_sender.send(UpdateEvent::BidStatusUpdate(update)) {
Ok(_) => (),
Err(e) => tracing::error!("Failed to send bid status update: {}", e),
};
}

pub async fn get_bids_by_chain_id(&self, chain_id: &ChainId) -> Vec<SimulatedBid> {
self.bids
.read()
.await
.entry(permission_key)
.or_default()
.push(bid);
self.bid_status_store
.set_and_broadcast(BidStatusWithId {
id: bid_id,
bid_status: BidStatus::Pending,
})
.await;
.values()
.filter(|bid| bid.chain_id.eq(chain_id))
.cloned()
.collect()
}

pub async fn remove_bid(&self, bid_id: &BidId) -> anyhow::Result<()> {
let now = OffsetDateTime::now_utc();
sqlx::query!(
"UPDATE bid SET removal_time = $1 WHERE id = $2 AND removal_time IS NULL",
PrimitiveDateTime::new(now.date(), now.time()),
bid_id
)
.execute(&self.db)
.await?;
self.bids.write().await.remove(bid_id);
Ok(())
}
}

0 comments on commit 540397e

Please sign in to comment.