Skip to content

Commit

Permalink
feat(pageserver): persist aux file policy in timeline metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed May 8, 2024
1 parent 5418e80 commit 41fd310
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 3 deletions.
24 changes: 24 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,27 @@ pub enum AuxFilePolicy {
CrossValidation,
}

impl AuxFilePolicy {
pub fn opt_to_int(this: Option<Self>) -> usize {
match this {
Some(AuxFilePolicy::V1) => 1,
Some(AuxFilePolicy::V2) => 2,
Some(AuxFilePolicy::CrossValidation) => 255,
None => 0,
}
}

pub fn int_to_opt(this: usize) -> Option<Self> {
match this {
1 => Some(AuxFilePolicy::V1),
2 => Some(AuxFilePolicy::V2),
255 => Some(AuxFilePolicy::CrossValidation),
0 => None,
_ => unreachable!(),
}
}
}

impl FromStr for AuxFilePolicy {
type Err = anyhow::Error;

Expand Down Expand Up @@ -604,6 +625,9 @@ pub struct TimelineInfo {
pub state: TimelineState,

pub walreceiver_status: String,

/// The aux file policy last recorded for this timeline, or `None` if no policy has been recorded yet
pub last_aux_file_policy: Option<AuxFilePolicy>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
4 changes: 4 additions & 0 deletions pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ fn handle_metadata(
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
// TODO: simplify this part
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
meta = TimelineMetadata::new(
*disk_consistent_lsn,
Expand All @@ -228,6 +229,7 @@ fn handle_metadata(
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
meta.last_aux_file_policy(),
);
update_meta = true;
}
Expand All @@ -240,6 +242,7 @@ fn handle_metadata(
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
meta.last_aux_file_policy(),
);
update_meta = true;
}
Expand All @@ -252,6 +255,7 @@ fn handle_metadata(
*latest_gc_cuttoff,
meta.initdb_lsn(),
meta.pg_version(),
meta.last_aux_file_policy(),
);
update_meta = true;
}
Expand Down
7 changes: 7 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::LocationConfig;
use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::ShardParameters;
Expand Down Expand Up @@ -433,6 +434,12 @@ async fn build_timeline_info_common(
state,

walreceiver_status,

last_aux_file_policy: AuxFilePolicy::int_to_opt(
timeline
.last_aux_file_policy
.load(std::sync::atomic::Ordering::SeqCst),
),
};
Ok(info)
}
Expand Down
27 changes: 25 additions & 2 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};
Expand Down Expand Up @@ -1465,7 +1465,30 @@ impl<'a> DatadirModification<'a> {
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let policy = self.tline.get_switch_aux_file_policy();
let switch_policy = self.tline.get_switch_aux_file_policy();

let policy = {
let current_policy = AuxFilePolicy::int_to_opt(
self.tline
.last_aux_file_policy
.load(std::sync::atomic::Ordering::SeqCst),
);
// Allowed switch path:
// * no aux files -> v1/v2/cross-validation
// * cross-validation->v2
match (current_policy, switch_policy) {
(None, _) | (Some(AuxFilePolicy::CrossValidation), AuxFilePolicy::V2) => {
self.tline.last_aux_file_policy.store(
AuxFilePolicy::opt_to_int(Some(switch_policy)),
std::sync::atomic::Ordering::SeqCst,
);
info!("switching to aux file policy {:?}", switch_policy);
switch_policy
}
(Some(current_policy), _) => current_policy,
}
};

if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
Expand Down
4 changes: 4 additions & 0 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::TimelineState;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::ShardIdentity;
Expand Down Expand Up @@ -1353,6 +1354,7 @@ impl Tenant {
initdb_lsn,
initdb_lsn,
pg_version,
None,
);
self.prepare_new_timeline(
new_timeline_id,
Expand Down Expand Up @@ -3100,6 +3102,7 @@ impl Tenant {
*src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
src_timeline.initdb_lsn,
src_timeline.pg_version,
AuxFilePolicy::int_to_opt(src_timeline.last_aux_file_policy.load(Ordering::SeqCst)),
);

let uninitialized_timeline = self
Expand Down Expand Up @@ -3303,6 +3306,7 @@ impl Tenant {
pgdata_lsn,
pgdata_lsn,
pg_version,
None,
);
let raw_timeline = self
.prepare_new_timeline(
Expand Down
21 changes: 21 additions & 0 deletions pageserver/src/tenant/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! [`remote_timeline_client`]: super::remote_timeline_client
use anyhow::ensure;
use pageserver_api::models::AuxFilePolicy;
use serde::{de::Error, Deserialize, Serialize, Serializer};
use utils::bin_ser::SerializeError;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn};
Expand Down Expand Up @@ -56,6 +57,7 @@ struct TimelineMetadataBodyV3 {
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
last_aux_file_policy: Option<AuxFilePolicy>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -100,6 +102,7 @@ struct TimelineMetadataBodyV1 {
}

impl TimelineMetadata {
#[allow(clippy::too_many_arguments)]
pub fn new(
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
Expand All @@ -108,6 +111,7 @@ impl TimelineMetadata {
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> Self {
Self {
hdr: TimelineMetadataHeader {
Expand All @@ -123,6 +127,7 @@ impl TimelineMetadata {
latest_gc_cutoff_lsn,
initdb_lsn,
pg_version,
last_aux_file_policy,
},
}
}
Expand All @@ -146,6 +151,7 @@ impl TimelineMetadata {
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: body.pg_version,
last_aux_file_policy: None,
};

hdr.format_version = METADATA_FORMAT_VERSION;
Expand All @@ -165,6 +171,7 @@ impl TimelineMetadata {
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
last_aux_file_policy: None,
};

hdr.format_version = METADATA_FORMAT_VERSION;
Expand Down Expand Up @@ -266,6 +273,10 @@ impl TimelineMetadata {
self.body.pg_version
}

/// Returns the aux file policy stored in this metadata's body, or `None` if no policy is stored.
pub fn last_aux_file_policy(&self) -> Option<AuxFilePolicy> {
self.body.last_aux_file_policy
}

// Checksums make it awkward to build a valid instance by hand. This helper
// provides a TimelineMetadata with a valid checksum in its header.
#[cfg(test)]
Expand All @@ -278,6 +289,7 @@ impl TimelineMetadata {
Lsn::from_hex("00000000").unwrap(),
Lsn::from_hex("00000000").unwrap(),
0,
None,
);
let bytes = instance.to_bytes().unwrap();
Self::from_bytes(&bytes).unwrap()
Expand All @@ -287,6 +299,7 @@ impl TimelineMetadata {
self.body.disk_consistent_lsn = update.disk_consistent_lsn;
self.body.prev_record_lsn = update.prev_record_lsn;
self.body.latest_gc_cutoff_lsn = update.latest_gc_cutoff_lsn;
self.body.last_aux_file_policy = update.last_aux_file_policy;
}
}

Expand Down Expand Up @@ -317,18 +330,21 @@ pub(crate) struct MetadataUpdate {
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
latest_gc_cutoff_lsn: Lsn,
last_aux_file_policy: Option<AuxFilePolicy>,
}

impl MetadataUpdate {
/// Builds a `MetadataUpdate` from the given values, storing each argument
/// unchanged in the field of the same name.
pub(crate) fn new(
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
latest_gc_cutoff_lsn: Lsn,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> Self {
Self {
disk_consistent_lsn,
prev_record_lsn,
latest_gc_cutoff_lsn,
last_aux_file_policy,
}
}
}
Expand All @@ -352,6 +368,7 @@ mod tests {
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
Some(AuxFilePolicy::V2),
);

let metadata_bytes = original_metadata
Expand Down Expand Up @@ -426,6 +443,7 @@ mod tests {
Lsn(0),
Lsn(0),
14, // All timelines created before this version had pg_version 14
None,
);

assert_eq!(
Expand Down Expand Up @@ -495,6 +513,7 @@ mod tests {
Lsn(0),
Lsn(0),
16,
None,
);

assert_eq!(
Expand All @@ -520,6 +539,7 @@ mod tests {
latest_gc_cutoff_lsn: Lsn(0),
initdb_lsn: Lsn(0),
pg_version: 16,
last_aux_file_policy: Some(AuxFilePolicy::V2),
},
};

Expand All @@ -539,6 +559,7 @@ mod tests {
Lsn(0),
Lsn(0),
16,
Some(AuxFilePolicy::V2),
);

assert_eq!(deserialized_metadata.body, expected_metadata.body);
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1994,6 +1994,7 @@ mod tests {
// Any version will do
// but it should be consistent with the one in the tests
crate::DEFAULT_PG_VERSION,
None,
);

// go through serialize + deserialize to fix the header, including checksum
Expand Down
15 changes: 14 additions & 1 deletion pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use std::time::{Duration, Instant, SystemTime};
use std::{
array,
collections::{BTreeMap, HashMap, HashSet},
sync::atomic::AtomicU64,
sync::atomic::{AtomicU64, AtomicUsize},
};
use std::{
cmp::{max, min, Ordering},
Expand Down Expand Up @@ -409,6 +409,9 @@ pub struct Timeline {

/// Keep aux directory cache to avoid its reconstruction on each update
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,

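/// The last aux file policy of this timeline, stored as the integer encoding produced by
/// `AuxFilePolicy::opt_to_int` (the encoding of `None` means no policy has been set yet).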
pub(crate) last_aux_file_policy: AtomicUsize,
}

pub struct WalReceiverInfo {
Expand Down Expand Up @@ -2257,6 +2260,8 @@ impl Timeline {
dir: None,
n_deltas: 0,
}),

last_aux_file_policy: AtomicUsize::new(AuxFilePolicy::opt_to_int(None)),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
Expand Down Expand Up @@ -2406,6 +2411,13 @@ impl Timeline {
let shard = self.get_shard_index();
let this = self.myself.upgrade().expect("&self method holds the arc");

if let Some(ref index_part) = index_part {
self.last_aux_file_policy.store(
AuxFilePolicy::opt_to_int(index_part.metadata.last_aux_file_policy()),
AtomicOrdering::SeqCst,
);
}

let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
move || {
let _g = span.entered();
Expand Down Expand Up @@ -3937,6 +3949,7 @@ impl Timeline {
disk_consistent_lsn,
ondisk_prev_record_lsn,
*self.latest_gc_cutoff_lsn.read(),
AuxFilePolicy::int_to_opt(self.last_aux_file_policy.load(AtomicOrdering::SeqCst)),
);

fail_point!("checkpoint-before-saving-metadata", |x| bail!(
Expand Down
Loading

0 comments on commit 41fd310

Please sign in to comment.