diff --git a/storage_scrubber/src/checks.rs b/storage_scrubber/src/checks.rs
index 4eb8580e32cfe..f687b24320ceb 100644
--- a/storage_scrubber/src/checks.rs
+++ b/storage_scrubber/src/checks.rs
@@ -259,7 +259,7 @@ pub(crate) enum BlobDataParseResult {
     Incorrect(Vec<String>),
 }
 
-fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
+pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
     match name.rsplit_once('-') {
         // FIXME: this is gross, just use a regex?
         Some((layer_filename, gen)) if gen.len() == 8 => {
diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs
new file mode 100644
index 0000000000000..24668b65169a3
--- /dev/null
+++ b/storage_scrubber/src/find_large_objects.rs
@@ -0,0 +1,97 @@
+use futures::StreamExt;
+use pageserver::tenant::storage_layer::LayerName;
+use serde::{Deserialize, Serialize};
+
+use crate::{
+    checks::parse_layer_object_name, init_remote, list_objects_with_retries,
+    metadata_stream::stream_tenants, BucketConfig, NodeKind,
+};
+
+#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
+enum LargeObjectKind {
+    DeltaLayer,
+    ImageLayer,
+    Other,
+}
+
+impl LargeObjectKind {
+    fn from_key(key: &str) -> Self {
+        let fname = key.split('/').last().unwrap();
+
+        let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
+            return LargeObjectKind::Other;
+        };
+
+        match layer_name {
+            LayerName::Image(_) => LargeObjectKind::ImageLayer,
+            LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct LargeObject {
+    pub key: String,
+    pub size: u64,
+    kind: LargeObjectKind,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct LargeObjectListing {
+    pub objects: Vec<LargeObject>,
+}
+
+pub async fn find_large_objects(
+    bucket_config: BucketConfig,
+    min_size: u64,
+    ignore_deltas: bool,
+) -> anyhow::Result<LargeObjectListing> {
+    let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
+    let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
+    let mut objects = Vec::new();
+    let mut tenant_ctr = 0u64;
+    let mut object_ctr = 0u64;
+    while let Some(tenant_shard_id) = tenants.next().await {
+        let tenant_shard_id = tenant_shard_id?;
+        let mut tenant_root = target.tenant_root(&tenant_shard_id);
+        // We want the objects and not just common prefixes
+        tenant_root.delimiter.clear();
+        let mut continuation_token = None;
+        loop {
+            let fetch_response =
+                list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
+                    .await?;
+            for obj in fetch_response.contents().iter().filter(|o| {
+                if let Some(obj_size) = o.size {
+                    min_size as i64 <= obj_size
+                } else {
+                    false
+                }
+            }) {
+                let key = obj.key().expect("couldn't get key").to_owned();
+                let kind = LargeObjectKind::from_key(&key);
+                if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
+                    continue;
+                }
+                objects.push(LargeObject {
+                    key,
+                    size: obj.size.unwrap() as u64,
+                    kind,
+                })
+            }
+            object_ctr += fetch_response.contents().len() as u64;
+            match fetch_response.next_continuation_token {
+                Some(new_token) => continuation_token = Some(new_token),
+                None => break,
+            }
+        }
+
+        tenant_ctr += 1;
+        if tenant_ctr % 50 == 0 {
+            tracing::info!(
+                "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
+            );
+        }
+    }
+    Ok(LargeObjectListing { objects })
+}
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index 64273432fc0cc..6adaa5d38f6b0 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -2,6 +2,7 @@
 #![deny(clippy::undocumented_unsafe_blocks)]
 pub mod checks;
 pub mod cloud_admin_api;
+pub mod find_large_objects;
 pub mod garbage;
 pub mod metadata_stream;
 pub mod pageserver_physical_gc;
diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs
index 222bd10ed2484..10699edd3c948 100644
--- a/storage_scrubber/src/main.rs
+++ b/storage_scrubber/src/main.rs
@@ -1,6 +1,7 @@
 use anyhow::bail;
 use camino::Utf8PathBuf;
 use pageserver_api::shard::TenantShardId;
+use storage_scrubber::find_large_objects;
 use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
 use storage_scrubber::pageserver_physical_gc::GcMode;
 use storage_scrubber::scan_pageserver_metadata::scan_metadata;
@@ -72,6 +73,12 @@ enum Command {
         #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
         mode: GcMode,
     },
+    FindLargeObjects {
+        #[arg(long = "min-size")]
+        min_size: u64,
+        #[arg(short, long, default_value_t = false)]
+        ignore_deltas: bool,
+    },
 }
 
 #[tokio::main]
@@ -86,6 +93,7 @@ async fn main() -> anyhow::Result<()> {
         Command::PurgeGarbage { .. } => "purge-garbage",
         Command::TenantSnapshot { .. } => "tenant-snapshot",
         Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
+        Command::FindLargeObjects { .. } => "find-large-objects",
     };
     let _guard = init_logging(&format!(
         "{}_{}_{}_{}.log",
@@ -199,5 +207,15 @@ async fn main() -> anyhow::Result<()> {
             println!("{}", serde_json::to_string(&summary).unwrap());
             Ok(())
         }
+        Command::FindLargeObjects {
+            min_size,
+            ignore_deltas,
+        } => {
+            let summary =
+                find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
+                    .await?;
+            println!("{}", serde_json::to_string(&summary).unwrap());
+            Ok(())
+        }
     }
 }
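
For reference, a minimal sketch of driving the new library entry point directly, outside the CLI wiring above. `BucketConfig::from_env()` and the standalone binary scaffolding are assumptions for illustration; only `find_large_objects` and its parameters come from this diff.

// Hypothetical standalone driver; a sketch, not part of this diff.
// Assumes BucketConfig::from_env() exists elsewhere in the crate (not shown
// here) and uses an arbitrary 128 MiB threshold.
use storage_scrubber::{find_large_objects, BucketConfig};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let bucket_config = BucketConfig::from_env()?; // assumed constructor
    // Equivalent to: find-large-objects --min-size 134217728 --ignore-deltas
    let listing =
        find_large_objects::find_large_objects(bucket_config, 128 * 1024 * 1024, true).await?;
    // LargeObjectListing serializes to JSON, matching the subcommand's output.
    println!("{}", serde_json::to_string(&listing)?);
    Ok(())
}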