feat(detach_ancestor): better retries with persistent gc blocking (#8430)

With persistent gc blocking in place, we can now retry reparenting timelines
that failed for whatever reason on previous attempt(s).
Restructure detach_ancestor into three phases (sketched below):

- prepare (insert persistent gc blocking, copy lsn prefix, layers)
- detach and reparent
    - reparenting can fail, so we might need to retry this portion
- complete (remove persistent gc blocking)

Cc: #6994
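A rough sketch of the three-phase flow described above (illustrative only: `prepare`, `detach_and_reparent` and `complete` are hypothetical stand-ins for the real pageserver steps, not its API):

```rust
use anyhow::Result;

// Phase 1 (runs once): insert the persistent gc blocking, copy the lsn-prefix layers.
async fn prepare() -> Result<()> { Ok(()) }

// Phase 2 (retryable): detach from the ancestor and reparent the other timelines.
async fn detach_and_reparent() -> Result<()> { Ok(()) }

// Phase 3: remove the persistent gc blocking again.
async fn complete() -> Result<()> { Ok(()) }

async fn detach_ancestor_with_retries(max_attempts: usize) -> Result<()> {
    // The persistent gc blocking inserted here survives restarts, so a later
    // retry does not need to redo the preparation.
    prepare().await?;

    // Reparenting can fail part-way through; only this portion is retried.
    let mut last_err = None;
    for _ in 0..max_attempts {
        match detach_and_reparent().await {
            Ok(()) => {
                last_err = None;
                break;
            }
            Err(e) => last_err = Some(e),
        }
    }
    if let Some(e) = last_err {
        return Err(e);
    }

    // Only once every timeline is reparented is the gc blocking lifted.
    complete().await
}
```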
koivunej authored Aug 13, 2024
1 parent 87a5d7d commit 6d6e2c6
Showing 12 changed files with 958 additions and 237 deletions.
31 changes: 29 additions & 2 deletions libs/utils/src/completion.rs
@@ -5,13 +5,40 @@ use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion {
_token: TaskTrackerToken,
token: TaskTrackerToken,
}

impl std::fmt::Debug for Completion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Completion")
.field("siblings", &self.token.task_tracker().len())
.finish()
}
}

impl Completion {
/// Returns true if this completion is associated with the given barrier.
pub fn blocks(&self, barrier: &Barrier) -> bool {
TaskTracker::ptr_eq(self.token.task_tracker(), &barrier.0)
}

pub fn barrier(&self) -> Barrier {
Barrier(self.token.task_tracker().clone())
}
}

/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(TaskTracker);

impl std::fmt::Debug for Barrier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Barrier")
.field("remaining", &self.0.len())
.finish()
}
}

impl Default for Barrier {
fn default() -> Self {
let (_, rx) = channel();
@@ -51,5 +78,5 @@ pub fn channel() -> (Completion, Barrier) {
tracker.close();

let token = tracker.token();
(Completion { _token: token }, Barrier(tracker))
(Completion { token }, Barrier(tracker))
}
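For context, a minimal usage sketch of the `Completion`/`Barrier` pair and the new `blocks()`/`barrier()` accessors, assuming the workspace's `utils` crate; the `Barrier::wait()` call is assumed from the rest of the module, which this hunk does not show:

```rust
use utils::completion;

#[tokio::main]
async fn main() {
    // channel() returns a (Completion, Barrier) pair sharing one TaskTracker.
    let (completion, barrier) = completion::channel();

    // New in this commit: ask whether a completion guards a given barrier,
    // and mint further barriers from an existing completion.
    assert!(completion.blocks(&barrier));
    let another = completion.barrier();
    assert!(completion.blocks(&another));

    // A completion from an unrelated channel does not block this barrier.
    let (other_completion, _other_barrier) = completion::channel();
    assert!(!other_completion.blocks(&barrier));

    // The barrier is released once every clone of the completion is dropped
    // (Barrier::wait is assumed here).
    let clone = completion.clone();
    drop(completion);
    drop(clone);
    barrier.wait().await;
}
```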
4 changes: 2 additions & 2 deletions pageserver/src/http/routes.rs
@@ -1887,18 +1887,18 @@ async fn timeline_detach_ancestor_handler(
// drop(tenant);

let resp = match progress {
detach_ancestor::Progress::Prepared(_guard, prepared) => {
detach_ancestor::Progress::Prepared(attempt, prepared) => {
// it would be great to tag the guard on to the tenant activation future
let reparented_timelines = state
.tenant_manager
.complete_detaching_timeline_ancestor(
tenant_shard_id,
timeline_id,
prepared,
attempt,
ctx,
)
.await
.context("timeline detach ancestor completion")
.map_err(ApiError::InternalServerError)?;

AncestorDetached {
12 changes: 8 additions & 4 deletions pageserver/src/tenant.rs
@@ -302,7 +302,11 @@ pub struct Tenant {
pub(crate) timeline_get_throttle:
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,

/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
/// An ongoing timeline detach concurrency limiter.
///
/// As a tenant will likely be restarted as part of timeline detach ancestor, it makes no sense
/// to have two running at the same time. A different one can be started if an earlier one
/// has failed for whatever reason.
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,

/// `index_part.json` based gc blocking reason tracking.
@@ -833,9 +837,9 @@ impl Tenant {
// The Stopping case is for when we have passed control on to DeleteTenantFlow:
// if it errors, we will call make_broken when tenant is already in Stopping.
assert!(
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
"the attach task owns the tenant state until activation is complete"
);
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
"the attach task owns the tenant state until activation is complete"
);

*state = TenantState::broken_from_reason(err.to_string());
});
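The new doc comment on `ongoing_timeline_detach` above describes a per-tenant single-flight guard: at most one detach attempt at a time, while a failed attempt must not block a later retry. A standalone sketch of that shape, using `Arc`/`Weak` as simplified stand-ins for the `Completion`/`Barrier` pair (none of these names are the pageserver's real API):

```rust
use std::sync::{Arc, Mutex, Weak};

type TimelineId = u128; // stand-in for utils::id::TimelineId

// The guard plays the role of Completion: whoever runs the attempt holds it.
struct DetachGuard;

#[derive(Default)]
struct Tenant {
    // Some((timeline, weak guard)) while an attempt is (or was) in flight.
    ongoing_timeline_detach: Mutex<Option<(TimelineId, Weak<DetachGuard>)>>,
}

impl Tenant {
    /// Returns a guard if a new attempt may start; None if another attempt
    /// is still holding its guard.
    fn try_start_detach(&self, timeline: TimelineId) -> Option<Arc<DetachGuard>> {
        let mut slot = self.ongoing_timeline_detach.lock().unwrap();
        if let Some((_, existing)) = slot.as_ref() {
            if existing.upgrade().is_some() {
                // an earlier attempt is still running
                return None;
            }
            // the earlier attempt ended (successfully or not); allow a new one
        }
        let guard = Arc::new(DetachGuard);
        *slot = Some((timeline, Arc::downgrade(&guard)));
        Some(guard)
    }
}
```

Dropping the returned guard corresponds to the attempt finishing or failing; the next `try_start_detach` then succeeds.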
3 changes: 3 additions & 0 deletions pageserver/src/tenant/metadata.rs
@@ -285,12 +285,15 @@ impl TimelineMetadata {
}

/// When reparenting, the `ancestor_lsn` does not change.
///
/// Returns true if anything was changed.
pub fn reparent(&mut self, timeline: &TimelineId) {
assert!(self.body.ancestor_timeline.is_some());
// no assertion for redoing this: it's fine, we may have to repeat this multiple times over
self.body.ancestor_timeline = Some(*timeline);
}

/// Returns true if anything was changed
pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) {
if let Some(ancestor) = self.body.ancestor_timeline {
assert_eq!(ancestor, branchpoint.0);
113 changes: 85 additions & 28 deletions pageserver/src/tenant/mgr.rs
@@ -54,7 +54,7 @@ use utils::id::{TenantId, TimelineId};

use super::remote_timeline_client::remote_tenant_path;
use super::secondary::SecondaryTenant;
use super::timeline::detach_ancestor::PreparedTimelineDetach;
use super::timeline::detach_ancestor::{self, PreparedTimelineDetach};
use super::{GlobalShutDown, TenantSharedResources};

/// For a tenant that appears in TenantsMap, it may either be
@@ -1927,8 +1927,10 @@ impl TenantManager {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
prepared: PreparedTimelineDetach,
mut attempt: detach_ancestor::Attempt,
ctx: &RequestContext,
) -> Result<HashSet<TimelineId>, anyhow::Error> {
use crate::tenant::timeline::detach_ancestor::Error;
// FIXME: this is unnecessary, slotguard already has these semantics
struct RevertOnDropSlot(Option<SlotGuard>);

@@ -1977,43 +1979,98 @@

let timeline = tenant.get_timeline(timeline_id, true)?;

let reparented = timeline
.complete_detaching_timeline_ancestor(&tenant, prepared, ctx)
let resp = timeline
.detach_from_ancestor_and_reparent(&tenant, prepared, ctx)
.await?;

let mut slot_guard = slot_guard.into_inner();

let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
slot_guard.drop_old_value()?;
let tenant = if resp.reset_tenant_required() {
attempt.before_reset_tenant();

let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
slot_guard.drop_old_value()?;
}
Err(_barrier) => {
slot_guard.revert();
// this really should not happen, at all, unless shutdown was already going?
anyhow::bail!("Cannot restart Tenant, already shutting down");
}
}
Err(_barrier) => {
slot_guard.revert();
// this really should not happen, at all, unless shutdown was already going?
anyhow::bail!("Cannot restart Tenant, already shutting down");

let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;

let shard_identity = config.shard;
let tenant = tenant_spawn(
self.conf,
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
SpawnMode::Eager,
ctx,
)?;

{
let mut g = tenant.ongoing_timeline_detach.lock().unwrap();
assert!(
g.is_none(),
"there cannot be any new timeline detach ancestor on newly created tenant"
);
*g = Some((attempt.timeline_id, attempt.new_barrier()));
}
}

let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
slot_guard.upsert(TenantSlot::Attached(tenant.clone()))?;
tenant
} else {
tracing::info!("skipping tenant_reset as no changes made required it");
tenant
};

let shard_identity = config.shard;
let tenant = tenant_spawn(
self.conf,
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
SpawnMode::Eager,
ctx,
)?;
if let Some(reparented) = resp.completed() {
// finally ask the restarted tenant to complete the detach
//
// rationale for 9999s: we don't really have a timetable here; if retried, the caller
// will get a 503.
tenant
.wait_to_become_active(std::time::Duration::from_secs(9999))
.await
.map_err(|e| {
use pageserver_api::models::TenantState;
use GetActiveTenantError::{Cancelled, WillNotBecomeActive};
match e {
Cancelled | WillNotBecomeActive(TenantState::Stopping { .. }) => {
Error::ShuttingDown
}
other => Error::Unexpected(other.into()),
}
})?;

slot_guard.upsert(TenantSlot::Attached(tenant))?;
utils::pausable_failpoint!(
"timeline-detach-ancestor::after_activating_before_finding-pausable"
);

Ok(reparented)
let timeline = tenant
.get_timeline(attempt.timeline_id, true)
.map_err(|_| Error::DetachedNotFoundAfterRestart)?;

timeline
.complete_detaching_timeline_ancestor(&tenant, attempt, ctx)
.await
.map(|()| reparented)
.map_err(|e| e.into())
} else {
// at least the latest versions have now been downloaded and refreshed; be ready to
// retry another time.
Err(anyhow::anyhow!(
"failed to reparent all candidate timelines, please retry"
))
}
}

/// A page service client sends a TenantId, and to look up the correct Tenant we must
70 changes: 39 additions & 31 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -736,12 +736,13 @@ impl RemoteTimelineClient {
Ok(())
}

/// Reparent this timeline to a new parent.
///
/// A retryable step of timeline ancestor detach.
pub(crate) async fn schedule_reparenting_and_wait(
self: &Arc<Self>,
new_parent: &TimelineId,
) -> anyhow::Result<()> {
// FIXME: because of how Timeline::schedule_uploads works when called from layer flushing
// and reads the in-memory part we cannot do the detaching like this
let receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
Expand All @@ -752,17 +753,25 @@ impl RemoteTimelineClient {
));
};

upload_queue.dirty.metadata.reparent(new_parent);
upload_queue.dirty.lineage.record_previous_ancestor(&prev);
let uploaded = &upload_queue.clean.0.metadata;

self.schedule_index_upload(upload_queue)?;
if uploaded.ancestor_timeline().is_none() && !uploaded.ancestor_lsn().is_valid() {
// nothing to do
None
} else {
upload_queue.dirty.metadata.reparent(new_parent);
upload_queue.dirty.lineage.record_previous_ancestor(&prev);

self.schedule_barrier0(upload_queue)
self.schedule_index_upload(upload_queue)?;

Some(self.schedule_barrier0(upload_queue))
}
};

Self::wait_completion0(receiver)
.await
.context("wait completion")
if let Some(receiver) = receiver {
Self::wait_completion0(receiver).await?;
}
Ok(())
}

/// Schedules uploading a new version of `index_part.json` with the given layers added,
@@ -778,26 +787,30 @@
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;

upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
upload_queue.dirty.lineage.record_detaching(&adopted);

for layer in layers {
upload_queue
.dirty
.layer_metadata
.insert(layer.layer_desc().layer_name(), layer.metadata());
}
if upload_queue.clean.0.lineage.detached_previous_ancestor() == Some(adopted) {
None
} else {
upload_queue.dirty.metadata.detach_from_ancestor(&adopted);
upload_queue.dirty.lineage.record_detaching(&adopted);

for layer in layers {
let prev = upload_queue
.dirty
.layer_metadata
.insert(layer.layer_desc().layer_name(), layer.metadata());
assert!(prev.is_none(), "copied layer existed already {layer}");
}

self.schedule_index_upload(upload_queue)?;
self.schedule_index_upload(upload_queue)?;

let barrier = self.schedule_barrier0(upload_queue);
self.launch_queued_tasks(upload_queue);
barrier
Some(self.schedule_barrier0(upload_queue))
}
};

Self::wait_completion0(barrier)
.await
.context("wait completion")
if let Some(barrier) = barrier {
Self::wait_completion0(barrier).await?;
}
Ok(())
}

/// Adds a gc blocking reason for this timeline if one does not exist already.
@@ -873,12 +886,7 @@
let upload_queue = guard.initialized_mut()?;

if let index::GcBlockingReason::DetachAncestor = reason {
if !upload_queue
.clean
.0
.lineage
.is_detached_from_original_ancestor()
{
if !upload_queue.clean.0.lineage.is_detached_from_ancestor() {
drop(guard);
panic!("cannot complete timeline_ancestor_detach while not detached");
}
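Both `schedule_reparenting_and_wait` and the layer-adoption path above now share the same idempotency shape: compare the requested change against the last uploaded (`clean`) state, and only dirty the queue, schedule an index upload and wait on a barrier when something actually changed. A simplified sketch of that shape with hypothetical types (not the real upload queue):

```rust
use std::sync::Mutex;

type TimelineId = u128; // stand-in

#[derive(Clone, Default, PartialEq)]
struct Metadata {
    ancestor: Option<TimelineId>,
}

#[derive(Default)]
struct UploadQueue {
    clean: Metadata, // last state known to be uploaded
    dirty: Metadata, // what the next index_part.json upload will contain
    scheduled: usize,
}

struct Client {
    upload_queue: Mutex<UploadQueue>,
}

impl Client {
    /// Retryable: if the uploaded state already reflects the reparenting,
    /// nothing is scheduled and there is no barrier to wait for.
    fn schedule_reparenting(&self, new_parent: TimelineId) -> bool {
        let mut q = self.upload_queue.lock().unwrap();
        if q.clean.ancestor == Some(new_parent) {
            return false; // already applied on an earlier attempt
        }
        q.dirty.ancestor = Some(new_parent);
        q.scheduled += 1; // stands in for schedule_index_upload + barrier
        true
    }
}
```

A caller only awaits the upload barrier when `true` is returned, mirroring the `if let Some(receiver) = receiver { ... }` pattern in the diff.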
(diffs for the remaining changed files were not loaded on this page)

1 comment on commit 6d6e2c6

@github-actions

2212 tests run: 2132 passed, 1 failed, 79 skipped (full report)


Failures on Postgres 14

  • test_heavy_write_workload[neon_off-github-actions-selfhosted-10-5-5]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_heavy_write_workload[neon_off-release-pg14-github-actions-selfhosted-10-5-5]"

Code coverage* (full report)

  • functions: 32.3% (7165 of 22161 functions)
  • lines: 50.3% (57928 of 115176 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
6d6e2c6 at 2024-08-13T20:17:45.606Z :recycle:
