Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add find-large-objects subcommand to scrubber #8257

Merged
Merged 10 commits on Jul 4, 2024
97 changes: 97 additions & 0 deletions storage_scrubber/src/find_large_objects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use futures::StreamExt;
use pageserver::tenant::storage_layer::{DeltaLayerName, ImageLayerName};
use serde::{Deserialize, Serialize};

use crate::{
init_remote, list_objects_with_retries, metadata_stream::stream_tenants, BucketConfig, NodeKind,
};

/// Classification of a bucket object, derived from the final `/`-separated
/// segment of its key.
// `Debug` added so the kind can appear in logs and error contexts.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
enum LargeObjectKind {
    /// The key's file name parses as a delta layer name.
    DeltaLayer,
    /// The key's file name parses as an image layer name.
    ImageLayer,
    /// Anything else (parses as neither, or ambiguously as both).
    Other,
}

impl LargeObjectKind {
    /// Classify an S3 key by trying to parse its final `/`-separated segment
    /// as a delta or image layer file name.
    fn from_key(key: &str) -> Self {
        // `rsplit` always yields at least one item, even for an empty key,
        // so this `unwrap` cannot fail.
        let file_name = key.rsplit('/').next().unwrap();
        let parses_as = (
            DeltaLayerName::parse_str(file_name).is_some(),
            ImageLayerName::parse_str(file_name).is_some(),
        );
        match parses_as {
            (true, false) => LargeObjectKind::DeltaLayer,
            (false, true) => LargeObjectKind::ImageLayer,
            // Neither — or (should be impossible) both — parsed.
            _ => LargeObjectKind::Other,
        }
    }
}

/// A single object at or above the size threshold, as reported by the
/// `find-large-objects` scan.
#[derive(Serialize, Deserialize)]
pub struct LargeObject {
    /// Full S3 key of the object.
    pub key: String,
    /// Object size in bytes.
    pub size: u64,
    // Classification derived from the key's file name; serialized into the
    // JSON output but not exposed to Rust callers.
    kind: LargeObjectKind,
}

/// Result of a `find-large-objects` scan: every object found at or above the
/// requested size threshold.
#[derive(Serialize, Deserialize)]
pub struct LargeObjectListing {
    /// The large objects found, in discovery order.
    pub objects: Vec<LargeObject>,
}

pub async fn find_large_objects(
bucket_config: BucketConfig,
min_size: u64,
ignore_deltas: bool,
) -> anyhow::Result<LargeObjectListing> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let mut objects = Vec::new();
let mut tenant_ctr = 0u64;
let mut object_ctr = 0u64;
while let Some(tenant_shard_id) = tenants.next().await {
let tenant_shard_id = tenant_shard_id?;
let mut tenant_root = target.tenant_root(&tenant_shard_id);
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
problame marked this conversation as resolved.
Show resolved Hide resolved
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
problame marked this conversation as resolved.
Show resolved Hide resolved
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
.key()
.map(|k| k.to_owned())
.unwrap_or_else(|| "<unknown key>".to_owned());
let kind = LargeObjectKind::from_key(&key);
problame marked this conversation as resolved.
Show resolved Hide resolved
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
object_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}

tenant_ctr += 1;
if tenant_ctr % 50 == 0 {
tracing::info!(
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
);
}
}
Ok(LargeObjectListing { objects })
}
1 change: 1 addition & 0 deletions storage_scrubber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checks;
pub mod cloud_admin_api;
pub mod find_large_objects;
pub mod garbage;
pub mod metadata_stream;
pub mod pageserver_physical_gc;
Expand Down
13 changes: 13 additions & 0 deletions storage_scrubber/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::bail;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use storage_scrubber::find_large_objects;
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use storage_scrubber::pageserver_physical_gc::GcMode;
use storage_scrubber::scan_pageserver_metadata::scan_metadata;
Expand Down Expand Up @@ -72,6 +73,12 @@ enum Command {
#[arg(short, long, default_value_t = GcMode::IndicesOnly)]
mode: GcMode,
},
FindLargeObjects {
#[arg(long = "min-size")]
min_size: u64,
#[arg(short, long, default_value_t = false)]
ignore_deltas: bool,
},
}

#[tokio::main]
Expand All @@ -86,6 +93,7 @@ async fn main() -> anyhow::Result<()> {
Command::PurgeGarbage { .. } => "purge-garbage",
Command::TenantSnapshot { .. } => "tenant-snapshot",
Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
Command::FindLargeObjects { .. } => "find-large-objects",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
Expand Down Expand Up @@ -199,5 +207,10 @@ async fn main() -> anyhow::Result<()> {
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
Command::FindLargeObjects { min_size, ignore_deltas } => {
let summary = find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas).await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
}
}
Loading