Skip to content

Commit

Permalink
feat: added retry to querying. Retry on everything, remove is critical
Browse files Browse the repository at this point in the history
  • Loading branch information
akorchyn committed Oct 25, 2024
1 parent 3b59ff1 commit 67219a0
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 103 deletions.
85 changes: 72 additions & 13 deletions src/common/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use near_primitives::{
use serde::de::DeserializeOwned;
use tracing::{debug, error, info, instrument, trace, warn};

use crate::{config::NetworkConfig, errors::QueryError, types::Data};
use crate::{common::utils::retry, config::NetworkConfig, errors::QueryError, types::Data};

const QUERY_EXECUTOR_TARGET: &str = "near_api::query::executor";

Expand Down Expand Up @@ -102,13 +102,16 @@ where
reference: Reference,
requests: Vec<Arc<dyn QueryCreator<Method, RpcReference = Reference> + Send + Sync>>,
handler: ResponseHandler,
retries: u8,
sleep_duration: std::time::Duration,
exponential_backoff: bool,
}

impl<Handler, Method, Reference> MultiRpcBuilder<Handler, Method, Reference>
where
Handler: ResponseHandler<QueryResponse = Method::Response, Method = Method> + Send + Sync,
Method: RpcMethod + Send + Sync + 'static,
Method::Response: Send + Sync,
Method: RpcMethod + std::fmt::Debug + Send + Sync + 'static,
Method::Response: std::fmt::Debug + Send + Sync,
Method::Error: std::fmt::Display + std::fmt::Debug + Sync + Send,
Reference: Clone + Send + Sync,
{
Expand All @@ -117,6 +120,10 @@ where
reference,
requests: vec![],
handler,
retries: 5,
// 50ms, 100ms, 200ms, 400ms, 800ms
sleep_duration: std::time::Duration::from_millis(50),
exponential_backoff: true,
}
}

Expand Down Expand Up @@ -155,9 +162,27 @@ where
.collect::<Result<_, _>>()?;

info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", requests.len());
let requests = requests
.into_iter()
.map(|request| json_rpc_client.call(request));
let requests = requests.into_iter().map(|query| {
let json_rpc_client = json_rpc_client.clone();
async move {
retry(
|| async {
let result = json_rpc_client.call(&query).await;
tracing::debug!(
target: QUERY_EXECUTOR_TARGET,
"Querying RPC with {:?} resulted in {:?}",
query,
result
);
result
},
self.retries,
self.sleep_duration,
self.exponential_backoff,
)
.await
}
});

let requests: Vec<_> = join_all(requests)
.await
Expand Down Expand Up @@ -187,14 +212,16 @@ pub struct RpcBuilder<Handler, Method, Reference> {
reference: Reference,
request: Arc<dyn QueryCreator<Method, RpcReference = Reference> + Send + Sync>,
handler: Handler,
retries: u8,
sleep_duration: std::time::Duration,
exponential_backoff: bool,
}

impl<Handler, Method, Reference> RpcBuilder<Handler, Method, Reference>
where
Handler: ResponseHandler<QueryResponse = Method::Response, Method = Method> + Send + Sync,
Method: RpcMethod + Send + Sync + 'static,
Method::Response: Send + Sync,
Method: RpcMethod + 'static,
Method: RpcMethod + std::fmt::Debug + Send + Sync + 'static,
Method::Response: std::fmt::Debug + Send + Sync,
Method::Error: std::fmt::Display + std::fmt::Debug + Sync + Send,
Reference: Send + Sync,
{
Expand All @@ -207,25 +234,57 @@ where
reference,
request: Arc::new(request),
handler,
retries: 5,
// 50ms, 100ms, 200ms, 400ms, 800ms
sleep_duration: std::time::Duration::from_millis(50),
exponential_backoff: true,
}
}

pub fn at(self, reference: Reference) -> Self {
Self { reference, ..self }
}

pub const fn with_retries(mut self, retries: u8) -> Self {
self.retries = retries;
self
}

pub const fn with_sleep_duration(mut self, sleep_duration: std::time::Duration) -> Self {
self.sleep_duration = sleep_duration;
self
}

pub const fn with_exponential_backoff(mut self) -> Self {
self.exponential_backoff = true;
self
}

#[instrument(skip(self, network))]
pub async fn fetch_from(
self,
network: &NetworkConfig,
) -> ResultWithMethod<Handler::Response, Method> {
let json_rpc_client = network.json_rpc_client();

debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
let json_rpc_client = network.json_rpc_client();
let query = self.request.create_query(network, self.reference)?;

info!(target: QUERY_EXECUTOR_TARGET, "Sending query");
let query_response = json_rpc_client.call(query).await?;
let query_response = retry(
|| async {
let result = json_rpc_client.call(&query).await;
tracing::debug!(
target: QUERY_EXECUTOR_TARGET,
"Querying RPC with {:?} resulted in {:?}",
query,
result
);
result
},
3,
std::time::Duration::from_secs(1),
false,
)
.await?;

debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
self.handler.process_response(vec![query_response])
Expand Down
124 changes: 34 additions & 90 deletions src/common/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use near_primitives::{
views::FinalExecutionOutcomeView,
};
use reqwest::Response;
use tracing::{debug, error, info, warn};
use tracing::{debug, info};

use crate::{
config::NetworkConfig,
Expand All @@ -22,7 +22,8 @@ use crate::{
};

use super::{
signed_delegate_action::SignedDelegateActionAsBase64, META_TRANSACTION_VALID_FOR_DEFAULT,
signed_delegate_action::SignedDelegateActionAsBase64, utils::retry,
META_TRANSACTION_VALID_FOR_DEFAULT,
};

const TX_EXECUTOR_TARGET: &str = "near_api::tx::executor";
Expand Down Expand Up @@ -81,6 +82,7 @@ pub struct ExecuteSignedTransaction {
pub signer: Arc<Signer>,
pub retries: u8,
pub sleep_duration: std::time::Duration,
pub exponential_backoff: bool,
}

impl ExecuteSignedTransaction {
Expand All @@ -89,7 +91,9 @@ impl ExecuteSignedTransaction {
tr: TransactionableOrSigned::Transactionable(Box::new(tr)),
signer,
retries: 5,
sleep_duration: std::time::Duration::from_secs(1),
// 50ms, 100ms, 200ms, 400ms, 800ms
sleep_duration: std::time::Duration::from_millis(50),
exponential_backoff: true,
}
}

Expand All @@ -107,6 +111,11 @@ impl ExecuteSignedTransaction {
self
}

pub fn with_exponential_backoff(mut self) -> Self {
self.exponential_backoff = true;
self
}

pub async fn presign_offline(
mut self,
public_key: PublicKey,
Expand Down Expand Up @@ -226,58 +235,35 @@ impl ExecuteSignedTransaction {
retries: u8,
sleep_duration: std::time::Duration,
) -> Result<FinalExecutionOutcomeView, ExecuteTransactionError> {
let mut retries = (1..=retries).rev();
let transaction_info = loop {
debug!(
target: TX_EXECUTOR_TARGET,
"Attempting to broadcast transaction. Retries left: {}",
retries.len()
);
let transaction_info_result = network
retry(
|| {
let signed_tr = signed_tr.clone();
async move {
let result = network
.json_rpc_client()
.call(
near_jsonrpc_client::methods::broadcast_tx_commit::RpcBroadcastTxCommitRequest {
signed_transaction: signed_tr.clone(),
},
)
.await;
match transaction_info_result {
Ok(response) => {
info!(
},
)
.await;

tracing::debug!(
target: TX_EXECUTOR_TARGET,
"Transaction successfully broadcasted. Transaction hash: {:?}",
response.transaction.hash
"Broadcasting transaction {} resulted in {:?}",
signed_tr.get_hash(),
result
);
break response;
}
Err(err) => {
if is_critical_error(&err) {
error!(
target: TX_EXECUTOR_TARGET,
"Critical error encountered while broadcasting transaction: {:?}",
err
);
return Err(ExecuteTransactionError::CriticalTransactionError(err));
} else if retries.next().is_some() {
warn!(
target: TX_EXECUTOR_TARGET,
"Error encountered while broadcasting transaction. Retrying in {:?}. Error: {:?}",
sleep_duration,
err
);
tokio::time::sleep(sleep_duration).await;
} else {
error!(
target: TX_EXECUTOR_TARGET,
"All retries exhausted. Failed to broadcast transaction: {:?}",
err
);
return Err(ExecuteTransactionError::RetriesExhausted(err));
}

result
}
};
};
Ok(transaction_info)
},
retries,
sleep_duration,
false,
)
.await
.map_err(ExecuteTransactionError::RetriesExhausted)
}
}

Expand Down Expand Up @@ -472,45 +458,3 @@ impl ExecuteMetaTransaction {
Ok(resp)
}
}

pub const fn is_critical_error(
err: &near_jsonrpc_client::errors::JsonRpcError<
near_jsonrpc_client::methods::broadcast_tx_commit::RpcTransactionError,
>,
) -> bool {
match err {
near_jsonrpc_client::errors::JsonRpcError::TransportError(_rpc_transport_error) => {
false
}
near_jsonrpc_client::errors::JsonRpcError::ServerError(rpc_server_error) => match rpc_server_error {
near_jsonrpc_client::errors::JsonRpcServerError::HandlerError(rpc_transaction_error) => match rpc_transaction_error {
near_jsonrpc_client::methods::broadcast_tx_commit::RpcTransactionError::TimeoutError => {
false
}
near_jsonrpc_client::methods::broadcast_tx_commit::RpcTransactionError::InvalidTransaction { .. } |
near_jsonrpc_client::methods::broadcast_tx_commit::RpcTransactionError::DoesNotTrackShard |
near_jsonrpc_client::methods::broadcast_tx_commit::RpcTransactionError::RequestRouted{..} |
near_jsonrpc_client::methods::broadcast_tx_commit::RpcTransactionError::UnknownTransaction{..} |
near_jsonrpc_client::methods::broadcast_tx_commit::RpcTransactionError::InternalError{..} => {
true
}
}
near_jsonrpc_client::errors::JsonRpcServerError::RequestValidationError(..) |
near_jsonrpc_client::errors::JsonRpcServerError::NonContextualError(..) => {
true
}
near_jsonrpc_client::errors::JsonRpcServerError::InternalError{ .. } => {
false
}
near_jsonrpc_client::errors::JsonRpcServerError::ResponseStatusError(json_rpc_server_response_status_error) => match json_rpc_server_response_status_error {
near_jsonrpc_client::errors::JsonRpcServerResponseStatusError::Unauthorized |
near_jsonrpc_client::errors::JsonRpcServerResponseStatusError::Unexpected{..} => {
true
}
near_jsonrpc_client::errors::JsonRpcServerResponseStatusError::TooManyRequests => {
false
}
}
}
}
}
29 changes: 29 additions & 0 deletions src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,35 @@ pub fn parse_decimal_number(s: &str, pref_const: u128) -> Result<u128, DecimalNu
Ok(result)
}

pub(crate) async fn retry<R, E, T, F>(
mut task: F,
retries: u8,
initial_sleep: std::time::Duration,
exponential_backoff: bool,
) -> T::Output
where
F: FnMut() -> T + Send,
T: core::future::Future<Output = core::result::Result<R, E>> + Send,
{
let mut retries = (1..=retries).rev();
let mut sleep_duration = initial_sleep;
loop {
let result = task().await;
match result {
Ok(result) => return Ok(result),
Err(_) if retries.next().is_some() => {
tokio::time::sleep(sleep_duration).await;
sleep_duration = if exponential_backoff {
sleep_duration * 2
} else {
sleep_duration
};
}
Err(err) => return Err(err),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ pub enum ExecuteTransactionError {
PreQueryError(#[from] QueryError<RpcQueryRequest>),
#[error("Retries exhausted. The last error is: {0}")]
RetriesExhausted(JsonRpcError<RpcTransactionError>),
#[deprecated(since = "0.2.1", note = "unused")]
#[error("Transaction error: {0}")]
CriticalTransactionError(JsonRpcError<RpcTransactionError>),
#[error(transparent)]
Expand Down

0 comments on commit 67219a0

Please sign in to comment.