diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index d52fb5e93d4b..80ca69631331 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -10,6 +10,7 @@ use std::{ io::{BufRead, Read}, num::{NonZeroU64, NonZeroUsize}, str::FromStr, + sync::atomic::AtomicUsize, time::{Duration, SystemTime}, }; @@ -308,13 +309,88 @@ pub struct TenantConfig { pub switch_aux_file_policy: Option<AuxFilePolicy>, } +/// The policy for the aux file storage. It can be switched through the `switch_aux_file_policy` +/// tenant config. When the first aux file is written, the policy is persisted in the +/// `index_part.json` file and from then on has a limited set of migration paths. +/// +/// Currently, we only allow the following migration paths: +/// +/// Unset -> V1 +/// -> V2 +/// -> CrossValidation -> V2 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum AuxFilePolicy { + /// V1 aux file policy: store everything in AUX_FILE_KEY V1, + /// V2 aux file policy: store in the AUX_FILE keyspace V2, + /// Cross validation runs both formats on the write path and does validation + /// on the read path. CrossValidation, } +impl AuxFilePolicy { + pub fn is_valid_migration_path(from: Option<Self>, to: Self) -> bool { + matches!( + (from, to), + (None, _) | (Some(AuxFilePolicy::CrossValidation), AuxFilePolicy::V2) + ) + } + + /// If a tenant writes aux files without setting `switch_aux_file_policy`, this value will be used. + pub fn default_tenant_config() -> Self { + Self::V1 + } +} + +/// The aux file policy memory flag. Users can store `Option<AuxFilePolicy>` into this atomic flag. 0 == unspecified. +pub struct AtomicAuxFilePolicy(AtomicUsize); + +impl AtomicAuxFilePolicy { + pub fn new(policy: Option<AuxFilePolicy>) -> Self { + Self(AtomicUsize::new( + policy.map(AuxFilePolicy::to_usize).unwrap_or_default(), + )) + } + + pub fn load(&self) -> Option<AuxFilePolicy> { + match self.0.load(std::sync::atomic::Ordering::Acquire) { + 0 => None, + other => Some(AuxFilePolicy::from_usize(other)), + } + } + + pub fn store(&self, policy: Option<AuxFilePolicy>) { + self.0.store( + policy.map(AuxFilePolicy::to_usize).unwrap_or_default(), + std::sync::atomic::Ordering::Release, + ); + } +} + +impl AuxFilePolicy { + pub fn to_usize(self) -> usize { + match self { + Self::V1 => 1, + Self::CrossValidation => 2, + Self::V2 => 3, + } + } + + pub fn try_from_usize(this: usize) -> Option<Self> { + match this { + 1 => Some(Self::V1), + 2 => Some(Self::CrossValidation), + 3 => Some(Self::V2), + _ => None, + } + } + + pub fn from_usize(this: usize) -> Self { + Self::try_from_usize(this).unwrap() + } +} + impl FromStr for AuxFilePolicy { type Err = anyhow::Error; @@ -604,6 +680,9 @@ pub struct TimelineInfo { pub state: TimelineState, pub walreceiver_status: String, + + /// The last aux file policy used on this timeline + pub last_aux_file_policy: Option<AuxFilePolicy>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -1505,4 +1584,59 @@ mod tests { assert_eq!(actual, expected, "example on {line}"); } } + + #[test] + fn test_aux_file_migration_path() { + assert!(AuxFilePolicy::is_valid_migration_path( + None, + AuxFilePolicy::V1 + )); + assert!(AuxFilePolicy::is_valid_migration_path( + None, + AuxFilePolicy::V2 + )); + assert!(AuxFilePolicy::is_valid_migration_path( + None, + AuxFilePolicy::CrossValidation + )); + // Self-migration is not a valid migration path, and the caller should handle it itself.
+ assert!(!AuxFilePolicy::is_valid_migration_path( + Some(AuxFilePolicy::V1), + AuxFilePolicy::V1 + )); + assert!(!AuxFilePolicy::is_valid_migration_path( + Some(AuxFilePolicy::V2), + AuxFilePolicy::V2 + )); + assert!(!AuxFilePolicy::is_valid_migration_path( + Some(AuxFilePolicy::CrossValidation), + AuxFilePolicy::CrossValidation + )); + // Migrations not allowed + assert!(!AuxFilePolicy::is_valid_migration_path( + Some(AuxFilePolicy::CrossValidation), + AuxFilePolicy::V1 + )); + assert!(!AuxFilePolicy::is_valid_migration_path( + Some(AuxFilePolicy::V1), + AuxFilePolicy::V2 + )); + assert!(!AuxFilePolicy::is_valid_migration_path( + Some(AuxFilePolicy::V2), + AuxFilePolicy::V1 + )); + assert!(!AuxFilePolicy::is_valid_migration_path( + Some(AuxFilePolicy::V2), + AuxFilePolicy::CrossValidation + )); + assert!(!AuxFilePolicy::is_valid_migration_path( + Some(AuxFilePolicy::V1), + AuxFilePolicy::CrossValidation + )); + // Migrations allowed + assert!(AuxFilePolicy::is_valid_migration_path( + Some(AuxFilePolicy::CrossValidation), + AuxFilePolicy::V2 + )); + } } diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 1fb75584fc92..e92c352dab1d 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -219,6 +219,7 @@ fn handle_metadata( let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?; println!("Current metadata:\n{meta:?}"); let mut update_meta = false; + // TODO: simplify this part if let Some(disk_consistent_lsn) = disk_consistent_lsn { meta = TimelineMetadata::new( *disk_consistent_lsn, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index b8d5c67ce0ab..7efd48afc7a4 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -439,6 +439,8 @@ async fn build_timeline_info_common( state, walreceiver_status, + + last_aux_file_policy: timeline.last_aux_file_policy.load(), }; Ok(info) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 1092d64d33fe..402f075365ab 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -35,7 +35,7 @@ use std::ops::ControlFlow; use std::ops::Range; use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, trace, warn}; use utils::bin_ser::DeserializeError; use utils::vec_map::{VecMap, VecMapOrdering}; use utils::{bin_ser::BeSer, lsn::Lsn}; @@ -718,10 +718,11 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, ) -> Result<HashMap<String, Bytes>, PageReconstructError> { - match self.get_switch_aux_file_policy() { - AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await, - AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await, - AuxFilePolicy::CrossValidation => { + let current_policy = self.last_aux_file_policy.load(); + match current_policy { + Some(AuxFilePolicy::V1) | None => self.list_aux_files_v1(lsn, ctx).await, + Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await, + Some(AuxFilePolicy::CrossValidation) => { let v1_result = self.list_aux_files_v1(lsn, ctx).await; let v2_result = self.list_aux_files_v2(lsn, ctx).await; match (v1_result, v2_result) { @@ -1469,7 +1470,27 @@ impl<'a> DatadirModification<'a> { content: &[u8], ctx: &RequestContext, ) -> anyhow::Result<()> { - let policy = self.tline.get_switch_aux_file_policy(); + let switch_policy = self.tline.get_switch_aux_file_policy(); + + let policy = { + let current_policy = self.tline.last_aux_file_policy.load(); + // Allowed switch paths: + // * no aux files
-> v1/v2/cross-validation + // * cross-validation -> v2 + if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) { + self.tline.last_aux_file_policy.store(Some(switch_policy)); + self.tline + .remote_client + .schedule_index_upload_for_aux_file_policy_update(Some(switch_policy))?; + info!(current=?current_policy, next=?switch_policy, "switching aux file policy"); + switch_policy + } else { + // This branch handles invalid migration paths, as well as the case where switch_policy == current_policy. + // In practice, because the migration path always allows unspecified -> *, this unwrap_or will never be hit. + current_policy.unwrap_or(AuxFilePolicy::default_tenant_config()) + } + }; + if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy { let key = aux_file::encode_aux_file_key(path); // retrieve the key from the engine diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 54b63f70425a..d42b9082b783 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -20,6 +20,7 @@ use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use pageserver_api::models; +use pageserver_api::models::AuxFilePolicy; use pageserver_api::models::TimelineState; use pageserver_api::models::TopTenantShardItem; use pageserver_api::models::WalRedoManagerStatus; @@ -529,6 +530,7 @@ impl Tenant { index_part: Option<IndexPart>, metadata: TimelineMetadata, ancestor: Option<Arc<Timeline>>, + last_aux_file_policy: Option<AuxFilePolicy>, _ctx: &RequestContext, ) -> anyhow::Result<()> { let tenant_id = self.tenant_shard_id; @@ -539,6 +541,10 @@ impl Tenant { ancestor.clone(), resources, CreateTimelineCause::Load, + // This could be derived from ancestor branch + index part. Though the only caller of `timeline_init_and_sync` is `load_remote_timeline`, + // there may be other callers of this function in the future, and we don't know whether `index_part` or `ancestor` takes precedence. + // Therefore, we pass this field explicitly for now, and will remove it once we fully migrate to aux file v2. + last_aux_file_policy, )?; let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); anyhow::ensure!( @@ -553,6 +559,10 @@ impl Tenant { if let Some(index_part) = index_part.as_ref() { timeline.remote_client.init_upload_queue(index_part)?; + + timeline + .last_aux_file_policy + .store(index_part.last_aux_file_policy()); } else { // No data on the remote storage, but we have a local metadata file. We can end up // here with timeline_create being interrupted before finishing index part upload.
@@ -1173,12 +1183,15 @@ impl Tenant { None }; + let last_aux_file_policy = index_part.last_aux_file_policy(); + self.timeline_init_and_sync( timeline_id, resources, Some(index_part), remote_metadata, ancestor, + last_aux_file_policy, ctx, ) .await @@ -1358,6 +1371,7 @@ impl Tenant { create_guard, initdb_lsn, None, + None, ) .await } @@ -2441,6 +2455,7 @@ impl Tenant { ancestor: Option<Arc<Timeline>>, resources: TimelineResources, cause: CreateTimelineCause, + last_aux_file_policy: Option<AuxFilePolicy>, ) -> anyhow::Result<Arc<Timeline>> { let state = match cause { CreateTimelineCause::Load => { @@ -2469,6 +2484,7 @@ impl Tenant { resources, pg_version, state, + last_aux_file_policy, self.cancel.child_token(), ); @@ -3119,6 +3135,7 @@ impl Tenant { timeline_create_guard, start_lsn + 1, Some(Arc::clone(src_timeline)), + src_timeline.last_aux_file_policy.load(), ) .await?; @@ -3312,6 +3329,7 @@ impl Tenant { timeline_create_guard, pgdata_lsn, None, + None, ) .await?; @@ -3383,6 +3401,7 @@ impl Tenant { create_guard: TimelineCreateGuard<'a>, start_lsn: Lsn, ancestor: Option<Arc<Timeline>>, + last_aux_file_policy: Option<AuxFilePolicy>, ) -> anyhow::Result<UninitializedTimeline> { let tenant_shard_id = self.tenant_shard_id; @@ -3398,6 +3417,7 @@ impl Tenant { ancestor, resources, CreateTimelineCause::Load, + last_aux_file_policy, ) .context("Failed to create timeline data structure")?; @@ -5621,4 +5641,280 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_branch_copies_dirty_aux_file_flag() { + let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag").unwrap(); + + // the default aux file policy to switch to is v1 if not set by the admins + assert_eq!( + harness.tenant_conf.switch_aux_file_policy, + AuxFilePolicy::V1 + ); + let (tenant, ctx) = harness.load().await; + + let mut lsn = Lsn(0x08); + + let tline: Arc<Timeline> = tenant + .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + // no aux file is written at this point, so the persistent flag should be unset + assert_eq!(tline.last_aux_file_policy.load(), None); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification + .put_file("pg_logical/mappings/test1", b"first", &ctx) + .await + .unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + // there is no tenant manager to pass the configuration through, so let's mimic it + tenant.set_new_location_config( + AttachedTenantConf::try_from(LocationConf::attached_single( + TenantConfOpt { + switch_aux_file_policy: Some(AuxFilePolicy::V2), + ..Default::default() + }, + tenant.generation, + &pageserver_api::models::ShardParameters::default(), + )) + .unwrap(), + ); + + assert_eq!( + tline.get_switch_aux_file_policy(), + AuxFilePolicy::V2, + "wanted state has been updated" + ); + assert_eq!( + tline.last_aux_file_policy.load(), + Some(AuxFilePolicy::V1), + "aux file was written with switch_aux_file_policy unset (which defaults to v1), so we should keep v1" + ); + + // we can read everything from the storage + let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); + assert_eq!( + files.get("pg_logical/mappings/test1"), + Some(&bytes::Bytes::from_static(b"first")) + ); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification + .put_file("pg_logical/mappings/test2", b"second", &ctx) + .await + .unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + assert_eq!( + tline.last_aux_file_policy.load(), + Some(AuxFilePolicy::V1), + "keep v1 storage format when new files are written" + ); + + let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); + assert_eq!( +
files.get("pg_logical/mappings/test2"), + Some(&bytes::Bytes::from_static(b"second")) + ); + + let child = tenant + .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(lsn), &ctx) + .await + .unwrap(); + + // child copies the last flag even if that is not on remote storage yet + assert_eq!(child.get_switch_aux_file_policy(), AuxFilePolicy::V2); + assert_eq!(child.last_aux_file_policy.load(), Some(AuxFilePolicy::V1)); + + let files = child.list_aux_files(lsn, &ctx).await.unwrap(); + assert_eq!(files.get("pg_logical/mappings/test1"), None); + assert_eq!(files.get("pg_logical/mappings/test2"), None); + + // even if we crash here without flushing parent timeline with it's new + // last_aux_file_policy we are safe, because child was never meant to access ancestor's + // files. the ancestor can even switch back to V1 because of a migration safely. + } + + #[tokio::test] + async fn aux_file_policy_switch() { + let mut harness = TenantHarness::create("aux_file_policy_switch").unwrap(); + harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::CrossValidation; // set to cross-validation mode + let (tenant, ctx) = harness.load().await; + + let mut lsn = Lsn(0x08); + + let tline: Arc = tenant + .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + assert_eq!( + tline.last_aux_file_policy.load(), + None, + "no aux file is written so it should be unset" + ); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification + .put_file("pg_logical/mappings/test1", b"first", &ctx) + .await + .unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + // there is no tenant manager to pass the configuration through, so lets mimic it + tenant.set_new_location_config( + AttachedTenantConf::try_from(LocationConf::attached_single( + TenantConfOpt { + switch_aux_file_policy: Some(AuxFilePolicy::V2), + ..Default::default() + }, + tenant.generation, + &pageserver_api::models::ShardParameters::default(), + )) + .unwrap(), + ); + + assert_eq!( + tline.get_switch_aux_file_policy(), + AuxFilePolicy::V2, + "wanted state has been updated" + ); + assert_eq!( + tline.last_aux_file_policy.load(), + Some(AuxFilePolicy::CrossValidation), + "dirty index_part.json reflected state is yet to be updated" + ); + + // we can still read the auxfile v1 before we ingest anything new + let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); + assert_eq!( + files.get("pg_logical/mappings/test1"), + Some(&bytes::Bytes::from_static(b"first")) + ); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification + .put_file("pg_logical/mappings/test2", b"second", &ctx) + .await + .unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + assert_eq!( + tline.last_aux_file_policy.load(), + Some(AuxFilePolicy::V2), + "ingesting a file should apply the wanted switch state when applicable" + ); + + let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); + assert_eq!( + files.get("pg_logical/mappings/test1"), + Some(&bytes::Bytes::from_static(b"first")), + "cross validation writes to both v1 and v2 so this should be available in v2" + ); + assert_eq!( + files.get("pg_logical/mappings/test2"), + Some(&bytes::Bytes::from_static(b"second")) + ); + + // mimic again by trying to flip it from V2 to V1 (not switched to while ingesting a file) + tenant.set_new_location_config( + AttachedTenantConf::try_from(LocationConf::attached_single( + TenantConfOpt { + switch_aux_file_policy: Some(AuxFilePolicy::V1), + ..Default::default() + }, + 
tenant.generation, + &pageserver_api::models::ShardParameters::default(), + )) + .unwrap(), + ); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification + .put_file("pg_logical/mappings/test2", b"third", &ctx) + .await + .unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + assert_eq!( + tline.get_switch_aux_file_policy(), + AuxFilePolicy::V1, + "wanted state has been updated again, even though the request is invalid" + ); + + assert_eq!( + tline.last_aux_file_policy.load(), + Some(AuxFilePolicy::V2), + "ingesting a file should not apply an invalid switch state" + ); + + let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); + assert_eq!( + files.get("pg_logical/mappings/test1"), + Some(&bytes::Bytes::from_static(b"first")) + ); + assert_eq!( + files.get("pg_logical/mappings/test2"), + Some(&bytes::Bytes::from_static(b"third")) + ); + + // mimic again by trying to flip it from V1 to V2 (not switched to while ingesting a file) + tenant.set_new_location_config( + AttachedTenantConf::try_from(LocationConf::attached_single( + TenantConfOpt { + switch_aux_file_policy: Some(AuxFilePolicy::V2), + ..Default::default() + }, + tenant.generation, + &pageserver_api::models::ShardParameters::default(), + )) + .unwrap(), + ); + + { + lsn += 8; + let mut modification = tline.begin_modification(lsn); + modification + .put_file("pg_logical/mappings/test3", b"last", &ctx) + .await + .unwrap(); + modification.commit(&ctx).await.unwrap(); + } + + assert_eq!(tline.get_switch_aux_file_policy(), AuxFilePolicy::V2); + + assert_eq!(tline.last_aux_file_policy.load(), Some(AuxFilePolicy::V2)); + + let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); + assert_eq!( + files.get("pg_logical/mappings/test1"), + Some(&bytes::Bytes::from_static(b"first")) + ); + assert_eq!( + files.get("pg_logical/mappings/test2"), + Some(&bytes::Bytes::from_static(b"third")) + ); + assert_eq!( + files.get("pg_logical/mappings/test3"), + Some(&bytes::Bytes::from_static(b"last")) + ); + } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index a743ce3c16a3..a695363cdced 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -373,6 +373,8 @@ pub struct TenantConf { /// Switch to a new aux file policy. Switching this flag requires that the user has not written any aux files into /// the storage before, and this flag cannot be switched back. Otherwise there will be data corruption. + /// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux + /// file is written.
pub switch_aux_file_policy: AuxFilePolicy, } @@ -574,7 +576,7 @@ impl Default for TenantConf { lazy_slru_download: false, timeline_get_throttle: crate::tenant::throttle::Config::disabled(), image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD, - switch_aux_file_policy: AuxFilePolicy::V1, + switch_aux_file_policy: AuxFilePolicy::default_tenant_config(), } } } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 3a1113cf0108..d3adae684159 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -189,6 +189,7 @@ use camino::Utf8Path; use chrono::{NaiveDateTime, Utc}; pub(crate) use download::download_initdb_tar_zst; +use pageserver_api::models::AuxFilePolicy; use pageserver_api::shard::{ShardIndex, TenantShardId}; use scopeguard::ScopeGuard; use tokio_util::sync::CancellationToken; @@ -611,6 +612,17 @@ impl RemoteTimelineClient { Ok(()) } + /// Launch an index-file upload operation in the background, with only the aux_file_policy flag updated. + pub(crate) fn schedule_index_upload_for_aux_file_policy_update( + self: &Arc<Self>, + last_aux_file_policy: Option<AuxFilePolicy>, + ) -> anyhow::Result<()> { + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut()?; + upload_queue.last_aux_file_policy = last_aux_file_policy; + self.schedule_index_upload(upload_queue); + Ok(()) + } /// /// Launch an index-file upload operation in the background, if necessary. /// @@ -1851,6 +1863,7 @@ impl RemoteTimelineClient { dangling_files: HashMap::default(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + last_aux_file_policy: initialized.last_aux_file_policy, }; let upload_queue = std::mem::replace( diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index b114d6aa1047..032dda7ff395 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use chrono::NaiveDateTime; +use pageserver_api::models::AuxFilePolicy; use serde::{Deserialize, Serialize}; use utils::id::TimelineId; @@ -88,6 +89,16 @@ pub struct IndexPart { #[serde(default)] pub(crate) lineage: Lineage, + + /// Describes the kind of aux files stored in the timeline. + /// + /// The value is updated during file ingestion: the latest value requested via the tenant config is applied if the migration path allows it. + /// A V1 setting after V2 files have been committed is not accepted. + /// + /// None means no aux files have been written to the storage before the point + /// when this flag was introduced. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub(crate) last_aux_file_policy: Option<AuxFilePolicy>, } impl IndexPart { @@ -101,10 +112,11 @@ impl IndexPart { /// is always generated from the keys of `layer_metadata`) /// - 4: timeline_layers is fully removed. /// - 5: lineage was added - const LATEST_VERSION: usize = 5; + /// - 6: last_aux_file_policy was added. + const LATEST_VERSION: usize = 6; // Versions we may see when reading from a bucket.
- pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5]; + pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6]; pub const FILE_NAME: &'static str = "index_part.json"; @@ -113,6 +125,7 @@ impl IndexPart { disk_consistent_lsn: Lsn, metadata: TimelineMetadata, lineage: Lineage, + last_aux_file_policy: Option<AuxFilePolicy>, ) -> Self { let layer_metadata = layers_and_metadata .iter() @@ -126,6 +139,7 @@ impl IndexPart { metadata, deleted_at: None, lineage, + last_aux_file_policy, } } @@ -155,8 +169,13 @@ impl IndexPart { example_metadata.disk_consistent_lsn(), example_metadata, Default::default(), + Some(AuxFilePolicy::V1), ) } + + pub(crate) fn last_aux_file_policy(&self) -> Option<AuxFilePolicy> { + self.last_aux_file_policy + } } impl From<&UploadQueueInitialized> for IndexPart { @@ -165,7 +184,13 @@ let metadata = uq.latest_metadata.clone(); let lineage = uq.latest_lineage.clone(); - Self::new(&uq.latest_files, disk_consistent_lsn, metadata, lineage) + Self::new( + &uq.latest_files, + disk_consistent_lsn, + metadata, + lineage, + uq.last_aux_file_policy, + ) } } @@ -299,6 +324,7 @@ mod tests { metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(), deleted_at: None, lineage: Lineage::default(), + last_aux_file_policy: None, }; let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); @@ -340,6 +366,7 @@ metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: None, lineage: Lineage::default(), + last_aux_file_policy: None, }; let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); @@ -383,6 +410,7 @@ mod tests { deleted_at: Some(chrono::NaiveDateTime::parse_from_str( "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()), lineage: Lineage::default(), + last_aux_file_policy: None, }; let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); @@ -428,6 +456,7 @@ mod tests { .unwrap(), deleted_at: None, lineage: Lineage::default(), + last_aux_file_policy: None, }; let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap(); @@ -468,6 +497,7 @@ mod tests { metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(), deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")), lineage: Lineage::default(), + last_aux_file_policy: None, }; let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); @@ -511,6 +541,57 @@ mod tests { reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()], original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))), }, + last_aux_file_policy: None, + }; + + let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); + assert_eq!(part, expected); + } + + #[test] + fn v6_indexpart_is_parsed() { + let example = r#"{ + "version":6, + "layer_metadata":{ + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 }, + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 } + }, + "disk_consistent_lsn":"0/16960E8", + 
"metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0], + "deleted_at": "2023-07-31T09:00:00.123", + "lineage":{ + "original_ancestor":["e2bfd8c633d713d279e6fcd2bcc15b6d","0/15A7618","2024-05-07T18:52:36.322426563"], + "reparenting_history":["e1bfd8c633d713d279e6fcd2bcc15b6d"] + }, + "last_aux_file_policy": "V2" + }"#; + + let expected = IndexPart { + version: 6, + layer_metadata: HashMap::from([ + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { + file_size: 25600000, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { + // serde_json should always parse this but this might be a double with jq for + // example. 
+ file_size: 9007199254741001, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }) + ]), + disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(), + metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(), + deleted_at: Some(chrono::NaiveDateTime::parse_from_str( + "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()), + lineage: Lineage { + reparenting_history_truncated: false, + reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()], + original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))), + }, + last_aux_file_policy: Some(AuxFilePolicy::V2), + }; + + let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); + assert_eq!(part, expected); + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index df9bc9b35b85..1fb1928079b1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -23,7 +23,7 @@ use pageserver_api::{ }, keyspace::{KeySpaceAccum, SparseKeyPartitioning}, models::{ - AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo, + AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo, TimelineState, }, @@ -413,7 +417,11 @@ pub struct Timeline { /// Keep aux directory cache to avoid its reconstruction on each update pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>, + /// Size estimator for aux file v2 pub(crate) aux_file_size_estimator: AuxFileSizeEstimator, + + /// The last aux file policy used on this timeline; indicates whether aux file v2 storage is enabled.
+ pub(crate) last_aux_file_policy: AtomicAuxFilePolicy, } pub struct WalReceiverInfo { @@ -2133,6 +2137,7 @@ impl Timeline { resources: TimelineResources, pg_version: u32, state: TimelineState, + aux_file_policy: Option<AuxFilePolicy>, cancel: CancellationToken, ) -> Arc<Self> { let disk_consistent_lsn = metadata.disk_consistent_lsn(); @@ -2257,6 +2262,8 @@ impl Timeline { }), aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics), + + last_aux_file_policy: AtomicAuxFilePolicy::new(aux_file_policy), }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 901f5149b37f..b5dfc86e77a1 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -280,6 +280,8 @@ impl DeleteTimelineFlow { // Important. We don't pass ancestor above because it can be missing. // Thus we need to skip the validation here. CreateTimelineCause::Delete, + // The aux file policy is not needed for deletion, assuming deletion does not read the aux keyspace + None, ) .context("create_timeline_struct")?; diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index a2f761fa949a..c0cc8f3124de 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -8,6 +8,7 @@ use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use chrono::NaiveDateTime; +use pageserver_api::models::AuxFilePolicy; use std::sync::Arc; use tracing::info; use utils::lsn::AtomicLsn; @@ -60,6 +61,9 @@ pub(crate) struct UploadQueueInitialized { /// Part of the flattened "next" `index_part.json`. pub(crate) latest_lineage: Lineage, + /// The last aux file policy used on this timeline. + pub(crate) last_aux_file_policy: Option<AuxFilePolicy>, + /// `disk_consistent_lsn` from the last metadata file that was successfully /// uploaded. `Lsn(0)` if nothing was uploaded yet. /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
@@ -189,6 +193,7 @@ impl UploadQueue { dangling_files: HashMap::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + last_aux_file_policy: Default::default(), }; *self = UploadQueue::Initialized(state); @@ -239,6 +244,7 @@ impl UploadQueue { dangling_files: HashMap::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + last_aux_file_policy: index_part.last_aux_file_policy(), }; *self = UploadQueue::Initialized(state); diff --git a/test_runner/regress/test_aux_files.py b/test_runner/regress/test_aux_files.py new file mode 100644 index 000000000000..be9c41a8679b --- /dev/null +++ b/test_runner/regress/test_aux_files.py @@ -0,0 +1,72 @@ +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + logical_replication_sync, +) + + +def test_aux_v2_config_switch(neon_env_builder: NeonEnvBuilder, vanilla_pg): + env = neon_env_builder.init_start() + endpoint = env.endpoints.create_start("main") + client = env.pageserver.http_client() + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + tenant_config = client.tenant_config(tenant_id).effective_config + tenant_config["switch_aux_file_policy"] = "V2" + client.set_tenant_config(tenant_id, tenant_config) + # aux file v2 is applied on the write path only, so for now the flag should be unset (or null) + assert ( + client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["last_aux_file_policy"] + is None + ) + + pg_conn = endpoint.connect() + cur = pg_conn.cursor() + + cur.execute("create table t(pk integer primary key, payload integer)") + cur.execute( + "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));" + ) + cur.execute("create publication pub1 for table t, replication_example") + + # now start the subscriber; aux files will be created at this point. TODO: find better ways of testing aux files (e.g., neon_test_utils) + # instead of going through the full logical replication process. + vanilla_pg.start() + vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)") + vanilla_pg.safe_psql( + "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);" + ) + connstr = endpoint.connstr().replace("'", "''") + log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}") + vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") + + # Wait for the logical replication channel to be established + logical_replication_sync(vanilla_pg, endpoint) + vanilla_pg.stop() + endpoint.stop() + + with env.pageserver.http_client() as client: + # the aux file v2 flag should be enabled at this point + assert client.timeline_detail(tenant_id, timeline_id)["last_aux_file_policy"] == "V2" + with env.pageserver.http_client() as client: + tenant_config = client.tenant_config(tenant_id).effective_config + tenant_config["switch_aux_file_policy"] = "V1" + client.set_tenant_config(tenant_id, tenant_config) + # the flag should still be V2, since the policy cannot be migrated back + assert ( + client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[ + "last_aux_file_policy" + ] + == "V2" + ) + env.pageserver.restart() + with env.pageserver.http_client() as client: + # the aux file v2 flag should be persisted across restarts + assert ( + client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[ + "last_aux_file_policy" + ] + == "V2" + )
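For reference, the `AtomicAuxFilePolicy` introduced in `models.rs` can be exercised on its own. The following is a minimal, self-contained sketch (mirroring the types added by this patch, not an additional API) of how the 0-means-unset encoding lets a single `AtomicUsize` carry an `Option<AuxFilePolicy>`, together with the migration-path check that gates policy switches on the ingestion path:

use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuxFilePolicy {
    V1,
    CrossValidation,
    V2,
}

impl AuxFilePolicy {
    // 0 is reserved for "unspecified", so every variant maps to a non-zero usize.
    fn to_usize(self) -> usize {
        match self {
            Self::V1 => 1,
            Self::CrossValidation => 2,
            Self::V2 => 3,
        }
    }

    fn try_from_usize(v: usize) -> Option<Self> {
        match v {
            1 => Some(Self::V1),
            2 => Some(Self::CrossValidation),
            3 => Some(Self::V2),
            _ => None,
        }
    }

    // Only unset -> * and CrossValidation -> V2 are valid, exactly as in the patch.
    fn is_valid_migration_path(from: Option<Self>, to: Self) -> bool {
        matches!((from, to), (None, _) | (Some(Self::CrossValidation), Self::V2))
    }
}

// One atomic integer standing in for Option<AuxFilePolicy>.
struct AtomicAuxFilePolicy(AtomicUsize);

impl AtomicAuxFilePolicy {
    fn new(policy: Option<AuxFilePolicy>) -> Self {
        Self(AtomicUsize::new(policy.map(AuxFilePolicy::to_usize).unwrap_or(0)))
    }

    fn load(&self) -> Option<AuxFilePolicy> {
        AuxFilePolicy::try_from_usize(self.0.load(Ordering::Acquire))
    }

    fn store(&self, policy: Option<AuxFilePolicy>) {
        self.0
            .store(policy.map(AuxFilePolicy::to_usize).unwrap_or(0), Ordering::Release);
    }
}

fn main() {
    let flag = AtomicAuxFilePolicy::new(None);
    assert_eq!(flag.load(), None); // unset until the first aux file is written

    // The first write may adopt any policy (unset -> CrossValidation is valid).
    assert!(AuxFilePolicy::is_valid_migration_path(
        flag.load(),
        AuxFilePolicy::CrossValidation
    ));
    flag.store(Some(AuxFilePolicy::CrossValidation));

    // CrossValidation -> V2 is the only non-trivial path allowed.
    assert!(AuxFilePolicy::is_valid_migration_path(flag.load(), AuxFilePolicy::V2));
    assert!(!AuxFilePolicy::is_valid_migration_path(flag.load(), AuxFilePolicy::V1));
    flag.store(Some(AuxFilePolicy::V2));
    assert_eq!(flag.load(), Some(AuxFilePolicy::V2));
}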