Skip to content

Commit

Permalink
pageserver: deletion queue & generation validation for deletions (#5207)
Browse files Browse the repository at this point in the history
## Problem

Pageservers must not delete objects or advertise updates to
remote_consistent_lsn without checking that they hold the latest
generation for the tenant in question (see [the RFC](
https://github.com/neondatabase/neon/blob/main/docs/rfcs/025-generation-numbers.md))

In this PR:
- A new "deletion queue" subsystem is introduced, through which
deletions flow
- `RemoteTimelineClient` is modified to send deletions through the
deletion queue:
- For GC & compaction, deletions flow through the full generation
verifying process
- For timeline deletions, deletions take a fast path that bypasses
generation verification
- The `last_uploaded_consistent_lsn` value in `UploadQueue` is replaced
with a mechanism that maintains a "projected" lsn (equivalent to the
previous property), and a "visible" LSN (which is the one that we may
share with safekeepers).
- Until `control_plane_api` is set, all deletions skip generation
validation
- Tests are introduced for the new functionality in
`test_pageserver_generations.py`

Once this lands, if a pageserver is configured with the
`control_plane_api` configuration added in
#5163, it becomes safe to
attach a tenant to multiple pageservers concurrently.

---------

Co-authored-by: Joonas Koivunen <[email protected]>
Co-authored-by: Christian Schwarz <[email protected]>
  • Loading branch information
3 people authored Sep 26, 2023
1 parent 16f0622 commit ba92668
Show file tree
Hide file tree
Showing 34 changed files with 3,387 additions and 310 deletions.
1 change: 1 addition & 0 deletions control_plane/src/bin/attachment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
if attach_req.pageserver_id.is_some() {
tenant_state.generation += 1;
}
tenant_state.pageserver = attach_req.pageserver_id;
let generation = tenant_state.generation;

locked.save().await.map_err(ApiError::InternalServerError)?;
Expand Down
7 changes: 7 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,15 @@ pub struct TimelineInfo {
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,

/// The LSN that we have succesfully uploaded to remote storage
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,

/// The LSN that we are advertizing to safekeepers
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn_visible: Lsn,

pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
/// Sum of the size of all layer files.
/// If a layer is present in both local FS and S3, it counts only once.
Expand Down
27 changes: 27 additions & 0 deletions libs/remote_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{

use anyhow::{bail, Context};

use serde::{Deserialize, Serialize};
use tokio::io;
use toml_edit::Item;
use tracing::info;
Expand All @@ -42,6 +43,9 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

/// As defined in S3 docs
pub const MAX_KEYS_PER_DELETE: usize = 1000;

const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';

/// Path on the remote storage, relative to some inner prefix.
Expand All @@ -50,6 +54,25 @@ const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemotePath(PathBuf);

impl Serialize for RemotePath {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(self)
}
}

impl<'de> Deserialize<'de> for RemotePath {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let str = String::deserialize(deserializer)?;
Ok(Self(PathBuf::from(&str)))
}
}

impl std::fmt::Display for RemotePath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.display())
Expand Down Expand Up @@ -88,6 +111,10 @@ impl RemotePath {
pub fn extension(&self) -> Option<&str> {
self.0.extension()?.to_str()
}

pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Path, std::path::StripPrefixError> {
self.0.strip_prefix(&p.0)
}
}

/// Storage (potentially remote) API to manage its state.
Expand Down
7 changes: 3 additions & 4 deletions libs/remote_storage/src/s3_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ use tracing::debug;

use super::StorageMetadata;
use crate::{
Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
Download, DownloadError, RemotePath, RemoteStorage, S3Config, MAX_KEYS_PER_DELETE,
REMOTE_STORAGE_PREFIX_SEPARATOR,
};

const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;

pub(super) mod metrics;

use self::metrics::{AttemptOutcome, RequestKind};
Expand Down Expand Up @@ -500,7 +499,7 @@ impl RemoteStorage for S3Bucket {
delete_objects.push(obj_id);
}

for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) {
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
let started_at = start_measuring_requests(kind);

let resp = self
Expand Down
16 changes: 16 additions & 0 deletions libs/utils/src/generation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,22 @@ impl Generation {
Self::Broken => panic!("Attempted to use a broken generation"),
}
}

pub fn next(&self) -> Generation {
match self {
Self::Valid(n) => Self::Valid(*n + 1),
Self::None => Self::Valid(1),
Self::Broken => panic!("Attempted to use a broken generation"),
}
}

pub fn into(self) -> Option<u32> {
if let Self::Valid(v) = self {
Some(v)
} else {
None
}
}
}

impl Serialize for Generation {
Expand Down
7 changes: 7 additions & 0 deletions libs/utils/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub enum ApiError {
#[error("Precondition failed: {0}")]
PreconditionFailed(Box<str>),

#[error("Shutting down")]
ShuttingDown,

#[error(transparent)]
InternalServerError(anyhow::Error),
}
Expand Down Expand Up @@ -52,6 +55,10 @@ impl ApiError {
self.to_string(),
StatusCode::PRECONDITION_FAILED,
),
ApiError::ShuttingDown => HttpErrorBody::response_from_msg_and_status(
"Shutting down".to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
Expand Down
29 changes: 25 additions & 4 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};

use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
Expand All @@ -20,6 +21,7 @@ use metrics::set_build_info_metric;
use pageserver::{
config::{defaults::*, PageServerConf},
context::{DownloadBehavior, RequestContext},
deletion_queue::DeletionQueue,
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
Expand Down Expand Up @@ -346,9 +348,22 @@ fn start_pageserver(
}
};

// Top-level cancellation token for the process
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();

// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;

// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
ControlPlaneClient::new(conf, &shutdown_pageserver),
conf,
);
if let Some(deletion_workers) = deletion_workers {
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
}

// Up to this point no significant I/O has been done: this should have been fast. Record
// duration prior to starting I/O intensive phase of startup.
startup_checkpoint("initial", "Starting loading tenants");
Expand Down Expand Up @@ -379,13 +394,13 @@ fn start_pageserver(
};

// Scan the local 'tenants/' directory and start loading the tenants
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();

let deletion_queue_client = deletion_queue.new_client();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client,
},
order,
shutdown_pageserver.clone(),
Expand Down Expand Up @@ -481,9 +496,10 @@ fn start_pageserver(
http::routes::State::new(
conf,
http_auth.clone(),
remote_storage,
remote_storage.clone(),
broker_client.clone(),
disk_usage_eviction_state,
deletion_queue.new_client(),
)
.context("Failed to initialize router state")?,
);
Expand Down Expand Up @@ -611,7 +627,12 @@ fn start_pageserver(
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
bg_remote_storage.map(|_| bg_deletion_queue),
0,
));
unreachable!()
}
})
Expand Down
34 changes: 31 additions & 3 deletions pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ impl PageServerConfigBuilder {
self.background_task_maximum_delay = BuilderValue::Set(delay);
}

pub fn control_plane_api(&mut self, api: Url) {
self.control_plane_api = BuilderValue::Set(Some(api))
pub fn control_plane_api(&mut self, api: Option<Url>) {
self.control_plane_api = BuilderValue::Set(api)
}

pub fn build(self) -> anyhow::Result<PageServerConf> {
Expand Down Expand Up @@ -580,6 +580,27 @@ impl PageServerConf {
self.workdir.join(TENANTS_SEGMENT_NAME)
}

pub fn deletion_prefix(&self) -> PathBuf {
self.workdir.join("deletion")
}

pub fn deletion_list_path(&self, sequence: u64) -> PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
const VERSION: u8 = 1;

self.deletion_prefix()
.join(format!("{sequence:016x}-{VERSION:02x}.list"))
}

pub fn deletion_header_path(&self) -> PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
const VERSION: u8 = 1;

self.deletion_prefix().join(format!("header-{VERSION:02x}"))
}

pub fn tenant_path(&self, tenant_id: &TenantId) -> PathBuf {
self.tenants_path().join(tenant_id.to_string())
}
Expand Down Expand Up @@ -747,7 +768,14 @@ impl PageServerConf {
},
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
"background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
"control_plane_api" => builder.control_plane_api(parse_toml_string(key, item)?.parse().context("failed to parse control plane URL")?),
"control_plane_api" => {
let parsed = parse_toml_string(key, item)?;
if parsed.is_empty() {
builder.control_plane_api(None)
} else {
builder.control_plane_api(Some(parsed.parse().context("failed to parse control plane URL")?))
}
},
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
Expand Down
Loading

1 comment on commit ba92668

@github-actions
Copy link

@github-actions github-actions bot commented on ba92668 Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2588 tests run: 2466 passed, 0 failed, 122 skipped (full report)


Flaky tests (5)

Postgres 16

  • test_crafted_wal_end[last_wal_record_crossing_segment]: release
  • test_partial_evict_tenant: debug
  • test_get_tenant_size_with_multiple_branches: release, debug
  • test_pageserver_lsn_wait_error_safekeeper_stop: debug

Code coverage (full report)

  • functions: 53.0% (8027 of 15145 functions)
  • lines: 81.3% (47046 of 57888 lines)

The comment gets automatically updated with the latest test results
ba92668 at 2023-09-26T17:34:43.100Z :recycle:

Please sign in to comment.