diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index c6fc3bfe6c8c..909f99ea9d07 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -39,6 +39,7 @@ use remote_timeline_client::UploadQueueNotReadyError;
 use std::collections::BTreeMap;
 use std::fmt;
 use std::future::Future;
+use std::sync::atomic::AtomicBool;
 use std::sync::Weak;
 use std::time::SystemTime;
 use storage_broker::BrokerClientChannel;
@@ -524,6 +525,9 @@ pub struct OffloadedTimeline {
     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     /// timeline is being deleted. If 'true', the timeline has already been deleted.
     pub delete_progress: TimelineDeleteProgress,
+
+    /// Part of the `OffloadedTimeline` object's lifecycle: this needs to be set before we drop it
+    pub deleted_from_ancestor: AtomicBool,
 }
 
 impl OffloadedTimeline {
@@ -533,9 +537,16 @@ impl OffloadedTimeline {
     /// the timeline is not in a stopped state.
     /// Panics if the timeline is not archived.
     fn from_timeline(timeline: &Timeline) -> Result<Self, UploadQueueNotReadyError> {
-        let ancestor_retain_lsn = timeline
-            .get_ancestor_timeline_id()
-            .map(|_timeline_id| timeline.get_ancestor_lsn());
+        let (ancestor_retain_lsn, ancestor_timeline_id) =
+            if let Some(ancestor_timeline) = timeline.ancestor_timeline() {
+                let ancestor_lsn = timeline.get_ancestor_lsn();
+                let ancestor_timeline_id = ancestor_timeline.timeline_id;
+                let mut gc_info = ancestor_timeline.gc_info.write().unwrap();
+                gc_info.insert_child(timeline.timeline_id, ancestor_lsn, MaybeOffloaded::Yes);
+                (Some(ancestor_lsn), Some(ancestor_timeline_id))
+            } else {
+                (None, None)
+            };
         let archived_at = timeline
             .remote_client
             .archived_at_stopped_queue()?
@@ -543,14 +554,17 @@ impl OffloadedTimeline {
         Ok(Self {
             tenant_shard_id: timeline.tenant_shard_id,
             timeline_id: timeline.timeline_id,
-            ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
+            ancestor_timeline_id,
             ancestor_retain_lsn,
             archived_at,
             delete_progress: timeline.delete_progress.clone(),
+            deleted_from_ancestor: AtomicBool::new(false),
         })
     }
 
     fn from_manifest(tenant_shard_id: TenantShardId, manifest: &OffloadedTimelineManifest) -> Self {
+        // We expect to reach this case in tenant loading, where the `retain_lsn` is populated in the parent's `gc_info`
+        // by the `initialize_gc_info` function.
         let OffloadedTimelineManifest {
             timeline_id,
             ancestor_timeline_id,
@@ -564,6 +578,7 @@ impl OffloadedTimeline {
             ancestor_retain_lsn,
             archived_at,
             delete_progress: TimelineDeleteProgress::default(),
+            deleted_from_ancestor: AtomicBool::new(false),
         }
     }
     fn manifest(&self) -> OffloadedTimelineManifest {
@@ -581,6 +596,33 @@ impl OffloadedTimeline {
             archived_at: *archived_at,
         }
     }
+    /// Delete this timeline's retain_lsn from its ancestor, if present in the given tenant
+    fn delete_from_ancestor_with_timelines(
+        &self,
+        timelines: &std::sync::MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>>,
+    ) {
+        if let (Some(_retain_lsn), Some(ancestor_timeline_id)) =
+            (self.ancestor_retain_lsn, self.ancestor_timeline_id)
+        {
+            if let Some((_, ancestor_timeline)) = timelines
+                .iter()
+                .find(|(tid, _tl)| **tid == ancestor_timeline_id)
+            {
+                ancestor_timeline
+                    .gc_info
+                    .write()
+                    .unwrap()
+                    .remove_child_offloaded(self.timeline_id);
+            }
+        }
+        self.deleted_from_ancestor.store(true, Ordering::Release);
+    }
+    /// Call [`Self::delete_from_ancestor_with_timelines`] instead if possible.
+    ///
+    /// As the entire tenant is being dropped, don't bother deregistering the `retain_lsn` from the ancestor.
+    fn defuse_for_tenant_drop(&self) {
+        self.deleted_from_ancestor.store(true, Ordering::Release);
+    }
 }
 
 impl fmt::Debug for OffloadedTimeline {
@@ -589,6 +631,17 @@ impl fmt::Debug for OffloadedTimeline {
     }
 }
 
+impl Drop for OffloadedTimeline {
+    fn drop(&mut self) {
+        if !self.deleted_from_ancestor.load(Ordering::Acquire) {
+            tracing::warn!(
+                "offloaded timeline {} was dropped without having cleaned it up at the ancestor",
+                self.timeline_id
+            );
+        }
+    }
+}
+
 #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
 pub enum MaybeOffloaded {
     Yes,
@@ -1531,7 +1584,7 @@ impl Tenant {
                 }
                 // Complete deletions for offloaded timeline id's.
                 offloaded_timelines_list
-                    .retain(|(offloaded_id, _offloaded)| {
+                    .retain(|(offloaded_id, offloaded)| {
                         // At this point, offloaded_timeline_ids has the list of all offloaded timelines
                         // without a prefix in S3, so they are inexistent.
                         // In the end, existence of a timeline is finally determined by the existence of an index-part.json in remote storage.
@@ -1539,6 +1592,7 @@ impl Tenant {
                         let delete = offloaded_timeline_ids.contains(offloaded_id);
                         if delete {
                             tracing::info!("Removing offloaded timeline {offloaded_id} from manifest as no remote prefix was found");
+                            offloaded.defuse_for_tenant_drop();
                         }
                         !delete
                     });
@@ -1927,9 +1981,15 @@ impl Tenant {
            )));
        };
        let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
-       if offloaded_timelines.remove(&timeline_id).is_none() {
-           warn!("timeline already removed from offloaded timelines");
+       match offloaded_timelines.remove(&timeline_id) {
+           Some(offloaded) => {
+               offloaded.delete_from_ancestor_with_timelines(&timelines);
+           }
+           None => warn!("timeline already removed from offloaded timelines"),
        }
+
+       self.initialize_gc_info(&timelines, &offloaded_timelines, Some(timeline_id));
+
        Arc::clone(timeline)
    };
@@ -2667,7 +2727,7 @@ impl Tenant {
            .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
 
        // Before activation, populate each Timeline's GcInfo with information about its children
-       self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor);
+       self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
 
        // Spawn gc and compaction loops. The loops will shut themselves
        // down when they notice that the tenant is inactive.
@@ -2782,8 +2842,14 @@ impl Tenant {
                let timeline_id = timeline.timeline_id;
                let span = tracing::info_span!("timeline_shutdown", %timeline_id, ?shutdown_mode);
                js.spawn(async move { timeline.shutdown(shutdown_mode).instrument(span).await });
-           })
-       };
+           });
+       }
+       {
+           let timelines_offloaded = self.timelines_offloaded.lock().unwrap();
+           timelines_offloaded.values().for_each(|timeline| {
+               timeline.defuse_for_tenant_drop();
+           });
+       }
        // test_long_timeline_create_then_tenant_delete is leaning on this message
        tracing::info!("Waiting for timelines...");
        while let Some(res) = js.join_next().await {
@@ -3767,10 +3833,13 @@ impl Tenant {
        &self,
        timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
        timelines_offloaded: &std::sync::MutexGuard<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
+       restrict_to_timeline: Option<TimelineId>,
    ) {
-       // This function must be called before activation: after activation timeline create/delete operations
-       // might happen, and this function is not safe to run concurrently with those.
-       assert!(!self.is_active());
+       if restrict_to_timeline.is_none() {
+           // This function must be called before activation: after activation timeline create/delete operations
+           // might happen, and this function is not safe to run concurrently with those.
+           assert!(!self.is_active());
+       }
 
        // Scan all timelines. For each timeline, remember the timeline ID and
        // the branch point where it was created.
@@ -3803,7 +3872,12 @@ impl Tenant {
        let horizon = self.get_gc_horizon();
 
        // Populate each timeline's GcInfo with information about its child branches
-       for timeline in timelines.values() {
+       let timelines_to_write = if let Some(timeline_id) = restrict_to_timeline {
+           itertools::Either::Left(timelines.get(&timeline_id).into_iter())
+       } else {
+           itertools::Either::Right(timelines.values())
+       };
+       for timeline in timelines_to_write {
            let mut branchpoints: Vec<(Lsn, TimelineId, MaybeOffloaded)> = all_branchpoints
                .remove(&timeline.timeline_id)
                .unwrap_or_default();
@@ -9650,4 +9724,54 @@ mod tests {
 
         Ok(())
     }
+
+    #[cfg(feature = "testing")]
+    #[tokio::test]
+    async fn test_timeline_offload_retain_lsn() -> anyhow::Result<()> {
+        let harness = TenantHarness::create("test_timeline_offload_retain_lsn")
+            .await
+            .unwrap();
+        let (tenant, ctx) = harness.load().await;
+        let tline_parent = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await
+            .unwrap();
+        let tline_child = tenant
+            .branch_timeline_test(&tline_parent, NEW_TIMELINE_ID, Some(Lsn(0x20)), &ctx)
+            .await
+            .unwrap();
+        {
+            let gc_info_parent = tline_parent.gc_info.read().unwrap();
+            assert_eq!(
+                gc_info_parent.retain_lsns,
+                vec![(Lsn(0x20), tline_child.timeline_id, MaybeOffloaded::No)]
+            );
+        }
+        // We have to directly call the remote_client instead of using the archive function to avoid constructing broker client...
+        tline_child
+            .remote_client
+            .schedule_index_upload_for_timeline_archival_state(TimelineArchivalState::Archived)
+            .unwrap();
+        tline_child.remote_client.wait_completion().await.unwrap();
+        offload_timeline(&tenant, &tline_child)
+            .instrument(tracing::info_span!(parent: None, "offload_test", tenant_id=%"test", shard_id=%"test", timeline_id=%"test"))
+            .await.unwrap();
+        let child_timeline_id = tline_child.timeline_id;
+        Arc::try_unwrap(tline_child).unwrap();
+
+        {
+            let gc_info_parent = tline_parent.gc_info.read().unwrap();
+            assert_eq!(
+                gc_info_parent.retain_lsns,
+                vec![(Lsn(0x20), child_timeline_id, MaybeOffloaded::Yes)]
+            );
+        }
+
+        tenant
+            .get_offloaded_timeline(child_timeline_id)
+            .unwrap()
+            .defuse_for_tenant_drop();
+
+        Ok(())
+    }
 }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 09ddb1976535..2bc14ec3172c 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -477,8 +477,21 @@ impl GcInfo {
         self.retain_lsns.sort_by_key(|i| i.0);
     }
 
-    pub(super) fn remove_child(&mut self, child_id: TimelineId) {
-        self.retain_lsns.retain(|i| i.1 != child_id);
+    pub(super) fn remove_child_maybe_offloaded(
+        &mut self,
+        child_id: TimelineId,
+        maybe_offloaded: MaybeOffloaded,
+    ) {
+        self.retain_lsns
+            .retain(|i| !(i.1 == child_id && i.2 == maybe_offloaded));
+    }
+
+    pub(super) fn remove_child_not_offloaded(&mut self, child_id: TimelineId) {
+        self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::No);
+    }
+
+    pub(super) fn remove_child_offloaded(&mut self, child_id: TimelineId) {
+        self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::Yes);
     }
 }
 
@@ -4501,7 +4514,7 @@ impl Drop for Timeline {
                // This lock should never be poisoned, but in case it is we do a .map() instead of
                // an unwrap(), to avoid panicking in a destructor and thereby aborting the process.
                if let Ok(mut gc_info) = ancestor.gc_info.write() {
-                   gc_info.remove_child(self.timeline_id)
+                   gc_info.remove_child_not_offloaded(self.timeline_id)
                }
            }
        }
@@ -5030,7 +5043,7 @@ impl Timeline {
 
            // 1. Is it newer than GC horizon cutoff point?
            if l.get_lsn_range().end > space_cutoff {
-               debug!(
+               info!(
                    "keeping {} because it's newer than space_cutoff {}",
                    l.layer_name(),
                    space_cutoff,
@@ -5041,7 +5054,7 @@ impl Timeline {
 
            // 2. It is newer than PiTR cutoff point?
            if l.get_lsn_range().end > time_cutoff {
-               debug!(
+               info!(
                    "keeping {} because it's newer than time_cutoff {}",
                    l.layer_name(),
                    time_cutoff,
@@ -5060,7 +5073,7 @@ impl Timeline {
            for retain_lsn in &retain_lsns {
                // start_lsn is inclusive
                if &l.get_lsn_range().start <= retain_lsn {
-                   debug!(
+                   info!(
                        "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
                        l.layer_name(),
                        retain_lsn,
@@ -5075,7 +5088,7 @@ impl Timeline {
            if let Some(lsn) = &max_lsn_with_valid_lease {
                // keep if layer start <= any of the lease
                if &l.get_lsn_range().start <= lsn {
-                   debug!(
+                   info!(
                        "keeping {} because there is a valid lease preventing GC at {}",
                        l.layer_name(),
                        lsn,
@@ -5107,13 +5120,13 @@ impl Timeline {
            if !layers
                .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
            {
-               debug!("keeping {} because it is the latest layer", l.layer_name());
+               info!("keeping {} because it is the latest layer", l.layer_name());
                result.layers_not_updated += 1;
                continue 'outer;
            }
 
            // We didn't find any reason to keep this file, so remove it.
-           debug!(
+           info!(
                "garbage collecting {} is_dropped: xx is_incremental: {}",
                l.layer_name(),
                l.is_incremental(),
diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs
index 69001a6c4022..13a8dfa51a2e 100644
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
@@ -141,9 +141,10 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
            );
        }
        TimelineOrOffloaded::Offloaded(timeline) => {
-           timelines_offloaded
+           let offloaded_timeline = timelines_offloaded
                .remove(&timeline.timeline_id)
                .expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map");
+           offloaded_timeline.delete_from_ancestor_with_timelines(&timelines);
        }
    }
 
diff --git a/pageserver/src/tenant/timeline/offload.rs b/pageserver/src/tenant/timeline/offload.rs
index 139484346747..3595d743bc2e 100644
--- a/pageserver/src/tenant/timeline/offload.rs
+++ b/pageserver/src/tenant/timeline/offload.rs
@@ -66,7 +66,7 @@ pub(crate) async fn offload_timeline(
     let conf = &tenant.conf;
     delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await;
 
-    remove_timeline_from_tenant(tenant, &timeline, &guard);
+    let remaining_refcount = remove_timeline_from_tenant(tenant, &timeline, &guard);
 
     {
         let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
@@ -87,16 +87,20 @@ pub(crate) async fn offload_timeline(
     // not our actual state of offloaded timelines.
     tenant.store_tenant_manifest().await?;
 
+    tracing::info!("Timeline offload complete (remaining arc refcount: {remaining_refcount})");
+
     Ok(())
 }
 
 /// It is important that this gets called when DeletionGuard is being held.
 /// For more context see comments in [`DeleteTimelineFlow::prepare`]
+///
+/// Returns the strong count of the timeline `Arc`
 fn remove_timeline_from_tenant(
     tenant: &Tenant,
     timeline: &Timeline,
     _: &DeletionGuard, // using it as a witness
-) {
+) -> usize {
     // Remove the timeline from the map.
     let mut timelines = tenant.timelines.lock().unwrap();
     let children_exist = timelines
@@ -109,7 +113,9 @@ fn remove_timeline_from_tenant(
        panic!("Timeline grew children while we removed layer files");
    }
 
-    timelines
+    let timeline = timelines
        .remove(&timeline.timeline_id)
        .expect("timeline that we were deleting was concurrently removed from 'timelines' map");
+
+    Arc::strong_count(&timeline)
 }
diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py
index 83631405abf4..ba4e79c3430d 100644
--- a/test_runner/regress/test_timeline_archive.py
+++ b/test_runner/regress/test_timeline_archive.py
@@ -15,13 +15,19 @@
     last_flush_lsn_upload,
 )
 from fixtures.pageserver.http import PageserverApiException
-from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty, list_prefix
+from fixtures.pageserver.utils import (
+    assert_prefix_empty,
+    assert_prefix_not_empty,
+    list_prefix,
+    wait_until_tenant_active,
+)
 from fixtures.pg_version import PgVersion
 from fixtures.remote_storage import S3Storage, s3_storage
 from fixtures.utils import run_only_on_default_postgres, wait_until
 from mypy_boto3_s3.type_defs import (
     ObjectTypeDef,
 )
+from psycopg2.errors import IoError, UndefinedTable
 
 
 @pytest.mark.parametrize("shard_count", [0, 4])
@@ -641,8 +647,21 @@ def worker():
     assert violations == []
 
 
-@pytest.mark.parametrize("offload_child", ["offload", "offload-corrupt", "archive", None])
-def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Optional[str]):
+@pytest.mark.parametrize("with_intermediary", [False, True])
+@pytest.mark.parametrize(
+    "offload_child",
+    [
+        "offload",
+        "offload-corrupt",
+        "offload-no-restart",
+        "offload-parent",
+        "archive",
+        None,
+    ],
+)
+def test_timeline_retain_lsn(
+    neon_env_builder: NeonEnvBuilder, with_intermediary: bool, offload_child: Optional[str]
+):
     """
     Ensure that retain_lsn functionality for timelines works, both for offloaded and non-offloaded ones
     """
@@ -650,6 +669,7 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
 
     # Our corruption code only works with S3 compatible storage
     neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    neon_env_builder.rust_log_override = "info,[gc_timeline]=debug"
     env = neon_env_builder.init_start()
     ps_http = env.pageserver.http_client()
 
@@ -657,22 +677,30 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
     tenant_id, root_timeline_id = env.create_tenant(
         conf={
             # small checkpointing and compaction targets to ensure we generate many upload operations
-            "checkpoint_distance": 128 * 1024,
+            "checkpoint_distance": 32 * 1024,
             "compaction_threshold": 1,
-            "compaction_target_size": 128 * 1024,
+            "compaction_target_size": 32 * 1024,
             # set small image creation thresholds so that gc deletes data
-            "image_creation_threshold": 2,
+            "image_creation_threshold": 1,
             # disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s", "compaction_period": "0s", # Disable pitr, we only want the latest lsn "pitr_interval": "0s", + "gc_horizon": 0, # Don't rely on endpoint lsn leases "lsn_lease_length": "0s", } ) - with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: + if with_intermediary: + parent_branch_name = "test_archived_parent" + parent_timeline_id = env.create_branch("test_archived_parent", tenant_id) + else: + parent_branch_name = "main" + parent_timeline_id = root_timeline_id + + with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint: endpoint.safe_psql_many( [ "CREATE TABLE foo(v int, key serial primary key, t text default 'data_content')", @@ -682,14 +710,16 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op ) pre_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") log.info(f"Pre branch sum: {pre_branch_sum}") - last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id) + last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id) # Create a branch and write some additional data to the parent - child_timeline_id = env.create_branch("test_archived_branch", tenant_id) + child_timeline_id = env.create_branch( + "test_archived_branch", tenant_id, ancestor_branch_name=parent_branch_name + ) - with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: - # Do some churn of the data. This is important so that we can overwrite image layers. - for i in range(10): + with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint: + # Do some overwriting churn with compactions in between. This is important so that we can overwrite image layers. + for i in range(5): endpoint.safe_psql_many( [ f"SELECT setseed(0.23{i})", @@ -698,9 +728,9 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op "UPDATE foo SET v=(random() * 409600)::int WHERE v % 3 = 0", ] ) + last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id) post_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") log.info(f"Post branch sum: {post_branch_sum}") - last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id) if offload_child is not None: ps_http.timeline_archival_config( @@ -715,9 +745,19 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op assert leaf_detail["is_archived"] is True if "offload" in offload_child: ps_http.timeline_offload(tenant_id, child_timeline_id) + if "offload-parent" in offload_child: + # Also offload the parent to ensure the retain_lsn of the child + # is entered in the parent at unoffloading + ps_http.timeline_archival_config( + tenant_id, + parent_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) + ps_http.timeline_offload(tenant_id, parent_timeline_id) # Do a restart to get rid of any in-memory objects (we only init gc info once, at attach) - env.pageserver.stop() + if offload_child is None or "no-restart" not in offload_child: + env.pageserver.stop() if offload_child == "offload-corrupt": assert isinstance(env.pageserver_remote_storage, S3Storage) listing = list_prefix( @@ -752,13 +792,21 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op ".*page_service_conn_main.*could not find data for key.*", ] ) - env.pageserver.start() + if offload_child is None or "no-restart" not in offload_child: + env.pageserver.start() + if offload_child == "offload-parent": + wait_until_tenant_active(ps_http, tenant_id=tenant_id) + 
+        ps_http.timeline_archival_config(
+            tenant_id,
+            parent_timeline_id,
+            state=TimelineArchivalState.UNARCHIVED,
+        )
 
     # Do an agressive gc and compaction of the parent branch
-    ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=root_timeline_id, gc_horizon=0)
+    ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=parent_timeline_id, gc_horizon=0)
     ps_http.timeline_checkpoint(
         tenant_id,
-        root_timeline_id,
+        parent_timeline_id,
         force_l0_compaction=True,
         force_repartition=True,
         wait_until_uploaded=True,
     )
@@ -774,10 +822,15 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
 
     # Now, after unarchival, the child timeline should still have its data accessible (or corrupted)
     if offload_child == "offload-corrupt":
-        with pytest.raises(RuntimeError, match=".*failed to get basebackup.*"):
-            env.endpoints.create_start(
+        if with_intermediary:
+            error_regex = "(.*could not read .* from page server.*|.*relation .* does not exist)"
+        else:
+            error_regex = ".*failed to get basebackup.*"
+        with pytest.raises((RuntimeError, IoError, UndefinedTable), match=error_regex):
+            with env.endpoints.create_start(
                 "test_archived_branch", tenant_id=tenant_id, basebackup_request_tries=1
-            )
+            ) as endpoint:
+                endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
     else:
         with env.endpoints.create_start("test_archived_branch", tenant_id=tenant_id) as endpoint:
             sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")