Skip to content

Commit

Permalink
fix: Limit retries when read resumes but stream keeps erroring (#66)
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Nov 19, 2024
1 parent 921883f commit 363cb80
Showing 1 changed file with 46 additions and 10 deletions.
56 changes: 46 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use http::uri::Authority;
use hyper_util::client::legacy::connect::HttpConnector;
use secrecy::SecretString;
use sync_docs::sync_docs;
use tokio::time::sleep;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};

use crate::{
Expand Down Expand Up @@ -256,6 +257,10 @@ pub struct ClientConfig {
/// URI scheme to use to connect.
#[cfg(feature = "connector")]
pub uri_scheme: http::uri::Scheme,
/// Backoff duration for retries.
pub retry_backoff_duration: Duration,
/// Maximum number of retries.
pub max_retries: usize,
}

impl ClientConfig {
Expand All @@ -270,6 +275,8 @@ impl ClientConfig {
user_agent: "s2-sdk-rust".to_string(),
#[cfg(feature = "connector")]
uri_scheme: http::uri::Scheme::HTTPS,
retry_backoff_duration: Duration::from_millis(100),
max_retries: 3,
}
}

Expand Down Expand Up @@ -314,6 +321,22 @@ impl ClientConfig {
..self
}
}

/// Construct from an existing configuration with the retry backoff duration.
pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
    // Take ownership, overwrite the single field, and hand the config back.
    let mut config = self;
    config.retry_backoff_duration = retry_backoff_duration.into();
    config
}

/// Construct from an existing configuration with maximum number of retries.
pub fn with_max_retries(self, max_retries: usize) -> Self {
    // Take ownership, overwrite the single field, and hand the config back.
    let mut config = self;
    config.max_retries = max_retries;
    config
}
}

#[derive(Debug, Clone, thiserror::Error)]
Expand Down Expand Up @@ -728,14 +751,15 @@ impl ClientInner {
&self,
service_req: T,
) -> Result<T::Response, ClientError> {
self.send_retryable_with_backoff(
service_req,
ConstantBuilder::default()
.with_delay(Duration::from_millis(100))
.with_max_times(3)
.with_jitter(),
)
.await
self.send_retryable_with_backoff(service_req, self.backoff_builder())
.await
}

/// Backoff strategy built from this client's configuration: a constant
/// delay (`retry_backoff_duration`) with jitter, capped at `max_retries`
/// attempts. Shared by unary request retries and read-session resumption.
fn backoff_builder(&self) -> impl BackoffBuilder {
    // Pull the knobs out of the config first so the builder chain reads flat.
    let delay = self.config.retry_backoff_duration;
    let max_times = self.config.max_retries;
    ConstantBuilder::default()
        .with_delay(delay)
        .with_max_times(max_times)
        .with_jitter()
}

fn account_service_client(&self) -> AccountServiceClient<Channel> {
Expand Down Expand Up @@ -765,17 +789,29 @@ fn read_resumption_stream(
mut responses: ServiceStreamingResponse<ReadSessionStreamingResponse>,
client: ClientInner,
) -> impl Send + futures::Stream<Item = Result<types::ReadOutput, ClientError>> {
let mut backoff = None;
async_stream::stream! {
while let Some(item) = responses.next().await {
match item {
Err(e) if request.should_retry(&e) => {
if let Ok(new_responses) = client.send_retryable(request.clone()).await {
responses = new_responses;
if backoff.is_none() {
backoff = Some(client.backoff_builder().build());
}
if let Some(duration) = backoff.as_mut().and_then(|b| b.next()) {
sleep(duration).await;
if let Ok(new_responses) = client.send_retryable(request.clone()).await {
responses = new_responses;
} else {
yield Err(e);
}
} else {
yield Err(e);
}
}
item => {
if item.is_ok() {
backoff = None;
}
if let Ok(types::ReadOutput::Batch(types::SequencedRecordBatch { records })) = &item {
if let Some(record) = records.last() {
request.set_start_seq_num(Some(record.seq_num + 1));
Expand Down

0 comments on commit 363cb80

Please sign in to comment.