diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs index 24668b65169a..1422545f2fce 100644 --- a/storage_scrubber/src/find_large_objects.rs +++ b/storage_scrubber/src/find_large_objects.rs @@ -1,4 +1,4 @@ -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use pageserver::tenant::storage_layer::LayerName; use serde::{Deserialize, Serialize}; @@ -29,7 +29,7 @@ impl LargeObjectKind { } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct LargeObject { pub key: String, pub size: u64, @@ -45,53 +45,76 @@ pub async fn find_large_objects( bucket_config: BucketConfig, min_size: u64, ignore_deltas: bool, + concurrency: usize, ) -> anyhow::Result { let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; - let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target)); - let mut objects = Vec::new(); - let mut tenant_ctr = 0u64; - let mut object_ctr = 0u64; - while let Some(tenant_shard_id) = tenants.next().await { - let tenant_shard_id = tenant_shard_id?; + let tenants = std::pin::pin!(stream_tenants(&s3_client, &target)); + + let objects_stream = tenants.map_ok(|tenant_shard_id| { let mut tenant_root = target.tenant_root(&tenant_shard_id); - // We want the objects and not just common prefixes - tenant_root.delimiter.clear(); - let mut continuation_token = None; - loop { - let fetch_response = - list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone()) - .await?; - for obj in fetch_response.contents().iter().filter(|o| { - if let Some(obj_size) = o.size { - min_size as i64 <= obj_size - } else { - false + let s3_client = s3_client.clone(); + async move { + let mut objects = Vec::new(); + let mut total_objects_ctr = 0u64; + // We want the objects and not just common prefixes + tenant_root.delimiter.clear(); + let mut continuation_token = None; + loop { + let fetch_response = + list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone()) + .await?; + for obj in fetch_response.contents().iter().filter(|o| { + if let Some(obj_size) = o.size { + min_size as i64 <= obj_size + } else { + false + } + }) { + let key = obj.key().expect("couldn't get key").to_owned(); + let kind = LargeObjectKind::from_key(&key); + if ignore_deltas && kind == LargeObjectKind::DeltaLayer { + continue; + } + objects.push(LargeObject { + key, + size: obj.size.unwrap() as u64, + kind, + }) } - }) { - let key = obj.key().expect("couldn't get key").to_owned(); - let kind = LargeObjectKind::from_key(&key); - if ignore_deltas && kind == LargeObjectKind::DeltaLayer { - continue; + total_objects_ctr += fetch_response.contents().len() as u64; + match fetch_response.next_continuation_token { + Some(new_token) => continuation_token = Some(new_token), + None => break, } - objects.push(LargeObject { - key, - size: obj.size.unwrap() as u64, - kind, - }) - } - object_ctr += fetch_response.contents().len() as u64; - match fetch_response.next_continuation_token { - Some(new_token) => continuation_token = Some(new_token), - None => break, } + + Ok((tenant_shard_id, objects, total_objects_ctr)) } + }); + let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency)); + let mut objects = Vec::new(); + + let mut tenant_ctr = 0u64; + let mut object_ctr = 0u64; + while let Some(res) = objects_stream.next().await { + let (tenant_shard_id, objects_slice, total_objects_ctr) = res?; + objects.extend_from_slice(&objects_slice); + + object_ctr += total_objects_ctr; tenant_ctr += 1; - if tenant_ctr % 50 == 0 { + if tenant_ctr % 100 == 0 { tracing::info!( - "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len() + "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", + objects.len() ); } } + + let bucket_name = target.bucket_name(); + tracing::info!( + "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.", + objects.len() + ); Ok(LargeObjectListing { objects }) } diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs index 10699edd3c94..16a26613d25b 100644 --- a/storage_scrubber/src/main.rs +++ b/storage_scrubber/src/main.rs @@ -78,6 +78,8 @@ enum Command { min_size: u64, #[arg(short, long, default_value_t = false)] ignore_deltas: bool, + #[arg(long = "concurrency", short = 'j', default_value_t = 64)] + concurrency: usize, }, } @@ -210,10 +212,15 @@ async fn main() -> anyhow::Result<()> { Command::FindLargeObjects { min_size, ignore_deltas, + concurrency, } => { - let summary = - find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas) - .await?; + let summary = find_large_objects::find_large_objects( + bucket_config, + min_size, + ignore_deltas, + concurrency, + ) + .await?; println!("{}", serde_json::to_string(&summary).unwrap()); Ok(()) }