Skip to content

Commit

Permalink
Fix svm slow conclusion (#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
danimhr authored Dec 14, 2024
1 parent 3f2af65 commit a63eb19
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 56 deletions.
1 change: 0 additions & 1 deletion auction-server/src/auction/service/auction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ impl AuctionManager<Svm> for Service<Svm> {
return Ok(vec![]);
}

//TODO: this can be optimized out if triggered by websocket events
let signatures: Vec<_> = bids
.iter()
.map(|bid| {
Expand Down
90 changes: 53 additions & 37 deletions auction-server/src/auction/service/conclude_auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,62 @@ pub struct ConcludeAuctionInput<T: ChainTrait> {
pub auction: entities::Auction<T>,
}

pub struct ConcludeAuctionWithStatusesInput<T: ChainTrait> {
pub auction: entities::Auction<T>,
pub bid_statuses: Vec<(T::BidStatusType, entities::Bid<T>)>,
}

impl<T: ChainTrait> Service<T>
where
Service<T>: AuctionManager<T>,
{
#[tracing::instrument(skip_all, fields(auction_id, tx_hash, bid_ids, bid_statuses))]
pub async fn conclude_auction(&self, input: ConcludeAuctionInput<T>) -> anyhow::Result<()> {
#[tracing::instrument(skip_all, fields(auction_id, bid_ids, bid_statuses))]
pub async fn conclude_auction_with_statuses(
&self,
input: ConcludeAuctionWithStatusesInput<T>,
) -> anyhow::Result<()> {
let mut auction = input.auction;
tracing::info!(chain_id = self.config.chain_id, auction_id = ?auction.id, permission_key = auction.permission_key.to_string(), "Concluding auction");
tracing::Span::current().record(
"bid_ids",
tracing::field::display(entities::BidContainerTracing(&auction.bids)),
);
tracing::Span::current().record("auction_id", auction.id.to_string());
tracing::Span::current().record("bid_statuses", format!("{:?}", input.bid_statuses));
join_all(input.bid_statuses.into_iter().map(|(status, bid)| {
self.update_bid_status(UpdateBidStatusInput {
bid: bid.clone(),
new_status: status.clone(),
})
}))
.await;

if self
.repo
.get_in_memory_submitted_bids_for_auction(&auction)
.await
.is_empty()
{
self.repo
.conclude_auction(&mut auction)
.await
.map_err(|e| anyhow::anyhow!("Failed to conclude auction: {:?}", e))?;
self.repo.remove_in_memory_submitted_auction(auction).await;
}

Ok(())
}

/// Concludes an auction by getting the auction transaction status from the chain.
#[tracing::instrument(skip_all)]
pub async fn conclude_auction(&self, input: ConcludeAuctionInput<T>) -> anyhow::Result<()> {
let auction = input.auction;
tracing::info!(chain_id = self.config.chain_id, auction_id = ?auction.id, permission_key = auction.permission_key.to_string(), "Concluding auction");
if let Some(tx_hash) = auction.tx_hash.clone() {
tracing::Span::current().record("tx_hash", format!("{:?}", tx_hash));
let bids = self
.repo
.get_in_memory_submitted_bids_for_auction(&auction)
.await;

tracing::Span::current().record(
"bid_ids",
tracing::field::display(entities::BidContainerTracing(&bids)),
);
let bid_statuses = self
.get_bid_results(
bids.clone(),
Expand All @@ -45,35 +81,15 @@ where
)
.await?;

join_all(
bid_statuses
.iter()
.zip(bids.iter())
.filter_map(|(status, bid)| {
status.as_ref().map(|status| {
self.update_bid_status(UpdateBidStatusInput {
bid: bid.clone(),
new_status: status.clone(),
})
})
}),
)
.await;


if self
.repo
.get_in_memory_submitted_bids_for_auction(&auction)
.await
.is_empty()
{
tracing::Span::current().record("bid_statuses", format!("{:?}", bid_statuses));
self.repo
.conclude_auction(&mut auction)
.await
.map_err(|e| anyhow::anyhow!("Failed to conclude auction: {:?}", e))?;
self.repo.remove_in_memory_submitted_auction(auction).await;
}
self.conclude_auction_with_statuses(ConcludeAuctionWithStatusesInput {
auction,
bid_statuses: bid_statuses
.into_iter()
.zip(bids)
.filter_map(|(status, bid)| status.map(|status| (status, bid)))
.collect(),
})
.await?;
}
Ok(())
}
Expand Down
89 changes: 71 additions & 18 deletions auction-server/src/auction/service/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use {
api::ws::UpdateEvent,
auction::{
api::SvmChainUpdate,
service::conclude_auction::ConcludeAuctionInput,
entities,
service::conclude_auction::ConcludeAuctionWithStatusesInput,
},
kernel::entities::{
Evm,
Expand All @@ -25,9 +26,13 @@ use {
},
axum_prometheus::metrics,
ethers::providers::Middleware,
solana_client::rpc_config::{
RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
futures::future::join_all,
solana_client::{
rpc_config::{
RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
rpc_response::RpcLogsResponse,
},
solana_sdk::{
commitment_config::CommitmentConfig,
Expand Down Expand Up @@ -132,6 +137,53 @@ impl Service<Evm> {
const GET_LATEST_BLOCKHASH_INTERVAL_SVM: Duration = Duration::from_secs(5);

impl Service<Svm> {
pub async fn conclude_auction_for_log(
&self,
auction: entities::Auction<Svm>,
log: RpcLogsResponse,
) -> Result<()> {
let signature = Signature::from_str(&log.signature)?;
let submitted_bids = self
.repo
.get_in_memory_submitted_bids_for_auction(&auction)
.await;
if let Some(bid) = submitted_bids
.iter()
.find(|bid| bid.chain_data.transaction.signatures[0] == signature)
{
let bid_status = match log.err {
Some(_) => entities::BidStatusSvm::Failed {
auction: entities::BidStatusAuction {
id: auction.id,
tx_hash: signature,
},
},
None => entities::BidStatusSvm::Won {
auction: entities::BidStatusAuction {
id: auction.id,
tx_hash: signature,
},
},
};

self.conclude_auction_with_statuses(ConcludeAuctionWithStatusesInput {
auction: auction.clone(),
bid_statuses: vec![(bid_status, bid.clone())],
})
.await
.map_err(|e| {
tracing::error!(
error = ?e,
auction_id = ?auction.id,
tx_hash = ?signature,
"Failed to conclude auction with statuses"
);
e
})?;
}
Ok(())
}

pub async fn run_auction_conclusion_loop(&self) -> Result<()> {
tracing::info!(
chain_id = self.config.chain_id,
Expand All @@ -151,21 +203,22 @@ impl Service<Svm> {
log = ?rpc_log.clone(),
"New log trigger received",
);
if let Ok(signature) = Signature::from_str(&rpc_log.value.signature){
self.task_tracker.spawn({
let service = self.clone();
async move {
let submitted_auctions = service.repo.get_in_memory_submitted_auctions().await;
if let Some(auction) = submitted_auctions.iter().find(|auction| {
auction.bids.iter().any(|bid| bid.chain_data.transaction.signatures[0] == signature)
}) {
if let Err(err) = service.conclude_auction(ConcludeAuctionInput{auction: auction.clone()}).await {
tracing::error!(error = ?err, auction = ?auction, "Error while concluding submitted auction");
}
if let Ok(signature) = Signature::from_str(&rpc_log.value.signature) {
self.task_tracker.spawn({
let service = self.clone();
async move {
let submitted_auctions = service.repo.get_in_memory_submitted_auctions().await;
let auctions = submitted_auctions.iter().filter(|auction| {
auction.bids.iter().any(|bid| {
bid.chain_data.transaction.signatures[0] == signature
})
});
join_all(
auctions.map(|auction| service.conclude_auction_for_log(auction.clone(), rpc_log.value.clone()))
).await;
}
}
});
}
});
}
}
}
}
Expand Down

0 comments on commit a63eb19

Please sign in to comment.