From 308bc3cd0074de70c10950b4fa954b6245d077f9 Mon Sep 17 00:00:00 2001 From: Binarybaron Date: Wed, 4 Dec 2024 13:02:20 +0100 Subject: [PATCH] refactor: Replace all calls to backoff::future::retry with ::retry_notify to be able to log how long we'll wait until next retry --- swap/src/asb/event_loop.rs | 52 +++++++++++++++++++++++--------------- swap/src/cli/event_loop.rs | 44 +++++++++++++++++++++++--------- 2 files changed, 64 insertions(+), 32 deletions(-) diff --git a/swap/src/asb/event_loop.rs b/swap/src/asb/event_loop.rs index 4fc45b21e..2fc1b553b 100644 --- a/swap/src/asb/event_loop.rs +++ b/swap/src/asb/event_loop.rs @@ -717,28 +717,40 @@ impl EventLoopHandle { let transfer_proof = self.build_transfer_proof_request(msg); - backoff::future::retry(backoff, || async { - // Create a oneshot channel to receive the acknowledgment of the transfer proof - let (singular_sender, singular_receiver) = oneshot::channel(); - - if let Err(err) = sender.send((self.peer, transfer_proof.clone(), singular_sender)) { - let err = anyhow!(err).context("Failed to communicate transfer proof through event loop channel"); - tracing::error!(%err, swap_id = %self.swap_id, "Failed to send transfer proof"); - return Err(backoff::Error::permanent(err)); - } - - match singular_receiver.await { - Ok(Ok(())) => Ok(()), - Ok(Err(err)) => { - tracing::warn!(%err, "Failed to send transfer proof due to a network error. We will retry"); - Err(backoff::Error::transient(anyhow!(err))) + backoff::future::retry_notify( + backoff, + || async { + // Create a oneshot channel to receive the acknowledgment of the transfer proof + let (singular_sender, singular_receiver) = oneshot::channel(); + + if let Err(err) = sender.send((self.peer, transfer_proof.clone(), singular_sender)) + { + return Err(backoff::Error::permanent(anyhow!(err).context( + "Failed to communicate transfer proof through event loop channel", + ))); } - Err(_) => { - Err(backoff::Error::permanent(anyhow!("The sender channel should never be closed without sending a response"))) + + match singular_receiver.await { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(backoff::Error::transient( + anyhow!(err) + .context("A network error occurred while sending the transfer proof"), + )), + Err(_) => Err(backoff::Error::permanent(anyhow!( + "The sender channel should never be closed without sending a response" + ))), } - } - }) - .await?; + }, + |e, wait_time: Duration| { + tracing::warn!( + swap_id = %self.swap_id, + error = ?e, + "Failed to send transfer proof. We will retry in {} seconds", + wait_time.as_secs_f64() + ) + }, + ) + .await?; self.transfer_proof_sender.take(); diff --git a/swap/src/cli/event_loop.rs b/swap/src/cli/event_loop.rs index 2b6076805..83b4164a0 100644 --- a/swap/src/cli/event_loop.rs +++ b/swap/src/cli/event_loop.rs @@ -406,13 +406,12 @@ impl EventLoopHandle { let backoff = Self::create_retry_config(EXECUTION_SETUP_PROTOCOL_TIMEOUT); - backoff::future::retry(backoff, || async { + backoff::future::retry_notify(backoff, || async { match self.execution_setup_sender.send_receive(swap.clone()).await { Ok(Ok(state2)) => Ok(state2), // These are errors thrown by the swap_setup/bob behaviour Ok(Err(err)) => { - tracing::warn!(%err, "Failed to setup swap. Will retry"); - Err(backoff::Error::transient(err)) + Err(backoff::Error::transient(err.context("A network error occurred while setting up the swap"))) } // This will happen if we don't establish a connection to Alice within the timeout of the MPSC channel // The protocol does not dial Alice it self @@ -424,6 +423,12 @@ impl EventLoopHandle { unreachable!("We never drop the receiver of the execution setup channel, so this should never happen") } } + }, |err, wait_time: Duration| { + tracing::warn!( + error = ?err, + "Failed to setup swap. We will retry in {} seconds", + wait_time.as_secs_f64() + ) }) .await .context("Failed to setup swap after retries") @@ -448,17 +453,22 @@ impl EventLoopHandle { let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT); - backoff::future::retry(backoff, || async { + backoff::future::retry_notify(backoff, || async { match self.quote_sender.send_receive(()).await { Ok(Ok(quote)) => Ok(quote), Ok(Err(err)) => { - tracing::warn!(%err, "Failed to request quote due to network error. Will retry"); - Err(backoff::Error::transient(err)) + Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting a quote"))) } Err(_) => { unreachable!("We initiate the quote channel without a timeout and store both the sender and receiver in the same struct, so this should never happen"); } } + }, |err, wait_time: Duration| { + tracing::warn!( + error = ?err, + "Failed to request quote. We will retry in {} seconds", + wait_time.as_secs_f64() + ) }) .await .context("Failed to request quote after retries") @@ -469,17 +479,22 @@ impl EventLoopHandle { let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT); - backoff::future::retry(backoff, || async { + backoff::future::retry_notify(backoff, || async { match self.cooperative_xmr_redeem_sender.send_receive(()).await { Ok(Ok(response)) => Ok(response), Ok(Err(err)) => { - tracing::warn!(%err, "Failed to request cooperative XMR redeem due to network error. Will retry"); - Err(backoff::Error::transient(err)) + Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting cooperative XMR redeem"))) } Err(_) => { unreachable!("We initiate the cooperative xmr redeem channel without a timeout and store both the sender and receiver in the same struct, so this should never happen"); } } + }, |err, wait_time: Duration| { + tracing::warn!( + error = ?err, + "Failed to request cooperative XMR redeem. We will retry in {} seconds", + wait_time.as_secs_f64() + ) }) .await .context("Failed to request cooperative XMR redeem after retries") @@ -497,17 +512,22 @@ impl EventLoopHandle { .with_max_interval(REQUEST_RESPONSE_PROTOCOL_TIMEOUT) .build(); - backoff::future::retry(backoff, || async { + backoff::future::retry_notify(backoff, || async { match self.encrypted_signature_sender.send_receive(tx_redeem_encsig.clone()).await { Ok(Ok(_)) => Ok(()), Ok(Err(err)) => { - tracing::warn!(%err, "Failed to send encrypted signature due to a network error. Will retry"); - Err(backoff::Error::transient(err)) + Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while sending the encrypted signature"))) } Err(_) => { unreachable!("We initiate the encrypted signature channel without a timeout and store both the sender and receiver in the same struct, so this should never happen"); } } + }, |err, wait_time: Duration| { + tracing::warn!( + error = ?err, + "Failed to send encrypted signature. We will retry in {} seconds", + wait_time.as_secs_f64() + ) }) .await .context("Failed to send encrypted signature after retries")