From 9b6af2bcaddad49f59809c69717c23ced725dfa9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Mon, 18 Nov 2024 22:01:48 +0100
Subject: [PATCH] Add the ability to configure GenericRemoteStorage for the scrubber (#9652)

Earlier work (#7547) made the scrubber internally generic, but it could
still only be configured to use S3 storage. This is the final piece that
makes the scrubber configurable via GenericRemoteStorage (for most
commands; snapshotting still requires S3).

I.e. you can now set an env var like:

```
REMOTE_STORAGE_CONFIG='remote_storage = { bucket_name = "neon-dev-safekeeper-us-east-2d", bucket_region = "us-east-2" }'
```

and the scrubber will read its storage configuration from it. The legacy
`REGION`/`BUCKET`/`BUCKET_PREFIX` environment variables keep working and
take precedence when set.
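Purely for illustration (an untested sketch; the container, account, and path
values below are placeholders, not real deployments), the same variable accepts
the other backends that GenericRemoteStorage supports, using the same TOML
shape as the pageserver's `remote_storage` config section:

```
# Hypothetical Azure blob storage configuration (placeholder names):
REMOTE_STORAGE_CONFIG='remote_storage = { container_name = "some-container", container_region = "eastus2", storage_account = "someaccount" }'
# Hypothetical local-filesystem configuration, mainly useful for testing:
REMOTE_STORAGE_CONFIG='remote_storage = { local_path = "/tmp/scrubber-remote-storage" }'
```

Both forms are parsed by the new `RemoteStorageConfig::from_toml_str`, which
accepts the configuration either nested under a `remote_storage` key or as a
bare table.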
---
 Cargo.lock                                  |   1 -
 libs/remote_storage/src/config.rs           |  21 ++-
 pageserver/ctl/Cargo.toml                   |   1 -
 pageserver/ctl/src/main.rs                  |   6 +-
 storage_scrubber/src/find_large_objects.rs  |   4 +-
 storage_scrubber/src/garbage.rs             |   4 +-
 storage_scrubber/src/lib.rs                 | 127 +++++++++++-------
 storage_scrubber/src/main.rs                |  18 +--
 .../src/scan_safekeeper_metadata.rs         |   5 +-
 storage_scrubber/src/tenant_snapshot.rs     |  14 +-
 10 files changed, 115 insertions(+), 86 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index da8cefb219ff..c7af140f7df2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3578,7 +3578,6 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-util",
- "toml_edit",
  "utils",
  "workspace_hack",
 ]
diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs
index d0e92411dabe..e99ae4f747d9 100644
--- a/libs/remote_storage/src/config.rs
+++ b/libs/remote_storage/src/config.rs
@@ -26,6 +26,16 @@ pub struct RemoteStorageConfig {
     pub timeout: Duration,
 }
 
+impl RemoteStorageKind {
+    pub fn bucket_name(&self) -> Option<&str> {
+        match self {
+            RemoteStorageKind::LocalFs { .. } => None,
+            RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name),
+            RemoteStorageKind::AzureContainer(config) => Some(&config.container_name),
+        }
+    }
+}
+
 fn default_timeout() -> Duration {
     RemoteStorageConfig::DEFAULT_TIMEOUT
 }
@@ -178,6 +188,14 @@ impl RemoteStorageConfig {
     pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Self> {
         Ok(utils::toml_edit_ext::deserialize_item(toml)?)
     }
+
+    pub fn from_toml_str(input: &str) -> anyhow::Result<Self> {
+        let toml_document = toml_edit::DocumentMut::from_str(input)?;
+        if let Some(item) = toml_document.get("remote_storage") {
+            return Self::from_toml(item);
+        }
+        Self::from_toml(toml_document.as_item())
+    }
 }
 
 #[cfg(test)]
@@ -185,8 +203,7 @@ mod tests {
     use super::*;
 
     fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
-        let toml = input.parse::<toml_edit::DocumentMut>().unwrap();
-        RemoteStorageConfig::from_toml(toml.as_item())
+        RemoteStorageConfig::from_toml_str(input)
     }
 
     #[test]
diff --git a/pageserver/ctl/Cargo.toml b/pageserver/ctl/Cargo.toml
index a753f806a076..39ca47568cef 100644
--- a/pageserver/ctl/Cargo.toml
+++ b/pageserver/ctl/Cargo.toml
@@ -18,7 +18,6 @@ postgres_ffi.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
 tokio-util.workspace = true
-toml_edit.workspace = true
 utils.workspace = true
 svg_fmt.workspace = true
 workspace_hack.workspace = true
diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs
index 92e766d2fbbe..a0aac89dc834 100644
--- a/pageserver/ctl/src/main.rs
+++ b/pageserver/ctl/src/main.rs
@@ -174,11 +174,7 @@ async fn main() -> anyhow::Result<()> {
                 println!("specified prefix '{}' failed validation", cmd.prefix);
                 return Ok(());
             };
-            let toml_document = toml_edit::DocumentMut::from_str(&cmd.config_toml_str)?;
-            let toml_item = toml_document
-                .get("remote_storage")
-                .expect("need remote_storage");
-            let config = RemoteStorageConfig::from_toml(toml_item)?;
+            let config = RemoteStorageConfig::from_toml_str(&cmd.config_toml_str)?;
             let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
             let cancel = CancellationToken::new();
             storage
diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs
index 88e36af56008..95d3af145323 100644
--- a/storage_scrubber/src/find_large_objects.rs
+++ b/storage_scrubber/src/find_large_objects.rs
@@ -106,9 +106,9 @@ pub async fn find_large_objects(
         }
     }
 
-    let bucket_name = target.bucket_name();
+    let desc_str = target.desc_str();
     tracing::info!(
-        "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
+        "Scan of {desc_str} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
         objects.len()
     );
     Ok(LargeObjectListing { objects })
diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs
index 863dbf960da4..91668a42a705 100644
--- a/storage_scrubber/src/garbage.rs
+++ b/storage_scrubber/src/garbage.rs
@@ -177,7 +177,7 @@ async fn find_garbage_inner(
     }));
 
     // Enumerate Tenants in S3, and check if each one exists in Console
-    tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
+    tracing::info!("Finding all tenants in {}...", bucket_config.desc_str());
     let tenants = stream_tenants(&remote_client, &target);
     let tenants_checked = tenants.map_ok(|t| {
         let api_client = cloud_admin_api_client.clone();
@@ -524,7 +524,7 @@ pub async fn purge_garbage(
         init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
 
     assert_eq!(
-        &garbage_list.bucket_config.bucket,
+        garbage_list.bucket_config.bucket_name().unwrap(),
         remote_client.bucket_name().unwrap()
     );
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index de0857cb5f0b..1fe4fc58cd0e 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -29,8 +29,7 @@ use pageserver::tenant::TENANTS_SEGMENT_NAME;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::{
     DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
-    RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
-    DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
+    RemoteStorageKind, S3Config,
 };
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
@@ -48,7 +47,7 @@ const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
 #[derive(Debug, Clone)]
 pub struct S3Target {
-    pub bucket_name: String,
+    pub desc_str: String,
     /// This `prefix_in_bucket` is only equal to the PS/SK config of the same
     /// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
     /// with extra parts.
@@ -172,7 +171,7 @@ impl RootTarget {
         };
 
         S3Target {
-            bucket_name: root.bucket_name.clone(),
+            desc_str: root.desc_str.clone(),
             prefix_in_bucket: format!(
                 "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
                 root.prefix_in_bucket
@@ -209,10 +208,10 @@ impl RootTarget {
         }
     }
 
-    pub fn bucket_name(&self) -> &str {
+    pub fn desc_str(&self) -> &str {
         match self {
-            Self::Pageserver(root) => &root.bucket_name,
-            Self::Safekeeper(root) => &root.bucket_name,
+            Self::Pageserver(root) => &root.desc_str,
+            Self::Safekeeper(root) => &root.desc_str,
         }
     }
@@ -230,26 +229,63 @@ pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
-pub struct BucketConfig {
-    pub region: String,
-    pub bucket: String,
-    pub prefix_in_bucket: Option<String>,
-}
+pub struct BucketConfig(RemoteStorageConfig);
 
 impl BucketConfig {
     pub fn from_env() -> anyhow::Result<Self> {
-        let region = env::var("REGION").context("'REGION' param retrieval")?;
-        let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
-        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
+        if let Ok(legacy) = Self::from_env_legacy() {
+            return Ok(legacy);
+        }
+        let config_toml =
+            env::var("REMOTE_STORAGE_CONFIG").context("'REMOTE_STORAGE_CONFIG' retrieval")?;
+        let remote_config = RemoteStorageConfig::from_toml_str(&config_toml)?;
+        Ok(BucketConfig(remote_config))
+    }
 
-        Ok(Self {
-            region,
-            bucket,
-            prefix_in_bucket,
-        })
+    fn from_env_legacy() -> anyhow::Result<Self> {
+        let bucket_region = env::var("REGION").context("'REGION' param retrieval")?;
+        let bucket_name = env::var("BUCKET").context("'BUCKET' param retrieval")?;
+        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
+        let endpoint = env::var("AWS_ENDPOINT_URL").ok();
+        // Create a json object which we then deserialize so that we don't
+        // have to repeat all of the S3Config fields.
+        let s3_config_json = serde_json::json!({
+            "bucket_name": bucket_name,
+            "bucket_region": bucket_region,
+            "prefix_in_bucket": prefix_in_bucket,
+            "endpoint": endpoint,
+        });
+        let config: RemoteStorageConfig = serde_json::from_value(s3_config_json)?;
+        Ok(BucketConfig(config))
+    }
+    pub fn desc_str(&self) -> String {
+        match &self.0.storage {
+            RemoteStorageKind::LocalFs { local_path } => {
+                format!("local path {local_path}")
+            }
+            RemoteStorageKind::AwsS3(config) => format!(
+                "bucket {}, region {}",
+                config.bucket_name, config.bucket_region
+            ),
+            RemoteStorageKind::AzureContainer(config) => format!(
+                "bucket {}, storage account {:?}, region {}",
+                config.container_name, config.storage_account, config.container_region
+            ),
+        }
+    }
+    pub fn bucket_name(&self) -> Option<&str> {
+        self.0.storage.bucket_name()
     }
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct BucketConfigLegacy {
+    pub region: String,
+    pub bucket: String,
+    pub prefix_in_bucket: Option<String>,
+}
+
 pub struct ControllerClientConfig {
     /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
     pub controller_api: Url,
@@ -337,13 +373,9 @@ fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
     }
 }
 
-fn make_root_target(
-    bucket_name: String,
-    prefix_in_bucket: String,
-    node_kind: NodeKind,
-) -> RootTarget {
+fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeKind) -> RootTarget {
     let s3_target = S3Target {
-        bucket_name,
+        desc_str,
         prefix_in_bucket,
         delimiter: "/".to_string(),
     };
@@ -354,15 +386,15 @@ async fn init_remote_s3(
-    bucket_config: BucketConfig,
+    bucket_config: S3Config,
     node_kind: NodeKind,
 ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
-    let bucket_region = Region::new(bucket_config.region);
+    let bucket_region = Region::new(bucket_config.bucket_region);
     let s3_client = Arc::new(init_s3_client(bucket_region).await);
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
     let s3_root = make_root_target(
-        bucket_config.bucket,
+        bucket_config.bucket_name,
         bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
         node_kind,
     );
@@ -371,33 +403,28 @@ async fn init_remote(
-    bucket_config: BucketConfig,
+    mut storage_config: BucketConfig,
     node_kind: NodeKind,
 ) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
-    let endpoint = env::var("AWS_ENDPOINT_URL").ok();
+    let desc_str = storage_config.desc_str();
+
     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
-    let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
-    let storage = S3Config {
-        bucket_name: bucket_config.bucket.clone(),
-        bucket_region: bucket_config.region,
-        prefix_in_bucket,
-        endpoint,
-        concurrency_limit: DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
-            .try_into()
-            .unwrap(),
-        max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
-        upload_storage_class: None,
-    };
-    let storage_config = RemoteStorageConfig {
-        storage: RemoteStorageKind::AwsS3(storage),
-        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
-    };
+
+    match &mut storage_config.0.storage {
+        RemoteStorageKind::AwsS3(ref mut config) => {
+            config.prefix_in_bucket.get_or_insert(default_prefix);
+        }
+        RemoteStorageKind::AzureContainer(ref mut config) => {
+            config.prefix_in_container.get_or_insert(default_prefix);
+        }
+        RemoteStorageKind::LocalFs { .. } => (),
+    }
 
     // We already pass the prefix to the remote client above
     let prefix_in_root_target = String::new();
-    let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
+    let root_target = make_root_target(desc_str, prefix_in_root_target, node_kind);
 
-    let client = GenericRemoteStorage::from_config(&storage_config).await?;
+    let client = GenericRemoteStorage::from_config(&storage_config.0).await?;
     Ok((client, root_target))
 }
@@ -469,7 +496,7 @@ async fn list_objects_with_retries(
         }
         warn!(
             "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
-            s3_target.bucket_name,
+            remote_client.bucket_name().unwrap_or_default(),
             s3_target.prefix_in_bucket,
             s3_target.delimiter,
             DisplayErrorContext(e),
diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs
index ee816534c63b..0ffb57098444 100644
--- a/storage_scrubber/src/main.rs
+++ b/storage_scrubber/src/main.rs
@@ -140,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
         "{}_{}_{}_{}.log",
         std::env::args().next().unwrap(),
         command_log_name,
-        bucket_config.bucket,
+        bucket_config.bucket_name().unwrap_or("nobucket"),
         chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
     ));
@@ -191,13 +191,7 @@ async fn main() -> anyhow::Result<()> {
             // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
             // scrubber they were likely expecting to scan something, and if we see no timelines
             // at all then it's likely due to some configuration issues like a bad prefix
-            bail!(
-                "No timelines found in bucket {} prefix {}",
-                bucket_config.bucket,
-                bucket_config
-                    .prefix_in_bucket
-                    .unwrap_or("".to_string())
-            );
+            bail!("No timelines found in {}", bucket_config.desc_str());
         }
         Ok(())
     } else {
@@ -396,13 +390,7 @@ pub async fn scan_pageserver_metadata_cmd(
         // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
         // scrubber they were likely expecting to scan something, and if we see no timelines
         // at all then it's likely due to some configuration issues like a bad prefix
-        tracing::error!(
-            "No timelines found in bucket {} prefix {}",
-            bucket_config.bucket,
-            bucket_config
-                .prefix_in_bucket
-                .unwrap_or("".to_string())
-        );
+        tracing::error!("No timelines found in {}", bucket_config.desc_str());
         if exit_code {
             std::process::exit(1);
         }
diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs
index 403b4590a825..0a4d4266a09d 100644
--- a/storage_scrubber/src/scan_safekeeper_metadata.rs
+++ b/storage_scrubber/src/scan_safekeeper_metadata.rs
@@ -84,10 +84,7 @@ pub async fn scan_safekeeper_metadata(
     bucket_config: BucketConfig,
     db_or_list: DatabaseOrList,
 ) -> anyhow::Result<MetadataSummary> {
-    info!(
-        "checking bucket {}, region {}",
-        bucket_config.bucket, bucket_config.region
-    );
+    info!("checking {}", bucket_config.desc_str());
     let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
     let console_config = ConsoleConfig::from_env()?;
diff --git a/storage_scrubber/src/tenant_snapshot.rs b/storage_scrubber/src/tenant_snapshot.rs
index bb4079b5f470..39e0b5c9b4e1 100644
--- a/storage_scrubber/src/tenant_snapshot.rs
+++ b/storage_scrubber/src/tenant_snapshot.rs
@@ -16,7 +16,7 @@ use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
 use pageserver::tenant::storage_layer::LayerName;
 use pageserver::tenant::IndexPart;
 use pageserver_api::shard::TenantShardId;
-use remote_storage::GenericRemoteStorage;
+use remote_storage::{GenericRemoteStorage, S3Config};
 use utils::generation::Generation;
 use utils::id::TenantId;
@@ -24,6 +24,7 @@ pub struct SnapshotDownloader {
     s3_client: Arc<Client>,
     s3_root: RootTarget,
     bucket_config: BucketConfig,
+    bucket_config_s3: S3Config,
     tenant_id: TenantId,
     output_path: Utf8PathBuf,
     concurrency: usize,
@@ -36,12 +37,17 @@ impl SnapshotDownloader {
         output_path: Utf8PathBuf,
         concurrency: usize,
     ) -> anyhow::Result<Self> {
+        let bucket_config_s3 = match &bucket_config.0.storage {
+            remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
+            _ => panic!("only S3 configuration is supported for snapshot downloading"),
+        };
         let (s3_client, s3_root) =
-            init_remote_s3(bucket_config.clone(), NodeKind::Pageserver).await?;
+            init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
         Ok(Self {
             s3_client,
             s3_root,
             bucket_config,
+            bucket_config_s3,
             tenant_id,
             output_path,
             concurrency,
@@ -87,7 +93,7 @@ impl SnapshotDownloader {
         let versions = self
             .s3_client
             .list_object_versions()
-            .bucket(self.bucket_config.bucket.clone())
+            .bucket(self.bucket_config_s3.bucket_name.clone())
             .prefix(&remote_layer_path)
             .send()
             .await?;
@@ -96,7 +102,7 @@
         };
         download_object_to_file_s3(
             &self.s3_client,
-            &self.bucket_config.bucket,
+            &self.bucket_config_s3.bucket_name,
             &remote_layer_path,
             version.version_id.as_deref(),
             &local_path,