Skip to content

Commit

Permalink
fix(storage-scrubber): use default AWS authentication (#8299)
Browse files Browse the repository at this point in the history
part of neondatabase/cloud#14024
close #7665

Things running in k8s container use this authentication:
https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html
while we did not configure the client to use it. This pull request
simply uses the default s3 client credential chain for storage scrubber.
It might break compatibility with minio.

## Summary of changes

* Use default AWS credential provider chain.
* Improvements for s3 errors, we now have detailed errors and correct
backtrace on last trial of the operation.

---------

Signed-off-by: Alex Chi Z <[email protected]>
Co-authored-by: Joonas Koivunen <[email protected]>
  • Loading branch information
skyzh and koivunej authored Jul 9, 2024
1 parent 4a5b55c commit b1fe825
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 77 deletions.
2 changes: 1 addition & 1 deletion storage_scrubber/src/find_large_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub async fn find_large_objects(
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));

let objects_stream = tenants.map_ok(|tenant_shard_id| {
Expand Down
4 changes: 2 additions & 2 deletions storage_scrubber/src/garbage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async fn find_garbage_inner(
node_kind: NodeKind,
) -> anyhow::Result<GarbageList> {
// Construct clients for S3 and for Console API
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?;
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));

// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
Expand Down Expand Up @@ -432,7 +432,7 @@ pub async fn purge_garbage(
);

let (s3_client, target) =
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?;
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;

// Sanity checks on the incoming list
if garbage_list.active_tenant_count == 0 {
Expand Down
89 changes: 22 additions & 67 deletions storage_scrubber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@ use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::retry::RetryConfig;
use aws_config::sso::SsoCredentialsProvider;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;
use anyhow::{anyhow, Context};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::Client;

use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
Expand Down Expand Up @@ -274,65 +267,21 @@ pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
}
}

pub fn init_s3_client(bucket_region: Region) -> Client {
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
let chain = CredentialsProviderChain::first_try(
"env",
EnvironmentVariableCredentialsProvider::new(),
)
// uses "AWS_PROFILE" / `aws sso login --profile <profile>`
.or_else(
"profile-sso",
ProfileFileCredentialsProvider::builder().build(),
);

// Use SSO if we were given an account ID
match std::env::var("SSO_ACCOUNT_ID").ok() {
Some(sso_account) => chain.or_else(
"sso",
SsoCredentialsProvider::builder()
.account_id(sso_account)
.role_name("PowerUserAccess")
.start_url("https://neondb.awsapps.com/start")
.region(bucket_region.clone())
.build(),
),
None => chain,
}
.or_else(
// Finally try IMDS
"imds",
ImdsCredentialsProvider::builder().build(),
)
};

let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());

let mut builder = Config::builder()
.behavior_version(
#[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
BehaviorVersion::v2023_11_09(),
)
pub async fn init_s3_client(bucket_region: Region) -> Client {
let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
.region(bucket_region)
.retry_config(RetryConfig::adaptive().with_max_attempts(3))
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
.credentials_provider(credentials_provider);

if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
builder = builder.endpoint_url(endpoint)
}

Client::from_conf(builder.build())
.load()
.await;
Client::new(&config)
}

fn init_remote(
async fn init_remote(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_region));
let s3_client = Arc::new(init_s3_client(bucket_region).await);

let s3_root = match node_kind {
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
Expand All @@ -357,7 +306,7 @@ async fn list_objects_with_retries(
s3_target: &S3Target,
continuation_token: Option<String>,
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
for _ in 0..MAX_RETRIES {
for trial in 0..MAX_RETRIES {
match s3_client
.list_objects_v2()
.bucket(&s3_target.bucket_name)
Expand All @@ -369,16 +318,22 @@ async fn list_objects_with_retries(
{
Ok(response) => return Ok(response),
Err(e) => {
if trial == MAX_RETRIES - 1 {
return Err(e)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
}
error!(
"list_objects_v2 query failed: {e}, bucket_name={}, prefix={}, delimiter={}",
s3_target.bucket_name, s3_target.prefix_in_bucket, s3_target.delimiter
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
s3_target.bucket_name,
s3_target.prefix_in_bucket,
s3_target.delimiter,
DisplayErrorContext(e),
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}

anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
}

async fn download_object_with_retries(
Expand Down
2 changes: 1 addition & 1 deletion storage_scrubber/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let downloader =
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?;
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?;
downloader.download().await
}
Command::PageserverPhysicalGc {
Expand Down
2 changes: 1 addition & 1 deletion storage_scrubber/src/pageserver_physical_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ pub async fn pageserver_physical_gc(
min_age: Duration,
mode: GcMode,
) -> anyhow::Result<GcSummary> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;

let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
Expand Down
2 changes: 1 addition & 1 deletion storage_scrubber/src/scan_pageserver_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub async fn scan_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;

let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
Expand Down
2 changes: 1 addition & 1 deletion storage_scrubber/src/scan_safekeeper_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub async fn scan_safekeeper_metadata(
let timelines = client.query(&query, &[]).await?;
info!("loaded {} timelines", timelines.len());

let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
let console_config = ConsoleConfig::from_env()?;
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);

Expand Down
7 changes: 4 additions & 3 deletions storage_scrubber/src/tenant_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ pub struct SnapshotDownloader {
}

impl SnapshotDownloader {
pub fn new(
pub async fn new(
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
) -> anyhow::Result<Self> {
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
Ok(Self {
s3_client,
s3_root,
Expand Down Expand Up @@ -215,7 +215,8 @@ impl SnapshotDownloader {
}

pub async fn download(&self) -> anyhow::Result<()> {
let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?;
let (s3_client, target) =
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;

// Generate a stream of TenantShardId
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
Expand Down

1 comment on commit b1fe825

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3129 tests run: 3002 passed, 0 failed, 127 skipped (full report)


Flaky tests (1)

Postgres 16

  • test_isolation[None]: debug

Code coverage* (full report)

  • functions: 32.6% (6940 of 21288 functions)
  • lines: 50.0% (54551 of 109048 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
b1fe825 at 2024-07-09T19:05:16.861Z :recycle:

Please sign in to comment.