feat(test_runner): allowed_errors in storage scrubber (#10062)
## Problem

Resolves #9988 (comment).

## Summary of changes

* Add a verbose mode to the storage scrubber's pageserver `scan-metadata` command that includes the error and warning messages in the JSON output.
* Filter the reported messages against the test fixture's `allowed_errors` list when deciding the `healthy` flag (see the usage sketch below).
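
For illustration, here is a minimal sketch of how a regression test uses the new knob, mirroring the test added in this commit (`env` is the usual `NeonEnv` fixture):

```python
# Without an allow-list entry, any error found by the scrubber marks the
# scan as unhealthy.
healthy, _ = env.storage_scrubber.scan_metadata()
assert not healthy

# allowed_errors entries are regexes matched against each reported error
# line; once the known error is allowed, the same scan reports healthy.
env.storage_scrubber.allowed_errors.append(".*not present in remote storage.*")
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
```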

---------

Signed-off-by: Alex Chi Z <[email protected]>
skyzh authored Dec 10, 2024
1 parent b853f78 commit aa0554f
Showing 4 changed files with 98 additions and 15 deletions.
8 changes: 7 additions & 1 deletion storage_scrubber/src/main.rs
@@ -86,6 +86,8 @@ enum Command {
/// For safekeeper node_kind only, json list of timelines and their lsn info
#[arg(long, default_value = None)]
timeline_lsns: Option<String>,
#[arg(long, default_value_t = false)]
verbose: bool,
},
TenantSnapshot {
#[arg(long = "tenant-id")]
@@ -166,6 +168,7 @@ async fn main() -> anyhow::Result<()> {
dump_db_connstr,
dump_db_table,
timeline_lsns,
verbose,
} => {
if let NodeKind::Safekeeper = node_kind {
let db_or_list = match (timeline_lsns, dump_db_connstr) {
@@ -203,6 +206,7 @@ async fn main() -> anyhow::Result<()> {
tenant_ids,
json,
post_to_storcon,
verbose,
cli.exit_code,
)
.await
@@ -313,6 +317,7 @@ pub async fn run_cron_job(
Vec::new(),
true,
post_to_storcon,
false, // default to non-verbose mode
exit_code,
)
.await?;
@@ -362,12 +367,13 @@ pub async fn scan_pageserver_metadata_cmd(
tenant_shard_ids: Vec<TenantShardId>,
json: bool,
post_to_storcon: bool,
verbose: bool,
exit_code: bool,
) -> anyhow::Result<()> {
if controller_client.is_none() && post_to_storcon {
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
}
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids).await {
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
39 changes: 29 additions & 10 deletions storage_scrubber/src/scan_pageserver_metadata.rs
@@ -21,8 +21,12 @@ pub struct MetadataSummary {
tenant_count: usize,
timeline_count: usize,
timeline_shard_count: usize,
with_errors: HashSet<TenantShardTimelineId>,
with_warnings: HashSet<TenantShardTimelineId>,
/// Tenant-shard timeline (key) mapping to errors. The key has to be a string because it will be serialized to a JSON.
/// The key is generated using `TenantShardTimelineId::to_string()`.
with_errors: HashMap<String, Vec<String>>,
/// Tenant-shard timeline (key) mapping to warnings. The key has to be a string because it will be serialized to a JSON.
/// The key is generated using `TenantShardTimelineId::to_string()`.
with_warnings: HashMap<String, Vec<String>>,
with_orphans: HashSet<TenantShardTimelineId>,
indices_by_version: HashMap<usize, usize>,

Expand Down Expand Up @@ -52,7 +56,12 @@ impl MetadataSummary {
}
}

fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
fn update_analysis(
&mut self,
id: &TenantShardTimelineId,
analysis: &TimelineAnalysis,
verbose: bool,
) {
if analysis.is_healthy() {
self.healthy_tenant_shards.insert(id.tenant_shard_id);
} else {
@@ -61,11 +70,17 @@
}

if !analysis.errors.is_empty() {
self.with_errors.insert(*id);
let entry = self.with_errors.entry(id.to_string()).or_default();
if verbose {
entry.extend(analysis.errors.iter().cloned());
}
}

if !analysis.warnings.is_empty() {
self.with_warnings.insert(*id);
let entry = self.with_warnings.entry(id.to_string()).or_default();
if verbose {
entry.extend(analysis.warnings.iter().cloned());
}
}
}

@@ -120,6 +135,7 @@ Index versions: {version_summary}
pub async fn scan_pageserver_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
verbose: bool,
) -> anyhow::Result<MetadataSummary> {
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;

@@ -164,6 +180,7 @@ pub async fn scan_pageserver_metadata(
mut tenant_objects: TenantObjectListing,
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
highest_shard_count: ShardCount,
verbose: bool,
) {
summary.tenant_count += 1;

@@ -203,7 +220,7 @@
Some(data),
)
.await;
summary.update_analysis(&ttid, &analysis);
summary.update_analysis(&ttid, &analysis, verbose);

timeline_ids.insert(ttid.timeline_id);
} else {
@@ -271,10 +288,6 @@ pub async fn scan_pageserver_metadata(
summary.update_data(&data);

match tenant_id {
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
@@ -287,6 +300,7 @@
tenant_objects,
timelines,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
.await;
@@ -296,6 +310,10 @@
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}

match &data.blob_data {
@@ -326,6 +344,7 @@ pub async fn scan_pageserver_metadata(
tenant_objects,
tenant_timeline_results,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %tenant_id))
.await;
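As the new doc comments on `MetadataSummary` note, `with_errors` and `with_warnings` are now string-keyed maps (keyed by `TenantShardTimelineId::to_string()`) instead of plain sets of IDs, and their message lists are only filled in verbose mode. For illustration only, the serialized summary might look roughly like the sketch below; the key format, IDs, and messages are invented, and the exact serialized output may differ:

```python
# Hypothetical shape of a verbose pageserver scan-metadata summary
# (hand-written illustration, not real scrubber output).
summary = {
    "tenant_count": 1,
    "timeline_count": 2,
    "timeline_shard_count": 2,
    # Keys are TenantShardTimelineId strings; without --verbose the keys are
    # still recorded but the message lists stay empty.
    "with_errors": {
        "<tenant_shard_timeline_id>": [
            "<layer file name> not present in remote storage",
        ],
    },
    "with_warnings": {},
}
```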
58 changes: 55 additions & 3 deletions test_runner/fixtures/neon_fixtures.py
@@ -4556,6 +4556,7 @@ class StorageScrubber:
def __init__(self, env: NeonEnv, log_dir: Path):
self.env = env
self.log_dir = log_dir
self.allowed_errors: list[str] = []

def scrubber_cli(
self, args: list[str], timeout, extra_env: dict[str, str] | None = None
@@ -4633,19 +4634,70 @@ def scan_metadata(
if timeline_lsns is not None:
args.append("--timeline-lsns")
args.append(json.dumps(timeline_lsns))
if node_kind == NodeKind.PAGESERVER:
args.append("--verbose")
stdout = self.scrubber_cli(args, timeout=30, extra_env=extra_env)

try:
summary = json.loads(stdout)
# summary does not contain "with_warnings" if node_kind is the safekeeper
no_warnings = "with_warnings" not in summary or not summary["with_warnings"]
healthy = not summary["with_errors"] and no_warnings
healthy = self._check_run_healthy(summary)
return healthy, summary
except:
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
log.error(stdout)
raise

def _check_line_allowed(self, line: str) -> bool:
for a in self.allowed_errors:
try:
if re.match(a, line):
return True
except re.error:
log.error(f"Invalid regex: '{a}'")
raise
return False

def _check_line_list_allowed(self, lines: list[str]) -> bool:
for line in lines:
if not self._check_line_allowed(line):
return False
return True

def _check_run_healthy(self, summary: dict[str, Any]) -> bool:
# summary does not contain "with_warnings" if node_kind is the safekeeper
healthy = True
with_warnings = summary.get("with_warnings", None)
if with_warnings is not None:
if isinstance(with_warnings, list):
if len(with_warnings) > 0:
# safekeeper scan_metadata output is a list of tenants
healthy = False
else:
for _, warnings in with_warnings.items():
assert (
len(warnings) > 0
), "with_warnings value should not be empty, running without verbose mode?"
if not self._check_line_list_allowed(warnings):
healthy = False
break
if not healthy:
return healthy
with_errors = summary.get("with_errors", None)
if with_errors is not None:
if isinstance(with_errors, list):
if len(with_errors) > 0:
# safekeeper scan_metadata output is a list of tenants
healthy = False
else:
for _, errors in with_errors.items():
assert (
len(errors) > 0
), "with_errors value should not be empty, running without verbose mode?"
if not self._check_line_list_allowed(errors):
healthy = False
break
return healthy

def tenant_snapshot(self, tenant_id: TenantId, output_path: Path):
stdout = self.scrubber_cli(
["tenant-snapshot", "--tenant-id", str(tenant_id), "--output-path", str(output_path)],
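One detail worth noting: `_check_line_allowed` uses `re.match`, which anchors at the start of the string, so allow-list patterns generally need a leading `.*` to match text appearing anywhere in an error line (as the pattern added in the test below does). A standalone illustration with an invented error line:

```python
import re

# Invented error line, roughly in the style the scrubber reports.
line = "timeline 1234/abcd: layer L1 not present in remote storage"

# re.match only matches at the beginning of the string, hence the leading ".*".
assert re.match(".*not present in remote storage.*", line) is not None
assert re.match("not present in remote storage", line) is None
```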
8 changes: 7 additions & 1 deletion test_runner/regress/test_storage_scrubber.py
@@ -572,4 +572,10 @@ def test_scrubber_scan_pageserver_metadata(
unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)

neon_env_builder.disable_scrub_on_exit()
healthy, _ = env.storage_scrubber.scan_metadata()
assert not healthy
env.storage_scrubber.allowed_errors.append(".*not present in remote storage.*")
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy

neon_env_builder.disable_scrub_on_exit() # We already ran scrubber, no need to do an extra run

1 comment on commit aa0554f

@github-actions

7062 tests run: 6745 passed, 2 failed, 315 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pageserver_small_inmemory_layers[debug-pg17-False] or test_pageserver_small_inmemory_layers[debug-pg17-True]"
Flaky tests (1)

Postgres 17

Test coverage report is not available

The comment gets automatically updated with the latest test results
aa0554f at 2024-12-10T18:04:51.702Z :recycle:
