prepare to move timeouts and cancellation handling to remote_storage (#6696)

This PR contains preliminary cleanups and refactoring around `remote_storage`
for the next PR, which will move the timeouts and cancellation handling into
`remote_storage`.

Summary:
- smaller drive-by fixes
- code simplification
- refactor common parts, such as `DownloadError::is_permanent` (see the sketch below)
- align the error types of `RemoteStorage::list_*` so that more call sites can use the `download_retry` helper

Cc: #6096
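
For context on the `is_permanent` refactor: the split between permanent and transient errors is what lets a retry helper decide when backing off is pointless. Below is a minimal sketch of that interplay. The `DownloadError` enum and the `is_permanent` body mirror this commit, but the `download_retry` function here is illustrative only; its signature, attempt limit, and backoff constants are assumptions, not the helper from this repository.

```rust
use std::future::Future;
use std::time::Duration;

// Mirrors the enum in `libs/remote_storage/src/lib.rs`.
#[derive(Debug)]
pub enum DownloadError {
    BadInput(anyhow::Error),
    NotFound,
    Cancelled,
    Other(anyhow::Error),
}

impl DownloadError {
    /// Returns true if the error should not be retried with backoff.
    pub fn is_permanent(&self) -> bool {
        use DownloadError::*;
        match self {
            BadInput(_) => true,
            NotFound => true,
            Cancelled => true,
            Other(_) => false,
        }
    }
}

/// Hypothetical retry helper: retry transient failures with exponential
/// backoff, but give up immediately on permanent ones.
pub async fn download_retry<T, F, Fut>(mut op: F) -> Result<T, DownloadError>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, DownloadError>>,
{
    const MAX_ATTEMPTS: u32 = 5;
    let mut delay = Duration::from_millis(100);
    for attempt in 1u32.. {
        match op().await {
            Ok(value) => return Ok(value),
            Err(e) if e.is_permanent() || attempt >= MAX_ATTEMPTS => return Err(e),
            Err(_transient) => {
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(Duration::from_secs(10));
            }
        }
    }
    unreachable!("the loop above always returns")
}
```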
koivunej authored Feb 9, 2024
1 parent eec1e1a commit eb919ca
Showing 10 changed files with 175 additions and 143 deletions.
26 changes: 23 additions & 3 deletions libs/remote_storage/src/lib.rs
@@ -13,6 +13,7 @@ mod azure_blob;
mod local_fs;
mod s3_bucket;
mod simulate_failures;
mod support;

use std::{
collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc, time::SystemTime,
@@ -170,7 +171,10 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// whereas,
/// list_prefixes("foo/bar/") = ["cat", "dog"]
/// See `test_real_s3.rs` for more details.
async fn list_files(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
async fn list_files(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
Ok(result)
}
@@ -179,7 +183,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
&self,
prefix: Option<&RemotePath>,
_mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError>;
) -> Result<Listing, DownloadError>;

/// Streams the local file contents into the remote storage entry.
async fn upload(
@@ -269,6 +273,19 @@ impl std::fmt::Display for DownloadError {

impl std::error::Error for DownloadError {}

impl DownloadError {
/// Returns true if the error should not be retried with backoff
pub fn is_permanent(&self) -> bool {
use DownloadError::*;
match self {
BadInput(_) => true,
NotFound => true,
Cancelled => true,
Other(_) => false,
}
}
}

#[derive(Debug)]
pub enum TimeTravelError {
/// Validation or other error happened due to user input.
@@ -336,7 +353,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
// A function for listing all the files in a "directory"
// Example:
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
pub async fn list_files(
&self,
folder: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
match self {
Self::LocalFs(s) => s.list_files(folder).await,
Self::AwsS3(s) => s.list_files(folder).await,
50 changes: 34 additions & 16 deletions libs/remote_storage/src/local_fs.rs
@@ -18,9 +18,7 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};

use crate::{
Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath, TimeTravelError,
};
use crate::{Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError};

use super::{RemoteStorage, StorageMetadata};

@@ -365,27 +363,33 @@ impl RemoteStorage for LocalFs {
format!("Failed to open source file {target_path:?} to use in the download")
})
.map_err(DownloadError::Other)?;

let len = source
.metadata()
.await
.context("query file length")
.map_err(DownloadError::Other)?
.len();

source
.seek(io::SeekFrom::Start(start_inclusive))
.await
.context("Failed to seek to the range start in a local storage file")
.map_err(DownloadError::Other)?;

let metadata = self
.read_storage_metadata(&target_path)
.await
.map_err(DownloadError::Other)?;

let download_stream: DownloadStream = match end_exclusive {
Some(end_exclusive) => Box::pin(ReaderStream::new(
source.take(end_exclusive - start_inclusive),
)),
None => Box::pin(ReaderStream::new(source)),
};
let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
let source = ReaderStream::new(source);

Ok(Download {
metadata,
last_modified: None,
etag: None,
download_stream,
download_stream: Box::pin(source),
})
} else {
Err(DownloadError::NotFound)
@@ -514,10 +518,8 @@ mod fs_tests {
use futures_util::Stream;
use std::{collections::HashMap, io::Write};

async fn read_and_assert_remote_file_contents(
async fn read_and_check_metadata(
storage: &LocalFs,
#[allow(clippy::ptr_arg)]
// have to use &Utf8PathBuf due to `storage.local_path` parameter requirements
remote_storage_path: &RemotePath,
expected_metadata: Option<&StorageMetadata>,
) -> anyhow::Result<String> {
@@ -596,7 +598,7 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
assert_eq!(
dummy_contents(upload_name),
contents,
@@ -618,7 +620,7 @@ mod fs_tests {
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
read_and_check_metadata(&storage, &upload_target, None).await?;
assert_eq!(
dummy_contents(upload_name),
full_range_download_contents,
@@ -660,6 +662,22 @@
"Second part bytes should be returned when requested"
);

let suffix_bytes = storage
.download_byte_range(&upload_target, 13, None)
.await?
.download_stream;
let suffix_bytes = aggregate(suffix_bytes).await?;
let suffix = std::str::from_utf8(&suffix_bytes)?;
assert_eq!(upload_name, suffix);

let all_bytes = storage
.download_byte_range(&upload_target, 0, None)
.await?
.download_stream;
let all_bytes = aggregate(all_bytes).await?;
let all_bytes = std::str::from_utf8(&all_bytes)?;
assert_eq!(dummy_contents("upload_1"), all_bytes);

Ok(())
}

@@ -736,7 +754,7 @@
upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;

let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
assert_eq!(
dummy_contents(upload_name),
full_range_download_contents,
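
The `download_byte_range` change above collapses the old two-armed stream construction into a single seek-and-take pipeline: query the file length once, seek to the range start, then bound the reader with `take`, using the length when no explicit end is given. A standalone sketch of the same pattern, assuming `tokio`, `tokio-util`, and `anyhow`; the function name and `path` parameter are illustrative:

```rust
use anyhow::Context;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom};
use tokio_util::io::ReaderStream;

/// Streams the byte range `[start_inclusive, end_exclusive)` of a file.
/// `end_exclusive: None` means "until EOF", expressed by capping the
/// reader at the file length so both cases share one stream type.
async fn byte_range_stream(
    path: &str,
    start_inclusive: u64,
    end_exclusive: Option<u64>,
) -> anyhow::Result<ReaderStream<tokio::io::Take<File>>> {
    let mut source = File::open(path).await.context("open source file")?;
    let len = source.metadata().await.context("query file length")?.len();
    source
        .seek(SeekFrom::Start(start_inclusive))
        .await
        .context("seek to the range start")?;
    // `take` limits how many bytes can be read past the seek position.
    let limited = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
    Ok(ReaderStream::new(limited))
}
```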
77 changes: 25 additions & 52 deletions libs/remote_storage/src/s3_bucket.rs
@@ -45,8 +45,9 @@ use utils::backoff;

use super::StorageMetadata;
use crate::{
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode,
RemotePath, RemoteStorage, S3Config, TimeTravelError, MAX_KEYS_PER_DELETE,
REMOTE_STORAGE_PREFIX_SEPARATOR,
};

pub(super) mod metrics;
Expand All @@ -63,7 +64,6 @@ pub struct S3Bucket {
concurrency_limiter: ConcurrencyLimiter,
}

#[derive(Default)]
struct GetObjectRequest {
bucket: String,
key: String,
@@ -232,24 +232,8 @@ impl S3Bucket {

let started_at = ScopeGuard::into_inner(started_at);

match get_object {
Ok(object_output) => {
let metadata = object_output.metadata().cloned().map(StorageMetadata);
let etag = object_output.e_tag.clone();
let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok());

let body = object_output.body;
let body = ByteStreamAsStream::from(body);
let body = PermitCarrying::new(permit, body);
let body = TimedDownload::new(started_at, body);

Ok(Download {
metadata,
etag,
last_modified,
download_stream: Box::pin(body),
})
}
let object_output = match get_object {
Ok(object_output) => object_output,
Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
// an error: we expect to sometimes fetch an object and find it missing,
@@ -259,7 +243,7 @@
AttemptOutcome::Ok,
started_at,
);
Err(DownloadError::NotFound)
return Err(DownloadError::NotFound);
}
Err(e) => {
metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
@@ -268,11 +252,27 @@
started_at,
);

Err(DownloadError::Other(
return Err(DownloadError::Other(
anyhow::Error::new(e).context("download s3 object"),
))
));
}
}
};

let metadata = object_output.metadata().cloned().map(StorageMetadata);
let etag = object_output.e_tag;
let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok());

let body = object_output.body;
let body = ByteStreamAsStream::from(body);
let body = PermitCarrying::new(permit, body);
let body = TimedDownload::new(started_at, body);

Ok(Download {
metadata,
etag,
last_modified,
download_stream: Box::pin(body),
})
}

async fn delete_oids(
@@ -354,33 +354,6 @@ impl Stream for ByteStreamAsStream {
// sense and Stream::size_hint does not really
}

pin_project_lite::pin_project! {
/// A `Stream` adapter which carries a permit for the lifetime of the value.
struct PermitCarrying<S> {
permit: tokio::sync::OwnedSemaphorePermit,
#[pin]
inner: S,
}
}

impl<S> PermitCarrying<S> {
fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
Self { permit, inner }
}
}

impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for PermitCarrying<S> {
type Item = <S as Stream>::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

pin_project_lite::pin_project! {
/// Times and tracks the outcome of the request.
struct TimedDownload<S> {
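
The `PermitCarrying` stream deleted here is not gone; it moves into the new `support` module (note the `support::PermitCarrying` import added at the top of this file). Its purpose is to keep a concurrency-limiter permit alive for exactly as long as the response body stream, rather than only for the request future. A minimal usage sketch, assuming `tokio`, `futures-util`, `bytes`, `pin-project-lite`, and `anyhow`; the semaphore size and the in-memory body are illustrative:

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

pin_project_lite::pin_project! {
    /// Same shape as the wrapper moved to `support.rs`: the permit is
    /// held, unused, until the whole value is dropped.
    struct PermitCarrying<S> {
        permit: OwnedSemaphorePermit,
        #[pin]
        inner: S,
    }
}

impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for PermitCarrying<S> {
    type Item = <S as Stream>::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().inner.poll_next(cx)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let limiter = Arc::new(Semaphore::new(10));
    // Acquire before issuing the request; the permit then rides along with
    // the body, so the slot frees when the stream is dropped, not when the
    // request future completes.
    let permit = limiter.clone().acquire_owned().await?;
    let body =
        futures_util::stream::iter([Ok::<_, std::io::Error>(Bytes::from_static(b"hello"))]);
    let mut download = PermitCarrying { permit, inner: body };
    while let Some(chunk) = download.next().await {
        println!("read {} bytes", chunk?.len());
    }
    Ok(())
}
```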
28 changes: 18 additions & 10 deletions libs/remote_storage/src/simulate_failures.rs
@@ -60,7 +60,7 @@ impl UnreliableWrapper {
/// On the first attempts of this operation, return an error. After 'attempts_to_fail'
/// attempts, let the operation go ahead, and clear the counter.
///
fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
let mut attempts = self.attempts.lock().unwrap();

match attempts.entry(op) {
@@ -78,13 +78,13 @@
} else {
let error =
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
Err(DownloadError::Other(error))
Err(error)
}
}
Entry::Vacant(e) => {
let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
e.insert(1);
Err(DownloadError::Other(error))
Err(error)
}
}
}
@@ -105,12 +105,17 @@
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
self.inner.list_prefixes(prefix).await
}

async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
async fn list_files(
&self,
folder: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(folder.cloned()))
.map_err(DownloadError::Other)?;
self.inner.list_files(folder).await
}

@@ -119,7 +124,8 @@
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> Result<Listing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
self.inner.list(prefix, mode).await
}

@@ -137,7 +143,8 @@
}

async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
self.attempt(RemoteOp::Download(from.clone()))?;
self.attempt(RemoteOp::Download(from.clone()))
.map_err(DownloadError::Other)?;
self.inner.download(from).await
}

@@ -150,7 +157,8 @@
// Note: We treat any download_byte_range as an "attempt" of the same
// operation. We don't pay attention to the ranges. That's good enough
// for now.
self.attempt(RemoteOp::Download(from.clone()))?;
self.attempt(RemoteOp::Download(from.clone()))
.map_err(DownloadError::Other)?;
self.inner
.download_byte_range(from, start_inclusive, end_exclusive)
.await
Expand Down Expand Up @@ -193,7 +201,7 @@ impl RemoteStorage for UnreliableWrapper {
cancel: &CancellationToken,
) -> Result<(), TimeTravelError> {
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
.map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
.map_err(TimeTravelError::Other)?;
self.inner
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
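
The repeated `map_err` calls above are the point of this hunk: `attempt` now returns plain `anyhow::Result`, and each caller wraps failures in its own error type (`DownloadError::Other`, `TimeTravelError::Other`) instead of receiving a pre-committed `DownloadError`. For readers unfamiliar with this wrapper, here is a self-contained sketch of the fail-first-N-attempts bookkeeping with simplified types; the real code keys the map on a `RemoteOp` enum rather than a `String`, and the branch hidden by the collapsed diff above is reconstructed here as an assumption:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Mutex;

struct UnreliableWrapper {
    attempts_to_fail: u64,
    attempts: Mutex<HashMap<String, u64>>,
}

impl UnreliableWrapper {
    /// Fail the first attempts of `op`; once the counter reaches
    /// `attempts_to_fail`, let the operation go ahead and clear it.
    fn attempt(&self, op: String) -> anyhow::Result<u64> {
        let mut attempts = self.attempts.lock().unwrap();
        match attempts.entry(op) {
            Entry::Occupied(mut e) => {
                *e.get_mut() += 1;
                let n = *e.get();
                if n >= self.attempts_to_fail {
                    // Enough failures simulated: allow the operation and reset.
                    e.remove();
                    Ok(n)
                } else {
                    anyhow::bail!("simulated failure of remote operation {:?}", e.key())
                }
            }
            Entry::Vacant(e) => {
                let error =
                    anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
                e.insert(1);
                Err(error)
            }
        }
    }
}

fn main() {
    let w = UnreliableWrapper {
        attempts_to_fail: 2,
        attempts: Mutex::new(HashMap::new()),
    };
    // First try fails; the second reaches the threshold and goes through.
    assert!(w.attempt("download".into()).is_err());
    assert!(w.attempt("download".into()).is_ok());
}
```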
(remaining 6 changed files not shown)

1 comment on commit eb919ca

@github-actions

2493 tests run: 2363 passed, 6 failed, 124 skipped (full report)


Failures on Postgres 16

  • test_pageserver_lsn_wait_error_start: debug

Failures on Postgres 14

  • test_pgbench_intensive_init_workload[neon_off-1000]: release
  • test_heavy_write_workload[neon_off-10-5-5]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pgbench_intensive_init_workload[neon_off-1000] or test_heavy_write_workload[neon_off-10-5-5] or test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30] or test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30] or test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30] or test_pageserver_lsn_wait_error_start[debug-pg16]"
Flaky tests (5)

Postgres 16

  • test_compute_pageserver_connection_stress: release
  • test_pg_regress[None]: debug
  • test_tenant_delete_smoke: release

Postgres 15

  • test_crafted_wal_end[last_wal_record_crossing_segment]: release

Postgres 14

  • test_physical_replication: release

Test coverage report is not available

The comment gets automatically updated with the latest test results
eb919ca at 2024-02-09T14:09:41.802Z :recycle:
