From a6975f0ce0d5c90e5c24bfb947abb6d6deaac936 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 5 Jul 2024 19:28:23 +0200 Subject: [PATCH 1/5] Add concurrency to the find-large-objects scrubber subcommand --- storage_scrubber/src/find_large_objects.rs | 113 ++++++++++++++------- storage_scrubber/src/main.rs | 13 ++- 2 files changed, 85 insertions(+), 41 deletions(-) diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs index 24668b65169a..d9bf83a7cee9 100644 --- a/storage_scrubber/src/find_large_objects.rs +++ b/storage_scrubber/src/find_large_objects.rs @@ -1,4 +1,9 @@ -use futures::StreamExt; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +use futures::{StreamExt, TryStreamExt}; use pageserver::tenant::storage_layer::LayerName; use serde::{Deserialize, Serialize}; @@ -29,7 +34,7 @@ impl LargeObjectKind { } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct LargeObject { pub key: String, pub size: u64, @@ -45,53 +50,85 @@ pub async fn find_large_objects( bucket_config: BucketConfig, min_size: u64, ignore_deltas: bool, + concurrency: usize, ) -> anyhow::Result { let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; - let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target)); - let mut objects = Vec::new(); - let mut tenant_ctr = 0u64; - let mut object_ctr = 0u64; - while let Some(tenant_shard_id) = tenants.next().await { - let tenant_shard_id = tenant_shard_id?; + let tenants = std::pin::pin!(stream_tenants(&s3_client, &target)); + + let tenant_ctr = Arc::new(AtomicU64::new(0)); + let object_ctr = Arc::new(AtomicU64::new(0)); + + let objects_stream = tenants.map_ok(|tenant_shard_id| { let mut tenant_root = target.tenant_root(&tenant_shard_id); - // We want the objects and not just common prefixes - tenant_root.delimiter.clear(); - let mut continuation_token = None; - loop { - let fetch_response = - list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone()) - .await?; - for obj in fetch_response.contents().iter().filter(|o| { - if let Some(obj_size) = o.size { - min_size as i64 <= obj_size - } else { - false + let tenant_ctr = tenant_ctr.clone(); + let object_ctr = object_ctr.clone(); + let s3_client = s3_client.clone(); + async move { + let mut objects = Vec::new(); + // We want the objects and not just common prefixes + tenant_root.delimiter.clear(); + let mut continuation_token = None; + loop { + let fetch_response = + list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone()) + .await + .expect("couldn't list objects"); + for obj in fetch_response.contents().iter().filter(|o| { + if let Some(obj_size) = o.size { + min_size as i64 <= obj_size + } else { + false + } + }) { + let key = obj.key().expect("couldn't get key").to_owned(); + let kind = LargeObjectKind::from_key(&key); + if ignore_deltas && kind == LargeObjectKind::DeltaLayer { + continue; + } + objects.push(LargeObject { + key, + size: obj.size.unwrap() as u64, + kind, + }) } - }) { - let key = obj.key().expect("couldn't get key").to_owned(); - let kind = LargeObjectKind::from_key(&key); - if ignore_deltas && kind == LargeObjectKind::DeltaLayer { - continue; + object_ctr.fetch_add(fetch_response.contents().len() as u64, Ordering::Relaxed); + match fetch_response.next_continuation_token { + Some(new_token) => continuation_token = Some(new_token), + None => break, } - objects.push(LargeObject { - key, - size: obj.size.unwrap() as u64, - kind, - }) - } - object_ctr += fetch_response.contents().len() as u64; - match fetch_response.next_continuation_token { - Some(new_token) => continuation_token = Some(new_token), - None => break, } + + tenant_ctr.fetch_add(1, Ordering::Relaxed); + + Ok((tenant_shard_id, objects)) } + }); + let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency)); + + //let mut objects_stream = objects_stream.flatten(); + + let mut objects = Vec::new(); + while let Some(res) = objects_stream.next().await { + let (tenant_shard_id, objects_slice) = res?; + objects.extend_from_slice(&objects_slice); + + let tenants_count = tenant_ctr.load(Ordering::Relaxed); + + if tenants_count % 100 == 0 { + let objects_count = object_ctr.load(Ordering::Relaxed); - tenant_ctr += 1; - if tenant_ctr % 50 == 0 { tracing::info!( - "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len() + "Scanned {tenants_count} shards. objects={objects_count}, found={}, current={tenant_shard_id}.", + objects.len() ); } } + + let tenants_count = tenant_ctr.load(Ordering::Relaxed); + let objects_count = object_ctr.load(Ordering::Relaxed); + tracing::info!( + "Scan finished. Scanned {tenants_count} shards. objects={objects_count}, found={}.", + objects.len() + ); Ok(LargeObjectListing { objects }) } diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs index 10699edd3c94..16a26613d25b 100644 --- a/storage_scrubber/src/main.rs +++ b/storage_scrubber/src/main.rs @@ -78,6 +78,8 @@ enum Command { min_size: u64, #[arg(short, long, default_value_t = false)] ignore_deltas: bool, + #[arg(long = "concurrency", short = 'j', default_value_t = 64)] + concurrency: usize, }, } @@ -210,10 +212,15 @@ async fn main() -> anyhow::Result<()> { Command::FindLargeObjects { min_size, ignore_deltas, + concurrency, } => { - let summary = - find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas) - .await?; + let summary = find_large_objects::find_large_objects( + bucket_config, + min_size, + ignore_deltas, + concurrency, + ) + .await?; println!("{}", serde_json::to_string(&summary).unwrap()); Ok(()) } From 8eca2b2a6d7a658b907ac591a5531c61e7ba5073 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 5 Jul 2024 21:12:52 +0200 Subject: [PATCH 2/5] Address Chi's review comment --- storage_scrubber/src/find_large_objects.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs index d9bf83a7cee9..69ee7a3d99ca 100644 --- a/storage_scrubber/src/find_large_objects.rs +++ b/storage_scrubber/src/find_large_objects.rs @@ -105,8 +105,6 @@ pub async fn find_large_objects( }); let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency)); - //let mut objects_stream = objects_stream.flatten(); - let mut objects = Vec::new(); while let Some(res) = objects_stream.next().await { let (tenant_shard_id, objects_slice) = res?; From aa48651b28f3a5fb930d6f825c64a477575e6b98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 5 Jul 2024 21:15:10 +0200 Subject: [PATCH 3/5] Add back tenant_ctr The atomic wasn't required after all --- storage_scrubber/src/find_large_objects.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs index 69ee7a3d99ca..573950e422a9 100644 --- a/storage_scrubber/src/find_large_objects.rs +++ b/storage_scrubber/src/find_large_objects.rs @@ -55,12 +55,10 @@ pub async fn find_large_objects( let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; let tenants = std::pin::pin!(stream_tenants(&s3_client, &target)); - let tenant_ctr = Arc::new(AtomicU64::new(0)); let object_ctr = Arc::new(AtomicU64::new(0)); let objects_stream = tenants.map_ok(|tenant_shard_id| { let mut tenant_root = target.tenant_root(&tenant_shard_id); - let tenant_ctr = tenant_ctr.clone(); let object_ctr = object_ctr.clone(); let s3_client = s3_client.clone(); async move { @@ -98,34 +96,32 @@ pub async fn find_large_objects( } } - tenant_ctr.fetch_add(1, Ordering::Relaxed); - Ok((tenant_shard_id, objects)) } }); let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency)); let mut objects = Vec::new(); + + let tenant_ctr = 0u64; while let Some(res) = objects_stream.next().await { let (tenant_shard_id, objects_slice) = res?; objects.extend_from_slice(&objects_slice); - let tenants_count = tenant_ctr.load(Ordering::Relaxed); - - if tenants_count % 100 == 0 { + if tenant_ctr % 100 == 0 { let objects_count = object_ctr.load(Ordering::Relaxed); tracing::info!( - "Scanned {tenants_count} shards. objects={objects_count}, found={}, current={tenant_shard_id}.", + "Scanned {tenant_ctr} shards. objects={objects_count}, found={}, current={tenant_shard_id}.", objects.len() ); } } - let tenants_count = tenant_ctr.load(Ordering::Relaxed); + let bucket_name = target.bucket_name(); let objects_count = object_ctr.load(Ordering::Relaxed); tracing::info!( - "Scan finished. Scanned {tenants_count} shards. objects={objects_count}, found={}.", + "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={objects_count}, found={}.", objects.len() ); Ok(LargeObjectListing { objects }) From beeb8b7319448cb6be8ad680620345085784fb90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 5 Jul 2024 21:30:53 +0200 Subject: [PATCH 4/5] Remove expect --- storage_scrubber/src/find_large_objects.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs index 573950e422a9..9489118de041 100644 --- a/storage_scrubber/src/find_large_objects.rs +++ b/storage_scrubber/src/find_large_objects.rs @@ -69,8 +69,7 @@ pub async fn find_large_objects( loop { let fetch_response = list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone()) - .await - .expect("couldn't list objects"); + .await?; for obj in fetch_response.contents().iter().filter(|o| { if let Some(obj_size) = o.size { min_size as i64 <= obj_size From 9c5e2f3e2b4e716a33f18a23ec2939b7f39ecb87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 5 Jul 2024 21:33:37 +0200 Subject: [PATCH 5/5] Remove atomics completely they are not needed --- storage_scrubber/src/find_large_objects.rs | 27 ++++++++-------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs index 9489118de041..1422545f2fce 100644 --- a/storage_scrubber/src/find_large_objects.rs +++ b/storage_scrubber/src/find_large_objects.rs @@ -1,8 +1,3 @@ -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, -}; - use futures::{StreamExt, TryStreamExt}; use pageserver::tenant::storage_layer::LayerName; use serde::{Deserialize, Serialize}; @@ -55,14 +50,12 @@ pub async fn find_large_objects( let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; let tenants = std::pin::pin!(stream_tenants(&s3_client, &target)); - let object_ctr = Arc::new(AtomicU64::new(0)); - let objects_stream = tenants.map_ok(|tenant_shard_id| { let mut tenant_root = target.tenant_root(&tenant_shard_id); - let object_ctr = object_ctr.clone(); let s3_client = s3_client.clone(); async move { let mut objects = Vec::new(); + let mut total_objects_ctr = 0u64; // We want the objects and not just common prefixes tenant_root.delimiter.clear(); let mut continuation_token = None; @@ -88,39 +81,39 @@ pub async fn find_large_objects( kind, }) } - object_ctr.fetch_add(fetch_response.contents().len() as u64, Ordering::Relaxed); + total_objects_ctr += fetch_response.contents().len() as u64; match fetch_response.next_continuation_token { Some(new_token) => continuation_token = Some(new_token), None => break, } } - Ok((tenant_shard_id, objects)) + Ok((tenant_shard_id, objects, total_objects_ctr)) } }); let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency)); let mut objects = Vec::new(); - let tenant_ctr = 0u64; + let mut tenant_ctr = 0u64; + let mut object_ctr = 0u64; while let Some(res) = objects_stream.next().await { - let (tenant_shard_id, objects_slice) = res?; + let (tenant_shard_id, objects_slice, total_objects_ctr) = res?; objects.extend_from_slice(&objects_slice); + object_ctr += total_objects_ctr; + tenant_ctr += 1; if tenant_ctr % 100 == 0 { - let objects_count = object_ctr.load(Ordering::Relaxed); - tracing::info!( - "Scanned {tenant_ctr} shards. objects={objects_count}, found={}, current={tenant_shard_id}.", + "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len() ); } } let bucket_name = target.bucket_name(); - let objects_count = object_ctr.load(Ordering::Relaxed); tracing::info!( - "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={objects_count}, found={}.", + "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.", objects.len() ); Ok(LargeObjectListing { objects })