refactor: needless cancellation token cloning (#6618)
The solution we ended up with for `backoff::retry` requires cloning the
cancellation token on every call, even though the function only ever
`.await`s it. Fix that by accepting `&CancellationToken` instead, and also
turn the return type into `Option<Result<T, E>>`, avoiding the need for the
`E::cancelled()` fn that callers had to pass in.

Cc: #6096
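
To make the new contract concrete, here is a minimal sketch of a caller. It assumes the `utils` crate from `libs/utils` and tokio-util's `CancellationToken`; the `DemoError` type and `notify` function are hypothetical, not part of the commit:

    use std::fmt;

    use tokio_util::sync::CancellationToken;
    use utils::backoff;

    // Hypothetical error type; `retry` only requires `E: Display + Debug`.
    #[derive(Debug)]
    enum DemoError {
        ShuttingDown,
    }

    impl fmt::Display for DemoError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{self:?}")
        }
    }

    async fn notify(cancel: &CancellationToken) -> Result<(), DemoError> {
        backoff::retry(
            || async { Result::<(), DemoError>::Ok(()) }, // operation to retry
            |_e: &DemoError| false,                       // treat every error as transient
            3,                                            // warn_threshold
            10,                                           // max_retries
            "send compute notification",
            cancel, // borrowed; no per-call clone() anymore
        )
        .await
        // `None` means the token fired; the caller picks the error itself
        // instead of handing an `E::cancelled()`-style closure to `retry`.
        .ok_or(DemoError::ShuttingDown)
        .and_then(|x| x) // flatten Option<Result<T, E>> into Result<T, E>
    }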
koivunej authored Feb 6, 2024
1 parent 8e114bd commit 9471657
Showing 20 changed files with 158 additions and 156 deletions.
4 changes: 3 additions & 1 deletion control_plane/attachment_service/src/compute_hook.rs
@@ -244,9 +244,11 @@ impl ComputeHook {
             3,
             10,
             "Send compute notification",
-            backoff::Cancel::new(cancel.clone(), || NotifyError::ShuttingDown),
+            cancel,
         )
         .await
+        .ok_or_else(|| NotifyError::ShuttingDown)
+        .and_then(|x| x)
     }
 
     /// Call this to notify the compute (postgres) tier of new pageservers to use
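
The `.ok_or_else(..).and_then(|x| x)` chain above is the flattening idiom this commit introduces at every call site. As a standalone sketch with simplified types (not from the commit):

    // `retry` now yields Option<Result<T, E>>, with `None` meaning cancelled.
    // `ok_or` maps that `None` to the caller's own error, which produces a
    // Result<Result<T, E>, E>, and `and_then(|x| x)` flattens it back down
    // to Result<T, E>. (`Result::flatten` would read nicer but is unstable,
    // which is presumably why the identity-closure form is used.)
    fn flatten_outcome(outcome: Option<Result<u32, &'static str>>) -> Result<u32, &'static str> {
        outcome.ok_or("shutting down").and_then(|x| x)
    }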
2 changes: 1 addition & 1 deletion libs/remote_storage/src/azure_blob.rs
@@ -379,7 +379,7 @@ impl RemoteStorage for AzureBlobStorage {
         _prefix: Option<&RemotePath>,
         _timestamp: SystemTime,
         _done_if_after: SystemTime,
-        _cancel: CancellationToken,
+        _cancel: &CancellationToken,
     ) -> Result<(), TimeTravelError> {
         // TODO use Azure point in time recovery feature for this
         // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
4 changes: 2 additions & 2 deletions libs/remote_storage/src/lib.rs
@@ -218,7 +218,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
         prefix: Option<&RemotePath>,
         timestamp: SystemTime,
         done_if_after: SystemTime,
-        cancel: CancellationToken,
+        cancel: &CancellationToken,
     ) -> Result<(), TimeTravelError>;
 }

@@ -442,7 +442,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
         prefix: Option<&RemotePath>,
         timestamp: SystemTime,
         done_if_after: SystemTime,
-        cancel: CancellationToken,
+        cancel: &CancellationToken,
     ) -> Result<(), TimeTravelError> {
         match self {
             Self::LocalFs(s) => {
2 changes: 1 addition & 1 deletion libs/remote_storage/src/local_fs.rs
@@ -431,7 +431,7 @@ impl RemoteStorage for LocalFs {
         _prefix: Option<&RemotePath>,
         _timestamp: SystemTime,
         _done_if_after: SystemTime,
-        _cancel: CancellationToken,
+        _cancel: &CancellationToken,
     ) -> Result<(), TimeTravelError> {
         Err(TimeTravelError::Unimplemented)
     }
14 changes: 9 additions & 5 deletions libs/remote_storage/src/s3_bucket.rs
@@ -638,7 +638,7 @@ impl RemoteStorage for S3Bucket {
         prefix: Option<&RemotePath>,
         timestamp: SystemTime,
         done_if_after: SystemTime,
-        cancel: CancellationToken,
+        cancel: &CancellationToken,
     ) -> Result<(), TimeTravelError> {
         let kind = RequestKind::TimeTravel;
         let _guard = self.permit(kind).await;
@@ -678,9 +678,11 @@
             warn_threshold,
             max_retries,
             "listing object versions for time_travel_recover",
-            backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled),
+            cancel,
         )
-        .await?;
+        .await
+        .ok_or_else(|| TimeTravelError::Cancelled)
+        .and_then(|x| x)?;
 
         tracing::trace!(
             "  Got List response version_id_marker={:?}, key_marker={:?}",
@@ -805,9 +807,11 @@
             warn_threshold,
             max_retries,
             "copying object version for time_travel_recover",
-            backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled),
+            cancel,
         )
-        .await?;
+        .await
+        .ok_or_else(|| TimeTravelError::Cancelled)
+        .and_then(|x| x)?;
                 tracing::info!(%version_id, %key, "Copied old version in S3");
             }
             VerOrDelete {
2 changes: 1 addition & 1 deletion libs/remote_storage/src/simulate_failures.rs
@@ -190,7 +190,7 @@ impl RemoteStorage for UnreliableWrapper {
         prefix: Option<&RemotePath>,
         timestamp: SystemTime,
         done_if_after: SystemTime,
-        cancel: CancellationToken,
+        cancel: &CancellationToken,
     ) -> Result<(), TimeTravelError> {
         self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
             .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
11 changes: 7 additions & 4 deletions libs/remote_storage/tests/test_real_s3.rs
@@ -56,9 +56,10 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
         warn_threshold,
         max_retries,
         "test retry",
-        backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
+        &CancellationToken::new(),
     )
     .await
+    .expect("never cancelled")
 }
 
 async fn time_point() -> SystemTime {
@@ -76,6 +77,8 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
         .collect::<HashSet<_>>())
     }
 
+    let cancel = CancellationToken::new();
+
     let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
         .with_context(|| "RemotePath conversion")?;
 
@@ -142,7 +145,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
     // No changes after recovery to t2 (no-op)
     let t_final = time_point().await;
     ctx.client
-        .time_travel_recover(None, t2, t_final, CancellationToken::new())
+        .time_travel_recover(None, t2, t_final, &cancel)
         .await?;
     let t2_files_recovered = list_files(&ctx.client).await?;
     println!("after recovery to t2: {t2_files_recovered:?}");
@@ -153,7 +156,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
     // after recovery to t1: path1 is back, path2 has the old content
     let t_final = time_point().await;
     ctx.client
-        .time_travel_recover(None, t1, t_final, CancellationToken::new())
+        .time_travel_recover(None, t1, t_final, &cancel)
         .await?;
     let t1_files_recovered = list_files(&ctx.client).await?;
     println!("after recovery to t1: {t1_files_recovered:?}");
@@ -164,7 +167,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
     // after recovery to t0: everything is gone except for path1
     let t_final = time_point().await;
     ctx.client
-        .time_travel_recover(None, t0, t_final, CancellationToken::new())
+        .time_travel_recover(None, t0, t_final, &cancel)
         .await?;
     let t0_files_recovered = list_files(&ctx.client).await?;
     println!("after recovery to t0: {t0_files_recovered:?}");
90 changes: 37 additions & 53 deletions libs/utils/src/backoff.rs
@@ -37,69 +37,53 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
     }
 }
 
-/// Configure cancellation for a retried operation: when to cancel (the token), and
-/// what kind of error to return on cancellation
-pub struct Cancel<E, CF>
-where
-    E: Display + Debug + 'static,
-    CF: Fn() -> E,
-{
-    token: CancellationToken,
-    on_cancel: CF,
-}
-
-impl<E, CF> Cancel<E, CF>
-where
-    E: Display + Debug + 'static,
-    CF: Fn() -> E,
-{
-    pub fn new(token: CancellationToken, on_cancel: CF) -> Self {
-        Self { token, on_cancel }
-    }
-}
-
-/// retries passed operation until one of the following conditions are met:
-/// Encountered error is considered as permanent (non-retryable)
-/// Retries have been exhausted.
-/// `is_permanent` closure should be used to provide distinction between permanent/non-permanent errors
-/// When attempts cross `warn_threshold` function starts to emit log warnings.
+/// Retries the passed operation until one of the following conditions is met:
+/// - the encountered error is considered permanent (non-retryable)
+/// - retries have been exhausted
+/// - the cancellation token has been cancelled
+///
+/// The `is_permanent` closure should be used to distinguish between permanent and non-permanent
+/// errors. When attempts cross `warn_threshold`, the function starts to emit log warnings.
 /// The `description` argument is added to log messages. Its value should identify what the `op` is doing.
-/// `cancel` argument is required: any time we are looping on retry, we should be using a CancellationToken
-/// to drop out promptly on shutdown.
-pub async fn retry<T, O, F, E, CF>(
+/// `cancel` cancels new attempts and the backoff sleep.
+///
+/// Failed attempts are logged with `{:#}`, which works for anyhow but does not necessarily
+/// work for other error types. The final failed attempt is logged with `{:?}`.
+///
+/// Returns `None` if cancellation was noticed during backoff, otherwise `Some` of the terminal result.
+pub async fn retry<T, O, F, E>(
     mut op: O,
     is_permanent: impl Fn(&E) -> bool,
     warn_threshold: u32,
     max_retries: u32,
     description: &str,
-    cancel: Cancel<E, CF>,
-) -> Result<T, E>
+    cancel: &CancellationToken,
+) -> Option<Result<T, E>>
 where
     // Not std::error::Error because anyhow::Error doesn't implement it.
     // For context see https://github.com/dtolnay/anyhow/issues/63
     E: Display + Debug + 'static,
     O: FnMut() -> F,
     F: Future<Output = Result<T, E>>,
-    CF: Fn() -> E,
 {
     let mut attempts = 0;
     loop {
-        if cancel.token.is_cancelled() {
-            return Err((cancel.on_cancel)());
+        if cancel.is_cancelled() {
+            return None;
         }
 
         let result = op().await;
-        match result {
+        match &result {
             Ok(_) => {
                 if attempts > 0 {
                     tracing::info!("{description} succeeded after {attempts} retries");
                 }
-                return result;
+                return Some(result);
             }
 
             // These are "permanent" errors that should not be retried.
-            Err(ref e) if is_permanent(e) => {
-                return result;
+            Err(e) if is_permanent(e) => {
+                return Some(result);
             }
             // Assume that any other failure might be transient, and the operation might
             // succeed if we just keep trying.
@@ -109,20 +93,20 @@ where
             Err(err) if attempts < max_retries => {
                 tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
             }
-            Err(ref err) => {
+            Err(err) => {
                 // Operation failed `max_attempts` times. Time to give up.
                 tracing::warn!(
                     "{description} still failed after {attempts} retries, giving up: {err:?}"
                 );
-                return result;
+                return Some(result);
             }
         }
         // sleep and retry
         exponential_backoff(
             attempts,
             DEFAULT_BASE_BACKOFF_SECONDS,
             DEFAULT_MAX_BACKOFF_SECONDS,
-            &cancel.token,
+            cancel,
         )
         .await;
         attempts += 1;
@@ -131,12 +115,10 @@
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use std::io;
 
     use tokio::sync::Mutex;
 
-    use super::*;
-
     #[test]
     fn backoff_defaults_produce_growing_backoff_sequence() {
         let mut current_backoff_value = None;
@@ -166,7 +148,7 @@ mod tests {
     #[tokio::test(start_paused = true)]
     async fn retry_always_error() {
         let count = Mutex::new(0);
-        let err_result = retry(
+        retry(
             || async {
                 *count.lock().await += 1;
                 Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other))
@@ -175,11 +157,11 @@ async fn retry_always_error() {
             1,
             1,
             "work",
-            Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }),
+            &CancellationToken::new(),
         )
-        .await;
-
-        assert!(err_result.is_err());
+        .await
+        .expect("not cancelled")
+        .expect_err("it can only fail");
 
         assert_eq!(*count.lock().await, 2);
     }
@@ -201,10 +183,11 @@ async fn retry_always_error() {
             2,
             2,
             "work",
-            Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }),
+            &CancellationToken::new(),
         )
         .await
-        .unwrap();
+        .expect("not cancelled")
+        .expect("success on second try");
     }
 
     #[tokio::test(start_paused = true)]
@@ -224,10 +207,11 @@ async fn retry_always_error() {
             2,
             2,
             "work",
-            Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }),
+            &CancellationToken::new(),
         )
         .await
-        .unwrap_err();
+        .expect("was not cancellation")
+        .expect_err("it was permanent error");
 
         assert_eq!(*count.lock().await, 1);
     }
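
One detail of the new `retry` body worth calling out: it matches on `&result` instead of `result`, so the arms only borrow the value and can each hand the intact `result` back as `Some(result)`. A standalone sketch of that pattern (not from the commit):

    // Matching on a reference avoids partially moving `result` into the
    // match, so every arm may still return the complete value afterwards.
    fn classify(result: Result<u32, String>) -> Option<Result<u32, String>> {
        match &result {
            Ok(_) => Some(result),
            Err(e) if e.contains("permanent") => Some(result),
            Err(_) => None, // stand-in for the "retry again" path
        }
    }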

1 comment on commit 9471657

@github-actions


2462 tests run: 2341 passed, 1 failed, 120 skipped (full report)


Failures on Postgres 14

  # Run all failed tests locally:
  scripts/pytest -vv -n $(nproc) -k "test_lazy_startup"

Flaky tests (1)

  Postgres 15

Code coverage (full report)

  • functions: 54.5% (11316 of 20781 functions)
  • lines: 81.5% (63660 of 78113 lines)

The comment gets automatically updated with the latest test results.
9471657 at 2024-02-06T08:32:57.303Z :recycle:
