Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add concurrency to the find-large-objects scrubber subcommand #8291

Merged
merged 5 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 75 additions & 38 deletions storage_scrubber/src/find_large_objects.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use futures::StreamExt;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};

use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::storage_layer::LayerName;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -29,7 +34,7 @@ impl LargeObjectKind {
}
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct LargeObject {
pub key: String,
pub size: u64,
Expand All @@ -45,53 +50,85 @@ pub async fn find_large_objects(
bucket_config: BucketConfig,
min_size: u64,
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let mut objects = Vec::new();
let mut tenant_ctr = 0u64;
let mut object_ctr = 0u64;
while let Some(tenant_shard_id) = tenants.next().await {
let tenant_shard_id = tenant_shard_id?;
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));

let tenant_ctr = Arc::new(AtomicU64::new(0));
let object_ctr = Arc::new(AtomicU64::new(0));

let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
let tenant_ctr = tenant_ctr.clone();
let object_ctr = object_ctr.clone();
let s3_client = s3_client.clone();
async move {
let mut objects = Vec::new();
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await
.expect("couldn't list objects");
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
object_ctr.fetch_add(fetch_response.contents().len() as u64, Ordering::Relaxed);
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
object_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}

tenant_ctr.fetch_add(1, Ordering::Relaxed);

Ok((tenant_shard_id, objects))
}
});
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));

//let mut objects_stream = objects_stream.flatten();

arpad-m marked this conversation as resolved.
Show resolved Hide resolved
let mut objects = Vec::new();
while let Some(res) = objects_stream.next().await {
let (tenant_shard_id, objects_slice) = res?;
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
objects.extend_from_slice(&objects_slice);

let tenants_count = tenant_ctr.load(Ordering::Relaxed);

if tenants_count % 100 == 0 {
let objects_count = object_ctr.load(Ordering::Relaxed);

tenant_ctr += 1;
if tenant_ctr % 50 == 0 {
tracing::info!(
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
"Scanned {tenants_count} shards. objects={objects_count}, found={}, current={tenant_shard_id}.",
objects.len()
);
}
}

let tenants_count = tenant_ctr.load(Ordering::Relaxed);
let objects_count = object_ctr.load(Ordering::Relaxed);
tracing::info!(
"Scan finished. Scanned {tenants_count} shards. objects={objects_count}, found={}.",
objects.len()
);
Ok(LargeObjectListing { objects })
}
13 changes: 10 additions & 3 deletions storage_scrubber/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ enum Command {
min_size: u64,
#[arg(short, long, default_value_t = false)]
ignore_deltas: bool,
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
concurrency: usize,
},
}

Expand Down Expand Up @@ -210,10 +212,15 @@ async fn main() -> anyhow::Result<()> {
Command::FindLargeObjects {
min_size,
ignore_deltas,
concurrency,
} => {
let summary =
find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
.await?;
let summary = find_large_objects::find_large_objects(
bucket_config,
min_size,
ignore_deltas,
concurrency,
)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
Expand Down
Loading