diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 007bd3eef083..4bb1bbf3cfd5 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -2564,9 +2564,9 @@ pub fn parse_remote_index_path(path: RemotePath) -> Option { } /// Given the key of a tenant manifest, parse out the generation number -pub(crate) fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option { +pub fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option { static RE: OnceLock = OnceLock::new(); - let re = RE.get_or_init(|| Regex::new(r".+tenant-manifest-([0-9a-f]{8}).json").unwrap()); + let re = RE.get_or_init(|| Regex::new(r".*tenant-manifest-([0-9a-f]{8}).json").unwrap()); re.captures(path.get_path().as_str()) .and_then(|c| c.get(1)) .and_then(|m| Generation::parse_suffix(m.as_str())) diff --git a/pageserver/src/tenant/remote_timeline_client/manifest.rs b/pageserver/src/tenant/remote_timeline_client/manifest.rs index c4382cb6480f..2029847a1249 100644 --- a/pageserver/src/tenant/remote_timeline_client/manifest.rs +++ b/pageserver/src/tenant/remote_timeline_client/manifest.rs @@ -43,7 +43,7 @@ impl TenantManifest { offloaded_timelines: vec![], } } - pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result { + pub fn from_json_bytes(bytes: &[u8]) -> Result { serde_json::from_slice::(bytes) } diff --git a/storage_scrubber/src/checks.rs b/storage_scrubber/src/checks.rs index 8d855d263cfd..1b4ff01a170a 100644 --- a/storage_scrubber/src/checks.rs +++ b/storage_scrubber/src/checks.rs @@ -4,17 +4,21 @@ use itertools::Itertools; use pageserver::tenant::checks::check_valid_layermap; use pageserver::tenant::layer_map::LayerMap; use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; +use pageserver::tenant::remote_timeline_client::manifest::TenantManifest; use pageserver_api::shard::ShardIndex; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use utils::generation::Generation; use utils::id::TimelineId; +use utils::shard::TenantShardId; use crate::cloud_admin_api::BranchData; use crate::metadata_stream::stream_listing; use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId}; use futures_util::StreamExt; -use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path}; +use pageserver::tenant::remote_timeline_client::{ + parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path, +}; use pageserver::tenant::storage_layer::LayerName; use pageserver::tenant::IndexPart; use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath}; @@ -527,3 +531,132 @@ async fn list_timeline_blobs_impl( unknown_keys, })) } + +pub(crate) struct RemoteTenantManifestInfo { + pub(crate) latest_generation: Option, + pub(crate) manifests: Vec<(Generation, ListingObject)>, +} + +pub(crate) enum ListTenantManifestResult { + WithErrors { + errors: Vec<(String, String)>, + #[allow(dead_code)] + unknown_keys: Vec, + }, + NoErrors(RemoteTenantManifestInfo), +} + +/// Lists the tenant manifests in remote storage and parses the latest one, returning a [`ListTenantManifestResult`] object. +pub(crate) async fn list_tenant_manifests( + remote_client: &GenericRemoteStorage, + tenant_id: TenantShardId, + root_target: &RootTarget, +) -> anyhow::Result { + let mut errors = Vec::new(); + let mut unknown_keys = Vec::new(); + + let mut tenant_root_target = root_target.tenant_root(&tenant_id); + let original_prefix = tenant_root_target.prefix_in_bucket.clone(); + const TENANT_MANIFEST_STEM: &str = "tenant-manifest"; + tenant_root_target.prefix_in_bucket += TENANT_MANIFEST_STEM; + tenant_root_target.delimiter = String::new(); + + let mut manifests: Vec<(Generation, ListingObject)> = Vec::new(); + + let prefix_str = &original_prefix + .strip_prefix("/") + .unwrap_or(&original_prefix); + + let mut stream = std::pin::pin!(stream_listing(remote_client, &tenant_root_target)); + 'outer: while let Some(obj) = stream.next().await { + let (key, Some(obj)) = obj? else { + panic!("ListingObject not specified"); + }; + + 'err: { + // TODO a let chain would be nicer here. + let Some(name) = key.object_name() else { + break 'err; + }; + if !name.starts_with(TENANT_MANIFEST_STEM) { + break 'err; + } + let Some(generation) = parse_remote_tenant_manifest_path(key.clone()) else { + break 'err; + }; + tracing::debug!("tenant manifest {key}"); + manifests.push((generation, obj)); + continue 'outer; + } + tracing::info!("Listed an unknown key: {key}"); + unknown_keys.push(obj); + } + + if manifests.is_empty() { + tracing::debug!("No manifest for timeline."); + + return Ok(ListTenantManifestResult::WithErrors { + errors, + unknown_keys, + }); + } + if !unknown_keys.is_empty() { + errors.push(((*prefix_str).to_owned(), "unknown keys listed".to_string())); + + return Ok(ListTenantManifestResult::WithErrors { + errors, + unknown_keys, + }); + } + + // Find the manifest with the highest generation + let (latest_generation, latest_listing_object) = manifests + .iter() + .max_by_key(|i| i.0) + .map(|(g, obj)| (*g, obj.clone())) + .unwrap(); + + let manifest_bytes = + match download_object_with_retries(remote_client, &latest_listing_object.key).await { + Ok(bytes) => bytes, + Err(e) => { + // It is possible that the tenant gets deleted in-between we list the objects + // and we download the manifest file. + errors.push(( + latest_listing_object.key.get_path().as_str().to_owned(), + format!("failed to download tenant-manifest.json: {e}"), + )); + return Ok(ListTenantManifestResult::WithErrors { + errors, + unknown_keys, + }); + } + }; + + match TenantManifest::from_json_bytes(&manifest_bytes) { + Ok(_manifest) => { + return Ok(ListTenantManifestResult::NoErrors( + RemoteTenantManifestInfo { + latest_generation: Some(latest_generation), + manifests, + }, + )); + } + Err(parse_error) => errors.push(( + latest_listing_object.key.get_path().as_str().to_owned(), + format!("tenant-manifest.json body parsing error: {parse_error}"), + )), + } + + if errors.is_empty() { + errors.push(( + (*prefix_str).to_owned(), + "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(), + )); + } + + Ok(ListTenantManifestResult::WithErrors { + errors, + unknown_keys, + }) +} diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs index 1e69ddbf150c..20cb9c3633ac 100644 --- a/storage_scrubber/src/pageserver_physical_gc.rs +++ b/storage_scrubber/src/pageserver_physical_gc.rs @@ -2,12 +2,16 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use std::time::Duration; -use crate::checks::{list_timeline_blobs, BlobDataParseResult}; +use crate::checks::{ + list_tenant_manifests, list_timeline_blobs, BlobDataParseResult, ListTenantManifestResult, +}; use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES}; use futures_util::{StreamExt, TryStreamExt}; use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; -use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path}; +use pageserver::tenant::remote_timeline_client::{ + parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path, +}; use pageserver::tenant::storage_layer::LayerName; use pageserver::tenant::IndexPart; use pageserver_api::controller_api::TenantDescribeResponse; @@ -25,6 +29,7 @@ use utils::id::{TenantId, TenantTimelineId}; #[derive(Serialize, Default)] pub struct GcSummary { indices_deleted: usize, + tenant_manifests_deleted: usize, remote_storage_errors: usize, controller_api_errors: usize, ancestor_layers_deleted: usize, @@ -34,12 +39,14 @@ impl GcSummary { fn merge(&mut self, other: Self) { let Self { indices_deleted, + tenant_manifests_deleted, remote_storage_errors, ancestor_layers_deleted, controller_api_errors, } = other; self.indices_deleted += indices_deleted; + self.tenant_manifests_deleted += tenant_manifests_deleted; self.remote_storage_errors += remote_storage_errors; self.ancestor_layers_deleted += ancestor_layers_deleted; self.controller_api_errors += controller_api_errors; @@ -352,6 +359,69 @@ async fn maybe_delete_index( } } +async fn maybe_delete_tenant_manifest( + remote_client: &GenericRemoteStorage, + min_age: &Duration, + latest_gen: Generation, + obj: &ListingObject, + mode: GcMode, + summary: &mut GcSummary, +) { + // Validation: we will only delete things that parse cleanly + let basename = obj.key.get_path().file_name().unwrap(); + let Some(candidate_generation) = + parse_remote_tenant_manifest_path(RemotePath::from_string(basename).unwrap()) + else { + // A strange key: we will not delete this because we don't understand it. + tracing::warn!("Bad index key"); + return; + }; + + // Validation: we will only delete manifests more than one generation old, and in fact we + // should never be called with such recent generations. + if candidate_generation >= latest_gen { + tracing::warn!("Deletion candidate is >= latest generation, this is a bug!"); + return; + } else if candidate_generation.next() == latest_gen { + tracing::warn!("Deletion candidate is >= latest generation - 1, this is a bug!"); + return; + } + + if !is_old_enough(min_age, obj, summary) { + return; + } + + if matches!(mode, GcMode::DryRun) { + tracing::info!("Dry run: would delete this key"); + return; + } + + // All validations passed: erase the object + let cancel = CancellationToken::new(); + match backoff::retry( + || remote_client.delete(&obj.key, &cancel), + |_| false, + 3, + MAX_RETRIES as u32, + "maybe_delete_tenant_manifest", + &cancel, + ) + .await + { + None => { + unreachable!("Using a dummy cancellation token"); + } + Some(Ok(_)) => { + tracing::info!("Successfully deleted tenant manifest"); + summary.tenant_manifests_deleted += 1; + } + Some(Err(e)) => { + tracing::warn!("Failed to delete tenant manifest: {e}"); + summary.remote_storage_errors += 1; + } + } +} + #[allow(clippy::too_many_arguments)] async fn gc_ancestor( remote_client: &GenericRemoteStorage, @@ -451,13 +521,100 @@ async fn gc_ancestor( Ok(()) } +async fn gc_tenant_manifests( + remote_client: &GenericRemoteStorage, + min_age: Duration, + target: &RootTarget, + mode: GcMode, + tenant_shard_id: TenantShardId, +) -> anyhow::Result { + let mut gc_summary = GcSummary::default(); + match list_tenant_manifests(remote_client, tenant_shard_id, target).await? { + ListTenantManifestResult::WithErrors { + errors, + unknown_keys: _, + } => { + for (_key, error) in errors { + tracing::warn!(%tenant_shard_id, "list_tenant_manifests: {error}"); + } + } + ListTenantManifestResult::NoErrors(mut manifest_info) => { + let Some(latest_gen) = manifest_info.latest_generation else { + return Ok(gc_summary); + }; + manifest_info + .manifests + .sort_by_key(|(generation, _obj)| *generation); + // skip the two latest generations (they don't neccessarily have to be 1 apart from each other) + let candidates = manifest_info.manifests.iter().rev().skip(2); + for (_generation, key) in candidates { + maybe_delete_tenant_manifest( + remote_client, + &min_age, + latest_gen, + key, + mode, + &mut gc_summary, + ) + .instrument( + info_span!("maybe_delete_tenant_manifest", %tenant_shard_id, ?latest_gen, %key.key), + ) + .await; + } + } + } + Ok(gc_summary) +} + +async fn gc_timeline( + remote_client: &GenericRemoteStorage, + min_age: &Duration, + target: &RootTarget, + mode: GcMode, + ttid: TenantShardTimelineId, + accumulator: &Arc>, +) -> anyhow::Result { + let mut summary = GcSummary::default(); + let data = list_timeline_blobs(remote_client, ttid, target).await?; + + let (index_part, latest_gen, candidates) = match &data.blob_data { + BlobDataParseResult::Parsed { + index_part, + index_part_generation, + s3_layers: _s3_layers, + } => (index_part, *index_part_generation, data.unused_index_keys), + BlobDataParseResult::Relic => { + // Post-deletion tenant location: don't try and GC it. + return Ok(summary); + } + BlobDataParseResult::Incorrect { + errors, + s3_layers: _, + } => { + // Our primary purpose isn't to report on bad data, but log this rather than skipping silently + tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}"); + return Ok(summary); + } + }; + + accumulator.lock().unwrap().update(ttid, index_part); + + for key in candidates { + maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary) + .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key)) + .await; + } + + Ok(summary) +} + /// Physical garbage collection: removing unused S3 objects. /// /// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level /// (keys, layers). This type of garbage collection is about removing: /// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between /// uploading a layer and uploading an index) -/// - Index objects from historic generations +/// - Index objects and tenant manifests from historic generations /// /// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and /// make sure that object listings don't get slowed down by large numbers of garbage objects. @@ -470,6 +627,7 @@ pub async fn pageserver_physical_gc( ) -> anyhow::Result { let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?; + let remote_client = Arc::new(remote_client); let tenants = if tenant_shard_ids.is_empty() { futures::future::Either::Left(stream_tenants(&remote_client, &target)) } else { @@ -484,59 +642,59 @@ pub async fn pageserver_physical_gc( let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default())); // Generate a stream of TenantTimelineId - let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t)); - let timelines = timelines.try_buffered(CONCURRENCY); - let timelines = timelines.try_flatten(); - - // Generate a stream of S3TimelineBlobData - async fn gc_timeline( - remote_client: &GenericRemoteStorage, - min_age: &Duration, - target: &RootTarget, - mode: GcMode, - ttid: TenantShardTimelineId, - accumulator: &Arc>, - ) -> anyhow::Result { - let mut summary = GcSummary::default(); - let data = list_timeline_blobs(remote_client, ttid, target).await?; - - let (index_part, latest_gen, candidates) = match &data.blob_data { - BlobDataParseResult::Parsed { - index_part, - index_part_generation, - s3_layers: _s3_layers, - } => (index_part, *index_part_generation, data.unused_index_keys), - BlobDataParseResult::Relic => { - // Post-deletion tenant location: don't try and GC it. - return Ok(summary); - } - BlobDataParseResult::Incorrect { - errors, - s3_layers: _, - } => { - // Our primary purpose isn't to report on bad data, but log this rather than skipping silently - tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}"); - return Ok(summary); - } - }; - - accumulator.lock().unwrap().update(ttid, index_part); - - for key in candidates { - maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary) - .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key)) - .await; - } - - Ok(summary) + enum GcSummaryOrContent { + Content(T), + GcSummary(GcSummary), } + let timelines = tenants.map_ok(|tenant_shard_id| { + let target_ref = ⌖ + let remote_client_ref = &remote_client; + async move { + let summaries_from_manifests = match gc_tenant_manifests( + remote_client_ref, + min_age, + target_ref, + mode, + tenant_shard_id, + ) + .await + { + Ok(gc_summary) => vec![Ok(GcSummaryOrContent::::GcSummary( + gc_summary, + ))], + Err(e) => { + tracing::warn!(%tenant_shard_id, "Error in gc_tenant_manifests: {e}"); + Vec::new() + } + }; + stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id) + .await + .map(|stream| { + stream + .map_ok(GcSummaryOrContent::Content) + .chain(futures::stream::iter(summaries_from_manifests.into_iter())) + }) + } + }); + let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); + let timelines = timelines.try_flatten(); let mut summary = GcSummary::default(); // Drain futures for per-shard GC, populating accumulator as a side effect { - let timelines = timelines.map_ok(|ttid| { - gc_timeline(&remote_client, &min_age, &target, mode, ttid, &accumulator) + let timelines = timelines.map_ok(|summary_or_ttid| match summary_or_ttid { + GcSummaryOrContent::Content(ttid) => futures::future::Either::Left(gc_timeline( + &remote_client, + &min_age, + &target, + mode, + ttid, + &accumulator, + )), + GcSummaryOrContent::GcSummary(gc_summary) => { + futures::future::Either::Right(futures::future::ok(gc_summary)) + } }); let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index 5a1e493bbec7..e808dd13966c 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -835,3 +835,117 @@ def test_timeline_retain_lsn( with env.endpoints.create_start("test_archived_branch", tenant_id=tenant_id) as endpoint: sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") assert sum == pre_branch_sum + + +def test_timeline_offload_generations(neon_env_builder: NeonEnvBuilder): + """ + Test for scrubber deleting old generations of manifests + """ + remote_storage_kind = s3_storage() + neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind) + + env = neon_env_builder.init_start() + ps_http = env.pageserver.http_client() + + # Turn off gc and compaction loops: we want to issue them manually for better reliability + tenant_id, root_timeline_id = env.create_tenant( + conf={ + "gc_period": "0s", + "compaction_period": "0s", + "checkpoint_distance": f"{1024 ** 2}", + } + ) + + # Create a branch and archive it + child_timeline_id = env.create_branch("test_archived_branch_persisted", tenant_id) + + with env.endpoints.create_start( + "test_archived_branch_persisted", tenant_id=tenant_id + ) as endpoint: + endpoint.safe_psql_many( + [ + "CREATE TABLE foo(key serial primary key, t text default 'data_content')", + "INSERT INTO foo SELECT FROM generate_series(1,512)", + ] + ) + sum = endpoint.safe_psql("SELECT sum(key) from foo where key % 3 = 2") + last_flush_lsn_upload(env, endpoint, tenant_id, child_timeline_id) + + assert_prefix_not_empty( + neon_env_builder.pageserver_remote_storage, + prefix=f"tenants/{str(tenant_id)}/", + ) + assert_prefix_empty( + neon_env_builder.pageserver_remote_storage, + prefix=f"tenants/{str(tenant_id)}/tenant-manifest", + ) + + ps_http.timeline_archival_config( + tenant_id, + child_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) + + def timeline_offloaded_api(timeline_id: TimelineId) -> bool: + # TODO add a proper API to check if a timeline has been offloaded or not + return not any( + timeline["timeline_id"] == str(timeline_id) + for timeline in ps_http.timeline_list(tenant_id=tenant_id) + ) + + def child_offloaded(): + ps_http.timeline_offload(tenant_id=tenant_id, timeline_id=child_timeline_id) + assert timeline_offloaded_api(child_timeline_id) + + wait_until(child_offloaded) + + assert timeline_offloaded_api(child_timeline_id) + assert not timeline_offloaded_api(root_timeline_id) + + # Reboot the pageserver a bunch of times, do unoffloads, offloads + for i in range(5): + env.pageserver.stop() + env.pageserver.start() + + assert timeline_offloaded_api(child_timeline_id) + assert not timeline_offloaded_api(root_timeline_id) + + ps_http.timeline_archival_config( + tenant_id, + child_timeline_id, + state=TimelineArchivalState.UNARCHIVED, + ) + + assert not timeline_offloaded_api(child_timeline_id) + + if i % 2 == 0: + with env.endpoints.create_start( + "test_archived_branch_persisted", tenant_id=tenant_id + ) as endpoint: + sum_again = endpoint.safe_psql("SELECT sum(key) from foo where key % 3 = 2") + assert sum == sum_again + + ps_http.timeline_archival_config( + tenant_id, + child_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) + wait_until(child_offloaded) + + # + # Now ensure that scrubber runs will clean up old generations' manifests. + # + + # Sleep some amount larger than min_age_secs + time.sleep(3) + + # Ensure that min_age_secs has a deletion impeding effect + gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=3600, mode="full") + assert gc_summary["remote_storage_errors"] == 0 + assert gc_summary["indices_deleted"] == 0 + assert gc_summary["tenant_manifests_deleted"] == 0 + + gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=1, mode="full") + assert gc_summary["remote_storage_errors"] == 0 + assert gc_summary["indices_deleted"] > 0 + assert gc_summary["tenant_manifests_deleted"] > 0