Skip to content

Commit

Permalink
refactor: Replace all calls to backoff::future::retry with ::retry_no…
Browse files Browse the repository at this point in the history
…tify to be able to log how long we'll wait until next retry
  • Loading branch information
binarybaron committed Dec 4, 2024
1 parent 4625f94 commit 8f2b4d4
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 32 deletions.
52 changes: 32 additions & 20 deletions swap/src/asb/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,28 +717,40 @@ impl EventLoopHandle {

let transfer_proof = self.build_transfer_proof_request(msg);

backoff::future::retry(backoff, || async {
// Create a oneshot channel to receive the acknowledgment of the transfer proof
let (singular_sender, singular_receiver) = oneshot::channel();

if let Err(err) = sender.send((self.peer, transfer_proof.clone(), singular_sender)) {
let err = anyhow!(err).context("Failed to communicate transfer proof through event loop channel");
tracing::error!(%err, swap_id = %self.swap_id, "Failed to send transfer proof");
return Err(backoff::Error::permanent(err));
}

match singular_receiver.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to send transfer proof due to a network error. We will retry");
Err(backoff::Error::transient(anyhow!(err)))
backoff::future::retry_notify(
backoff,
|| async {
// Create a oneshot channel to receive the acknowledgment of the transfer proof
let (singular_sender, singular_receiver) = oneshot::channel();

if let Err(err) = sender.send((self.peer, transfer_proof.clone(), singular_sender))
{
return Err(backoff::Error::permanent(err.context(
"Failed to communicate transfer proof through event loop channel",
)));
}
Err(_) => {
Err(backoff::Error::permanent(anyhow!("The sender channel should never be closed without sending a response")))

match singular_receiver.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(backoff::Error::transient(
anyhow!(err)
.context("A network error occurred while sending the transfer proof"),
)),
Err(_) => Err(backoff::Error::permanent(anyhow!(
"The sender channel should never be closed without sending a response"
))),
}
}
})
.await?;
},
|e, wait_time: Duration| {
tracing::warn!(
swap_id = %self.swap_id,
error = ?e,
"Failed to send transfer proof. We will retry in {} seconds",
wait_time.as_secs_f64()
)
},
)
.await?;

self.transfer_proof_sender.take();

Expand Down
44 changes: 32 additions & 12 deletions swap/src/cli/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,13 +406,12 @@ impl EventLoopHandle {

let backoff = Self::create_retry_config(EXECUTION_SETUP_PROTOCOL_TIMEOUT);

backoff::future::retry(backoff, || async {
backoff::future::retry_notify(backoff, || async {
match self.execution_setup_sender.send_receive(swap.clone()).await {
Ok(Ok(state2)) => Ok(state2),
// These are errors thrown by the swap_setup/bob behaviour
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to setup swap. Will retry");
Err(backoff::Error::transient(err))
Err(backoff::Error::transient(err.context("A network error occurred while setting up the swap")))
}
// This will happen if we don't establish a connection to Alice within the timeout of the MPSC channel
// The protocol does not dial Alice it self
Expand All @@ -424,6 +423,12 @@ impl EventLoopHandle {
unreachable!("We never drop the receiver of the execution setup channel, so this should never happen")
}
}
}, |err, wait_time: Duration| {
tracing::warn!(
error = ?err,
"Failed to setup swap. We will retry in {} seconds",
wait_time.as_secs_f64()
)
})
.await
.context("Failed to setup swap after retries")
Expand All @@ -448,17 +453,22 @@ impl EventLoopHandle {

let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT);

backoff::future::retry(backoff, || async {
backoff::future::retry_notify(backoff, || async {
match self.quote_sender.send_receive(()).await {
Ok(Ok(quote)) => Ok(quote),
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to request quote due to network error. Will retry");
Err(backoff::Error::transient(err))
Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting a quote")))
}
Err(_) => {
unreachable!("We initiate the quote channel without a timeout and store both the sender and receiver in the same struct, so this should never happen");
}
}
}, |err, wait_time: Duration| {
tracing::warn!(
error = ?err,
"Failed to request quote. We will retry in {} seconds",
wait_time.as_secs_f64()
)
})
.await
.context("Failed to request quote after retries")
Expand All @@ -469,17 +479,22 @@ impl EventLoopHandle {

let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT);

backoff::future::retry(backoff, || async {
backoff::future::retry_notify(backoff, || async {
match self.cooperative_xmr_redeem_sender.send_receive(()).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to request cooperative XMR redeem due to network error. Will retry");
Err(backoff::Error::transient(err))
Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting cooperative XMR redeem")))
}
Err(_) => {
unreachable!("We initiate the cooperative xmr redeem channel without a timeout and store both the sender and receiver in the same struct, so this should never happen");
}
}
}, |err, wait_time: Duration| {
tracing::warn!(
error = ?err,
"Failed to request cooperative XMR redeem. We will retry in {} seconds",
wait_time.as_secs_f64()
)
})
.await
.context("Failed to request cooperative XMR redeem after retries")
Expand All @@ -497,17 +512,22 @@ impl EventLoopHandle {
.with_max_interval(REQUEST_RESPONSE_PROTOCOL_TIMEOUT)
.build();

backoff::future::retry(backoff, || async {
backoff::future::retry_notify(backoff, || async {
match self.encrypted_signature_sender.send_receive(tx_redeem_encsig.clone()).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(err)) => {
tracing::warn!(%err, "Failed to send encrypted signature due to a network error. Will retry");
Err(backoff::Error::transient(err))
Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while sending the encrypted signature")))
}
Err(_) => {
unreachable!("We initiate the encrypted signature channel without a timeout and store both the sender and receiver in the same struct, so this should never happen");
}
}
}, |err, wait_time: Duration| {
tracing::warn!(
error = ?err,
"Failed to send encrypted signature. We will retry in {} seconds",
wait_time.as_secs_f64()
)
})
.await
.context("Failed to send encrypted signature after retries")
Expand Down

0 comments on commit 8f2b4d4

Please sign in to comment.