Skip to content

Commit

Permalink
returns mempool verification errors early, and fixes handling for can…
Browse files Browse the repository at this point in the history
…cellations or timeouts.
  • Loading branch information
arya2 committed Nov 22, 2024
1 parent 5e642b5 commit 3dc3c1c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 24 deletions.
22 changes: 11 additions & 11 deletions zebra-consensus/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,17 +584,17 @@ where
)?;

if let Some(mut mempool) = mempool {
if !transaction.transaction.transaction.outputs().is_empty() {
tokio::spawn(async move {
tokio::time::sleep(POLL_MEMPOOL_DELAY).await;
let _ = mempool
.ready()
.await
.expect("mempool poll_ready() method should not return an error")
.call(mempool::Request::CheckForVerifiedTransactions)
.await;
});
}
tokio::spawn(async move {
// Best-effort poll of the mempool to provide a timely response to
// `sendrawtransaction` RPC calls or `AwaitOutput` mempool calls.
tokio::time::sleep(POLL_MEMPOOL_DELAY).await;
let _ = mempool
.ready()
.await
.expect("mempool poll_ready() method should not return an error")
.call(mempool::Request::CheckForVerifiedTransactions)
.await;
});
}

Response::Mempool { transaction, spent_mempool_outpoints }
Expand Down
6 changes: 0 additions & 6 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,12 +626,6 @@ impl Service<Request> for Mempool {
metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);

storage.reject_if_needed(tx_id, error);

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ =
rsp_tx.send(Err("timeout waiting for verification result".into()));
}
}
Err(elapsed) => {
// A timeout happens when the stream hangs waiting for another service,
Expand Down
37 changes: 30 additions & 7 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ where
pub fn download_if_needed_and_verify(
&mut self,
gossiped_tx: Gossip,
rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
mut rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
) -> Result<(), MempoolError> {
let txid = gossiped_tx.id();

Expand Down Expand Up @@ -402,11 +402,11 @@ where
// Tack the hash onto the error so we can remove the cancel handle
// on failure as well as on success.
.map_err(move |e| (e, txid))
.inspect(move |result| {
// Hide the transaction data to avoid filling the logs
let result = result.as_ref().map(|_tx| txid);
debug!("mempool transaction result: {result:?}");
})
.inspect(move |result| {
// Hide the transaction data to avoid filling the logs
let result = result.as_ref().map(|_tx| txid);
debug!("mempool transaction result: {result:?}");
})
.in_current_span();

let task = tokio::spawn(async move {
Expand All @@ -418,9 +418,32 @@ where
_ = &mut cancel_rx => {
trace!("task cancelled prior to completion");
metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
if let Some(rsp_tx) = rsp_tx.take() {
let _ = rsp_tx.send(Err("verification cancelled".into()));
}

Ok(Err((TransactionDownloadVerifyError::Cancelled, txid)))
}
verification = fut => verification,
verification = fut => {
verification
.inspect_err(|_elapsed| {
if let Some(rsp_tx) = rsp_tx.take() {
let _ = rsp_tx.send(Err("timeout waiting for verification result".into()));
}
})
.inspect(|inner_result| {
let _ = inner_result
.as_ref()
.inspect_err(|(tx_verifier_error, tx_id)| {
if let Some(rsp_tx) = rsp_tx.take() {
let error_msg = format!(
"failed to validate tx: {tx_id}, error: {tx_verifier_error}"
);
let _ = rsp_tx.send(Err(error_msg.into()));
}
});
})
},
};

(result, rsp_tx)
Expand Down

0 comments on commit 3dc3c1c

Please sign in to comment.