Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix svm slow conclusion #292

Merged
merged 5 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion auction-server/src/auction/service/auction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ impl AuctionManager<Svm> for Service<Svm> {
return Ok(vec![]);
}

//TODO: this can be optimized out if triggered by websocket events
let signatures: Vec<_> = bids
.iter()
.map(|bid| {
Expand Down
90 changes: 53 additions & 37 deletions auction-server/src/auction/service/conclude_auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,62 @@ pub struct ConcludeAuctionInput<T: ChainTrait> {
pub auction: entities::Auction<T>,
}

pub struct ConcludeAuctionWithStatusesInput<T: ChainTrait> {
pub auction: entities::Auction<T>,
pub bid_statuses: Vec<(T::BidStatusType, entities::Bid<T>)>,
}

impl<T: ChainTrait> Service<T>
where
Service<T>: AuctionManager<T>,
{
#[tracing::instrument(skip_all, fields(auction_id, tx_hash, bid_ids, bid_statuses))]
pub async fn conclude_auction(&self, input: ConcludeAuctionInput<T>) -> anyhow::Result<()> {
#[tracing::instrument(skip_all, fields(auction_id, bid_ids, bid_statuses))]
pub async fn conclude_auction_with_statuses(
&self,
input: ConcludeAuctionWithStatusesInput<T>,
) -> anyhow::Result<()> {
let mut auction = input.auction;
tracing::info!(chain_id = self.config.chain_id, auction_id = ?auction.id, permission_key = auction.permission_key.to_string(), "Concluding auction");
tracing::Span::current().record(
"bid_ids",
tracing::field::display(entities::BidContainerTracing(&auction.bids)),
);
tracing::Span::current().record("auction_id", auction.id.to_string());
tracing::Span::current().record("bid_statuses", format!("{:?}", input.bid_statuses));
join_all(input.bid_statuses.into_iter().map(|(status, bid)| {
self.update_bid_status(UpdateBidStatusInput {
bid: bid.clone(),
new_status: status.clone(),
})
}))
.await;

if self
.repo
.get_in_memory_submitted_bids_for_auction(&auction)
.await
.is_empty()
{
self.repo
.conclude_auction(&mut auction)
.await
.map_err(|e| anyhow::anyhow!("Failed to conclude auction: {:?}", e))?;
self.repo.remove_in_memory_submitted_auction(auction).await;
}

Ok(())
}

/// Concludes an auction by getting the auction transaction status from the chain.
#[tracing::instrument(skip_all)]
pub async fn conclude_auction(&self, input: ConcludeAuctionInput<T>) -> anyhow::Result<()> {
let auction = input.auction;
tracing::info!(chain_id = self.config.chain_id, auction_id = ?auction.id, permission_key = auction.permission_key.to_string(), "Concluding auction");
if let Some(tx_hash) = auction.tx_hash.clone() {
tracing::Span::current().record("tx_hash", format!("{:?}", tx_hash));
let bids = self
.repo
.get_in_memory_submitted_bids_for_auction(&auction)
.await;

tracing::Span::current().record(
"bid_ids",
tracing::field::display(entities::BidContainerTracing(&bids)),
);
let bid_statuses = self
.get_bid_results(
bids.clone(),
Expand All @@ -45,35 +81,15 @@ where
)
.await?;

join_all(
bid_statuses
.iter()
.zip(bids.iter())
.filter_map(|(status, bid)| {
status.as_ref().map(|status| {
self.update_bid_status(UpdateBidStatusInput {
bid: bid.clone(),
new_status: status.clone(),
})
})
}),
)
.await;


if self
.repo
.get_in_memory_submitted_bids_for_auction(&auction)
.await
.is_empty()
{
tracing::Span::current().record("bid_statuses", format!("{:?}", bid_statuses));
self.repo
.conclude_auction(&mut auction)
.await
.map_err(|e| anyhow::anyhow!("Failed to conclude auction: {:?}", e))?;
self.repo.remove_in_memory_submitted_auction(auction).await;
}
self.conclude_auction_with_statuses(ConcludeAuctionWithStatusesInput {
auction,
bid_statuses: bid_statuses
.into_iter()
.zip(bids)
.filter_map(|(status, bid)| status.map(|status| (status, bid)))
.collect(),
})
.await?;
}
Ok(())
}
Expand Down
89 changes: 71 additions & 18 deletions auction-server/src/auction/service/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use {
api::ws::UpdateEvent,
auction::{
api::SvmChainUpdate,
service::conclude_auction::ConcludeAuctionInput,
entities,
service::conclude_auction::ConcludeAuctionWithStatusesInput,
},
kernel::entities::{
Evm,
Expand All @@ -25,9 +26,13 @@ use {
},
axum_prometheus::metrics,
ethers::providers::Middleware,
solana_client::rpc_config::{
RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
futures::future::join_all,
solana_client::{
rpc_config::{
RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
rpc_response::RpcLogsResponse,
},
solana_sdk::{
commitment_config::CommitmentConfig,
Expand Down Expand Up @@ -132,6 +137,53 @@ impl Service<Evm> {
const GET_LATEST_BLOCKHASH_INTERVAL_SVM: Duration = Duration::from_secs(5);

impl Service<Svm> {
pub async fn conclude_auction_for_log(
danimhr marked this conversation as resolved.
Show resolved Hide resolved
&self,
auction: entities::Auction<Svm>,
log: RpcLogsResponse,
) -> Result<()> {
let signature = Signature::from_str(&log.signature)?;
let submitted_bids = self
.repo
.get_in_memory_submitted_bids_for_auction(&auction)
.await;
if let Some(bid) = submitted_bids
.iter()
.find(|bid| bid.chain_data.transaction.signatures[0] == signature)
{
let bid_status = match log.err {
Some(_) => entities::BidStatusSvm::Failed {
auction: entities::BidStatusAuction {
id: auction.id,
tx_hash: signature,
},
},
None => entities::BidStatusSvm::Won {
auction: entities::BidStatusAuction {
id: auction.id,
tx_hash: signature,
},
},
};

self.conclude_auction_with_statuses(ConcludeAuctionWithStatusesInput {
auction: auction.clone(),
bid_statuses: vec![(bid_status, bid.clone())],
})
.await
.map_err(|e| {
tracing::error!(
error = ?e,
auction_id = ?auction.id,
tx_hash = ?signature,
"Failed to conclude auction with statuses"
);
e
})?;
}
Ok(())
}

pub async fn run_auction_conclusion_loop(&self) -> Result<()> {
tracing::info!(
chain_id = self.config.chain_id,
Expand All @@ -151,21 +203,22 @@ impl Service<Svm> {
log = ?rpc_log.clone(),
"New log trigger received",
);
if let Ok(signature) = Signature::from_str(&rpc_log.value.signature){
self.task_tracker.spawn({
let service = self.clone();
async move {
let submitted_auctions = service.repo.get_in_memory_submitted_auctions().await;
if let Some(auction) = submitted_auctions.iter().find(|auction| {
auction.bids.iter().any(|bid| bid.chain_data.transaction.signatures[0] == signature)
}) {
if let Err(err) = service.conclude_auction(ConcludeAuctionInput{auction: auction.clone()}).await {
tracing::error!(error = ?err, auction = ?auction, "Error while concluding submitted auction");
}
if let Ok(signature) = Signature::from_str(&rpc_log.value.signature) {
self.task_tracker.spawn({
let service = self.clone();
async move {
let submitted_auctions = service.repo.get_in_memory_submitted_auctions().await;
let auctions = submitted_auctions.iter().filter(|auction| {
auction.bids.iter().any(|bid| {
bid.chain_data.transaction.signatures[0] == signature
})
});
join_all(
auctions.map(|auction| service.conclude_auction_for_log(auction.clone(), rpc_log.value.clone()))
).await;
}
}
});
}
});
}
}
}
}
Expand Down
Loading