Skip to content

Commit

Permalink
Add find-large-objects subcommand to scrubber (#8257)
Browse files Browse the repository at this point in the history
Adds a find-large-objects subcommand to the scrubber to allow listing
layer objects larger than a specific size.

To be used like:

```
AWS_PROFILE=dev REGION=us-east-2 BUCKET=neon-dev-storage-us-east-2 cargo run -p storage_scrubber -- find-large-objects --min-size 250000000 --ignore-deltas
```

Part of #5431
  • Loading branch information
arpad-m authored and VladLazar committed Jul 8, 2024
1 parent 5ca996c commit 112aef1
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 1 deletion.
2 changes: 1 addition & 1 deletion storage_scrubber/src/checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub(crate) enum BlobDataParseResult {
Incorrect(Vec<String>),
}

fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
match name.rsplit_once('-') {
// FIXME: this is gross, just use a regex?
Some((layer_filename, gen)) if gen.len() == 8 => {
Expand Down
97 changes: 97 additions & 0 deletions storage_scrubber/src/find_large_objects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use futures::StreamExt;
use pageserver::tenant::storage_layer::LayerName;
use serde::{Deserialize, Serialize};

use crate::{
checks::parse_layer_object_name, init_remote, list_objects_with_retries,
metadata_stream::stream_tenants, BucketConfig, NodeKind,
};

#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
enum LargeObjectKind {
DeltaLayer,
ImageLayer,
Other,
}

impl LargeObjectKind {
fn from_key(key: &str) -> Self {
let fname = key.split('/').last().unwrap();

let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
return LargeObjectKind::Other;
};

match layer_name {
LayerName::Image(_) => LargeObjectKind::ImageLayer,
LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
}
}
}

#[derive(Serialize, Deserialize)]
pub struct LargeObject {
pub key: String,
pub size: u64,
kind: LargeObjectKind,
}

#[derive(Serialize, Deserialize)]
pub struct LargeObjectListing {
pub objects: Vec<LargeObject>,
}

pub async fn find_large_objects(
bucket_config: BucketConfig,
min_size: u64,
ignore_deltas: bool,
) -> anyhow::Result<LargeObjectListing> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let mut objects = Vec::new();
let mut tenant_ctr = 0u64;
let mut object_ctr = 0u64;
while let Some(tenant_shard_id) = tenants.next().await {
let tenant_shard_id = tenant_shard_id?;
let mut tenant_root = target.tenant_root(&tenant_shard_id);
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
object_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}

tenant_ctr += 1;
if tenant_ctr % 50 == 0 {
tracing::info!(
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
);
}
}
Ok(LargeObjectListing { objects })
}
1 change: 1 addition & 0 deletions storage_scrubber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checks;
pub mod cloud_admin_api;
pub mod find_large_objects;
pub mod garbage;
pub mod metadata_stream;
pub mod pageserver_physical_gc;
Expand Down
18 changes: 18 additions & 0 deletions storage_scrubber/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::bail;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use storage_scrubber::find_large_objects;
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use storage_scrubber::pageserver_physical_gc::GcMode;
use storage_scrubber::scan_pageserver_metadata::scan_metadata;
Expand Down Expand Up @@ -72,6 +73,12 @@ enum Command {
#[arg(short, long, default_value_t = GcMode::IndicesOnly)]
mode: GcMode,
},
FindLargeObjects {
#[arg(long = "min-size")]
min_size: u64,
#[arg(short, long, default_value_t = false)]
ignore_deltas: bool,
},
}

#[tokio::main]
Expand All @@ -86,6 +93,7 @@ async fn main() -> anyhow::Result<()> {
Command::PurgeGarbage { .. } => "purge-garbage",
Command::TenantSnapshot { .. } => "tenant-snapshot",
Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
Command::FindLargeObjects { .. } => "find-large-objects",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
Expand Down Expand Up @@ -199,5 +207,15 @@ async fn main() -> anyhow::Result<()> {
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
Command::FindLargeObjects {
min_size,
ignore_deltas,
} => {
let summary =
find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
}
}

0 comments on commit 112aef1

Please sign in to comment.