diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 1df5820fb946d..1d805ddf4994d 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -315,6 +315,27 @@ pub enum AuxFilePolicy { CrossValidation, } +impl AuxFilePolicy { + pub fn opt_to_int(this: Option) -> usize { + match this { + Some(AuxFilePolicy::V1) => 1, + Some(AuxFilePolicy::V2) => 2, + Some(AuxFilePolicy::CrossValidation) => 255, + None => 0, + } + } + + pub fn int_to_opt(this: usize) -> Option { + match this { + 1 => Some(AuxFilePolicy::V1), + 2 => Some(AuxFilePolicy::V2), + 255 => Some(AuxFilePolicy::CrossValidation), + 0 => None, + _ => unreachable!(), + } + } +} + impl FromStr for AuxFilePolicy { type Err = anyhow::Error; @@ -604,6 +625,9 @@ pub struct TimelineInfo { pub state: TimelineState, pub walreceiver_status: String, + + /// Whether aux file v2 is enabled + pub last_aux_file_policy: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 1fb75584fc921..c2b2c340a189b 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -219,6 +219,7 @@ fn handle_metadata( let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?; println!("Current metadata:\n{meta:?}"); let mut update_meta = false; + // TODO: simplify this part if let Some(disk_consistent_lsn) = disk_consistent_lsn { meta = TimelineMetadata::new( *disk_consistent_lsn, @@ -228,6 +229,7 @@ fn handle_metadata( meta.latest_gc_cutoff_lsn(), meta.initdb_lsn(), meta.pg_version(), + meta.last_aux_file_policy(), ); update_meta = true; } @@ -240,6 +242,7 @@ fn handle_metadata( meta.latest_gc_cutoff_lsn(), meta.initdb_lsn(), meta.pg_version(), + meta.last_aux_file_policy(), ); update_meta = true; } @@ -252,6 +255,7 @@ fn handle_metadata( *latest_gc_cuttoff, meta.initdb_lsn(), meta.pg_version(), + meta.last_aux_file_policy(), ); update_meta = true; } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index a8ca642dc59af..608ee1aadf665 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -14,6 +14,7 @@ use hyper::header; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; +use pageserver_api::models::AuxFilePolicy; use pageserver_api::models::LocationConfig; use pageserver_api::models::LocationConfigListResponse; use pageserver_api::models::ShardParameters; @@ -433,6 +434,12 @@ async fn build_timeline_info_common( state, walreceiver_status, + + last_aux_file_policy: AuxFilePolicy::int_to_opt( + timeline + .last_aux_file_policy + .load(std::sync::atomic::Ordering::SeqCst), + ), }; Ok(info) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index a4215ee107b2f..e17af2ed579ec 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -35,7 +35,7 @@ use std::ops::ControlFlow; use std::ops::Range; use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, trace, warn}; use utils::bin_ser::DeserializeError; use utils::vec_map::{VecMap, VecMapOrdering}; use utils::{bin_ser::BeSer, lsn::Lsn}; @@ -1465,7 +1465,30 @@ impl<'a> DatadirModification<'a> { content: &[u8], ctx: &RequestContext, ) -> anyhow::Result<()> { - let policy = self.tline.get_switch_aux_file_policy(); + let switch_policy = self.tline.get_switch_aux_file_policy(); + + let policy = { + let current_policy = AuxFilePolicy::int_to_opt( + self.tline + .last_aux_file_policy + .load(std::sync::atomic::Ordering::SeqCst), + ); + // Allowed switch path: + // * no aux files -> v1/v2/cross-validation + // * cross-validation->v2 + match (current_policy, switch_policy) { + (None, _) | (Some(AuxFilePolicy::CrossValidation), AuxFilePolicy::V2) => { + self.tline.last_aux_file_policy.store( + AuxFilePolicy::opt_to_int(Some(switch_policy)), + std::sync::atomic::Ordering::SeqCst, + ); + info!("switching to aux file policy {:?}", switch_policy); + switch_policy + } + (Some(current_policy), _) => current_policy, + } + }; + if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy { let key = aux_file::encode_aux_file_key(path); // retrieve the key from the engine diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 010e56a899db4..dfd48c3e6f224 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -20,6 +20,7 @@ use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use pageserver_api::models; +use pageserver_api::models::AuxFilePolicy; use pageserver_api::models::TimelineState; use pageserver_api::models::WalRedoManagerStatus; use pageserver_api::shard::ShardIdentity; @@ -1353,6 +1354,7 @@ impl Tenant { initdb_lsn, initdb_lsn, pg_version, + None, ); self.prepare_new_timeline( new_timeline_id, @@ -3100,6 +3102,7 @@ impl Tenant { *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer? src_timeline.initdb_lsn, src_timeline.pg_version, + AuxFilePolicy::int_to_opt(src_timeline.last_aux_file_policy.load(Ordering::SeqCst)), ); let uninitialized_timeline = self @@ -3303,6 +3306,7 @@ impl Tenant { pgdata_lsn, pgdata_lsn, pg_version, + None, ); let raw_timeline = self .prepare_new_timeline( diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 6ee33396f4174..2064da3a4723c 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -9,6 +9,7 @@ //! [`remote_timeline_client`]: super::remote_timeline_client use anyhow::ensure; +use pageserver_api::models::AuxFilePolicy; use serde::{de::Error, Deserialize, Serialize, Serializer}; use utils::bin_ser::SerializeError; use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn}; @@ -56,6 +57,7 @@ struct TimelineMetadataBodyV3 { latest_gc_cutoff_lsn: Lsn, initdb_lsn: Lsn, pg_version: u32, + last_aux_file_policy: Option, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -100,6 +102,7 @@ struct TimelineMetadataBodyV1 { } impl TimelineMetadata { + #[allow(clippy::too_many_arguments)] pub fn new( disk_consistent_lsn: Lsn, prev_record_lsn: Option, @@ -108,6 +111,7 @@ impl TimelineMetadata { latest_gc_cutoff_lsn: Lsn, initdb_lsn: Lsn, pg_version: u32, + last_aux_file_policy: Option, ) -> Self { Self { hdr: TimelineMetadataHeader { @@ -123,6 +127,7 @@ impl TimelineMetadata { latest_gc_cutoff_lsn, initdb_lsn, pg_version, + last_aux_file_policy, }, } } @@ -146,6 +151,7 @@ impl TimelineMetadata { latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn, initdb_lsn: body.initdb_lsn, pg_version: body.pg_version, + last_aux_file_policy: None, }; hdr.format_version = METADATA_FORMAT_VERSION; @@ -165,6 +171,7 @@ impl TimelineMetadata { latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn, initdb_lsn: body.initdb_lsn, pg_version: 14, // All timelines created before this version had pg_version 14 + last_aux_file_policy: None, }; hdr.format_version = METADATA_FORMAT_VERSION; @@ -266,6 +273,10 @@ impl TimelineMetadata { self.body.pg_version } + pub fn last_aux_file_policy(&self) -> Option { + self.body.last_aux_file_policy + } + // Checksums make it awkward to build a valid instance by hand. This helper // provides a TimelineMetadata with a valid checksum in its header. #[cfg(test)] @@ -278,6 +289,7 @@ impl TimelineMetadata { Lsn::from_hex("00000000").unwrap(), Lsn::from_hex("00000000").unwrap(), 0, + None, ); let bytes = instance.to_bytes().unwrap(); Self::from_bytes(&bytes).unwrap() @@ -287,6 +299,7 @@ impl TimelineMetadata { self.body.disk_consistent_lsn = update.disk_consistent_lsn; self.body.prev_record_lsn = update.prev_record_lsn; self.body.latest_gc_cutoff_lsn = update.latest_gc_cutoff_lsn; + self.body.last_aux_file_policy = update.last_aux_file_policy; } } @@ -317,6 +330,7 @@ pub(crate) struct MetadataUpdate { disk_consistent_lsn: Lsn, prev_record_lsn: Option, latest_gc_cutoff_lsn: Lsn, + last_aux_file_policy: Option, } impl MetadataUpdate { @@ -324,11 +338,13 @@ impl MetadataUpdate { disk_consistent_lsn: Lsn, prev_record_lsn: Option, latest_gc_cutoff_lsn: Lsn, + last_aux_file_policy: Option, ) -> Self { Self { disk_consistent_lsn, prev_record_lsn, latest_gc_cutoff_lsn, + last_aux_file_policy, } } } @@ -352,6 +368,7 @@ mod tests { Lsn(0), // Any version will do here, so use the default crate::DEFAULT_PG_VERSION, + Some(AuxFilePolicy::V2), ); let metadata_bytes = original_metadata @@ -426,6 +443,7 @@ mod tests { Lsn(0), Lsn(0), 14, // All timelines created before this version had pg_version 14 + None, ); assert_eq!( @@ -495,6 +513,7 @@ mod tests { Lsn(0), Lsn(0), 16, + None, ); assert_eq!( @@ -520,6 +539,7 @@ mod tests { latest_gc_cutoff_lsn: Lsn(0), initdb_lsn: Lsn(0), pg_version: 16, + last_aux_file_policy: Some(AuxFilePolicy::V2), }, }; @@ -539,6 +559,7 @@ mod tests { Lsn(0), Lsn(0), 16, + Some(AuxFilePolicy::V2), ); assert_eq!(deserialized_metadata.body, expected_metadata.body); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index bbe4e16378a15..9b24afc5fdb23 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1994,6 +1994,7 @@ mod tests { // Any version will do // but it should be consistent with the one in the tests crate::DEFAULT_PG_VERSION, + None, ); // go through serialize + deserialize to fix the header, including checksum diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7edb922069c0a..2b2415083514f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -53,7 +53,7 @@ use std::time::{Duration, Instant, SystemTime}; use std::{ array, collections::{BTreeMap, HashMap, HashSet}, - sync::atomic::AtomicU64, + sync::atomic::{AtomicU64, AtomicUsize}, }; use std::{ cmp::{max, min, Ordering}, @@ -409,6 +409,9 @@ pub struct Timeline { /// Keep aux directory cache to avoid it's reconstruction on each update pub(crate) aux_files: tokio::sync::Mutex, + + /// Indicate whether aux file v2 storage is enabled. + pub(crate) last_aux_file_policy: AtomicUsize, } pub struct WalReceiverInfo { @@ -2257,6 +2260,8 @@ impl Timeline { dir: None, n_deltas: 0, }), + + last_aux_file_policy: AtomicUsize::new(AuxFilePolicy::opt_to_int(None)), }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; @@ -2406,6 +2411,13 @@ impl Timeline { let shard = self.get_shard_index(); let this = self.myself.upgrade().expect("&self method holds the arc"); + if let Some(ref index_part) = index_part { + self.last_aux_file_policy.store( + AuxFilePolicy::opt_to_int(index_part.metadata.last_aux_file_policy()), + AtomicOrdering::SeqCst, + ); + } + let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({ move || { let _g = span.entered(); @@ -3937,6 +3949,7 @@ impl Timeline { disk_consistent_lsn, ondisk_prev_record_lsn, *self.latest_gc_cutoff_lsn.read(), + AuxFilePolicy::int_to_opt(self.last_aux_file_policy.load(AtomicOrdering::SeqCst)), ); fail_point!("checkpoint-before-saving-metadata", |x| bail!( diff --git a/test_runner/regress/test_aux_files.py b/test_runner/regress/test_aux_files.py new file mode 100644 index 0000000000000..9e85187f839e0 --- /dev/null +++ b/test_runner/regress/test_aux_files.py @@ -0,0 +1,62 @@ +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnv, + logical_replication_sync, +) + + +def test_aux_v2_config_switch(neon_simple_env: NeonEnv, vanilla_pg): + env = neon_simple_env + + tenant_id = env.initial_tenant + timeline_id = env.neon_cli.create_branch("test_aux_v2_config_switch", "empty") + endpoint = env.endpoints.create_start( + "test_aux_v2_config_switch", config_lines=["log_statement=all"] + ) + + with env.pageserver.http_client() as client: + tenant_config = client.tenant_config(tenant_id).effective_config + tenant_config["switch_aux_file_policy"] = "V2" + client.set_tenant_config(tenant_id, tenant_config) + # aux file v2 is enabled on the write path + assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[ + "last_aux_file_policy" + ] == None + pg_conn = endpoint.connect() + cur = pg_conn.cursor() + + cur.execute("create table t(pk integer primary key, payload integer)") + cur.execute( + "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));" + ) + cur.execute("create publication pub1 for table t, replication_example") + + # now start subscriber, aux files will be created at this point. TODO: find better ways of testing aux files (i.e., neon_test_utils) + # instead of going through the full logical replication process. + vanilla_pg.start() + vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)") + vanilla_pg.safe_psql( + "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);" + ) + connstr = endpoint.connstr().replace("'", "''") + log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}") + vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") + + # Wait logical replication channel to be established + logical_replication_sync(vanilla_pg, endpoint) + vanilla_pg.stop() + endpoint.stop() + + with env.pageserver.http_client() as client: + # aux file v2 flag should be enabled at this point + assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["last_aux_file_policy"] == "V2" + with env.pageserver.http_client() as client: + tenant_config = client.tenant_config(tenant_id).effective_config + tenant_config["switch_aux_file_policy"] = "V1" + client.set_tenant_config(tenant_id, tenant_config) + # the flag should still be enabled + assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["last_aux_file_policy"] == "V2" + env.pageserver.restart() + with env.pageserver.http_client() as client: + # aux file v2 flag should be persisted + assert client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["last_aux_file_policy"] == "V2"