diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 5a84763697697..a8fa8486e8c52 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -17,7 +17,7 @@ use anyhow::{bail, Context};
 use camino::Utf8PathBuf;
 use futures::SinkExt;
 use pageserver_api::models::{
-    self, AuxFilePolicy, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo,
+    self, LocationConfig, ShardParameters, SwitchAuxFilePolicy, TenantHistorySize, TenantInfo,
     TimelineInfo,
 };
 use pageserver_api::shard::TenantShardId;
@@ -380,7 +380,7 @@ impl PageServerNode {
                 .context("parse `timeline_get_throttle` from json")?,
             switch_aux_file_policy: settings
                 .remove("switch_aux_file_policy")
-                .map(|x| x.parse::<AuxFilePolicy>())
+                .map(|x| x.parse::<SwitchAuxFilePolicy>())
                 .transpose()
                 .context("Failed to parse 'switch_aux_file_policy'")?,
         };
@@ -503,7 +503,7 @@ impl PageServerNode {
                 .context("parse `timeline_get_throttle` from json")?,
             switch_aux_file_policy: settings
                 .remove("switch_aux_file_policy")
-                .map(|x| x.parse::<AuxFilePolicy>())
+                .map(|x| x.parse::<SwitchAuxFilePolicy>())
                 .transpose()
                 .context("Failed to parse 'switch_aux_file_policy'")?,
         }
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 1df5820fb946d..8e68a7c3ba078 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -10,6 +10,7 @@ use std::{
     io::{BufRead, Read},
     num::{NonZeroU64, NonZeroUsize},
     str::FromStr,
+    sync::atomic::AtomicUsize,
     time::{Duration, SystemTime},
 };
 
@@ -305,17 +306,107 @@ pub struct TenantConfig {
     pub lazy_slru_download: Option<bool>,
     pub timeline_get_throttle: Option<ThrottleConfig>,
     pub image_layer_creation_check_threshold: Option<u8>,
-    pub switch_aux_file_policy: Option<AuxFilePolicy>,
+    pub switch_aux_file_policy: Option<SwitchAuxFilePolicy>,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
-pub enum AuxFilePolicy {
+pub enum SwitchAuxFilePolicy {
     V1,
     V2,
     CrossValidation,
 }
 
-impl FromStr for AuxFilePolicy {
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+pub enum RuntimeAuxFilePolicy {
+    V1,
+    V2,
+    CrossValidation,
+    Unspecified,
+}
+
+pub struct AtomicRuntimeAuxFilePolicy(AtomicUsize);
+
+impl AtomicRuntimeAuxFilePolicy {
+    pub fn new(policy: RuntimeAuxFilePolicy) -> Self {
+        Self(AtomicUsize::new(policy.to_usize()))
+    }
+
+    pub fn load(&self) -> RuntimeAuxFilePolicy {
+        RuntimeAuxFilePolicy::from_usize(self.0.load(std::sync::atomic::Ordering::SeqCst))
+    }
+
+    pub fn store(&self, policy: RuntimeAuxFilePolicy) {
+        self.0.store(
+            RuntimeAuxFilePolicy::to_usize(policy),
+            std::sync::atomic::Ordering::SeqCst,
+        );
+    }
+}
+
+impl SwitchAuxFilePolicy {
+    pub fn to_usize(self) -> usize {
+        match self {
+            Self::V1 => 1,
+            Self::CrossValidation => 2,
+            Self::V2 => 3,
+        }
+    }
+
+    pub fn from_usize(this: usize) -> Self {
+        match this {
+            1 => Self::V1,
+            2 => Self::CrossValidation,
+            3 => Self::V2,
+            _ => unreachable!(),
+        }
+    }
+
+    pub fn to_runtime_policy(&self) -> RuntimeAuxFilePolicy {
+        RuntimeAuxFilePolicy::from_usize(self.to_usize())
+    }
+}
+
+impl RuntimeAuxFilePolicy {
+    pub fn to_usize(self) -> usize {
+        match self {
+            Self::V1 => 1,
+            Self::CrossValidation => 2,
+            Self::V2 => 3,
+            Self::Unspecified => 0,
+        }
+    }
+
+    pub fn from_usize(this: usize) -> Self {
+        match this {
+            1 => Self::V1,
+            2 => Self::CrossValidation,
+            3 => Self::V2,
+            0 => Self::Unspecified,
+            _ => unreachable!(),
+        }
+    }
+}
+
+impl FromStr for RuntimeAuxFilePolicy {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let s = s.to_lowercase();
+        if s.is_empty() || s == "unspecified" {
+            Ok(Self::Unspecified)
+        } else if s == "v1" {
+            Ok(Self::V1)
+        } else if s == "v2" {
+            Ok(Self::V2)
+        } else if s == "crossvalidation" || s == "cross_validation" {
+            Ok(Self::CrossValidation)
+        } else {
+            anyhow::bail!("cannot parse {} to aux file policy", s)
+        }
+    }
+}
+
+impl FromStr for SwitchAuxFilePolicy {
     type Err = anyhow::Error;
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
@@ -604,6 +695,9 @@ pub struct TimelineInfo {
     pub state: TimelineState,
 
     pub walreceiver_status: String,
+
+    /// The last aux file policy used on this timeline, i.e., whether aux file v2 is enabled
+    pub last_aux_file_policy: RuntimeAuxFilePolicy,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs
index 1fb75584fc921..e92c352dab1d5 100644
--- a/pageserver/ctl/src/main.rs
+++ b/pageserver/ctl/src/main.rs
@@ -219,6 +219,7 @@ fn handle_metadata(
     let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
     println!("Current metadata:\n{meta:?}");
     let mut update_meta = false;
+    // TODO: simplify this part
     if let Some(disk_consistent_lsn) = disk_consistent_lsn {
         meta = TimelineMetadata::new(
             *disk_consistent_lsn,
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index a8ca642dc59af..8fcf84f5d1dee 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -433,6 +433,8 @@ async fn build_timeline_info_common(
         state,
 
         walreceiver_status,
+
+        last_aux_file_policy: timeline.last_aux_file_policy.load(),
     };
     Ok(info)
 }
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index a4215ee107b2f..1ddd089e1bbd0 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -24,7 +24,7 @@ use pageserver_api::key::{
     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
 };
 use pageserver_api::keyspace::SparseKeySpace;
-use pageserver_api::models::AuxFilePolicy;
+use pageserver_api::models::{RuntimeAuxFilePolicy, SwitchAuxFilePolicy};
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::BLCKSZ;
@@ -35,7 +35,7 @@ use std::ops::ControlFlow;
 use std::ops::Range;
 use strum::IntoEnumIterator;
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, trace, warn};
+use tracing::{debug, info, trace, warn};
 use utils::bin_ser::DeserializeError;
 use utils::vec_map::{VecMap, VecMapOrdering};
 use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -714,10 +714,13 @@ impl Timeline {
         lsn: Lsn,
         ctx: &RequestContext,
     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
-        match self.get_switch_aux_file_policy() {
-            AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await,
-            AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
-            AuxFilePolicy::CrossValidation => {
+        let current_policy = self.last_aux_file_policy.load();
+        match current_policy {
+            RuntimeAuxFilePolicy::V1 | RuntimeAuxFilePolicy::Unspecified => {
+                self.list_aux_files_v1(lsn, ctx).await
+            }
+            RuntimeAuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
+            RuntimeAuxFilePolicy::CrossValidation => {
                 let v1_result = self.list_aux_files_v1(lsn, ctx).await;
                 let v2_result = self.list_aux_files_v2(lsn, ctx).await;
                 match (v1_result, v2_result) {
@@ -1447,7 +1450,7 @@ impl<'a> DatadirModification<'a> {
     }
 
     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
-        if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
+        if let SwitchAuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
             return Ok(());
         }
         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
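The next hunk is the heart of the change: on each aux file write, `put_file` decides whether the timeline may adopt the tenant-configured policy. A minimal standalone sketch of that transition rule, with illustrative names only (the patch implements it inline on `RuntimeAuxFilePolicy`/`SwitchAuxFilePolicy` from `models.rs` above):

```rust
// Sketch of the one-way switch rule; not code from the patch itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Runtime { Unspecified, V1, V2, CrossValidation }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Switch { V1, V2, CrossValidation }

impl Switch {
    fn to_runtime(self) -> Runtime {
        match self {
            Switch::V1 => Runtime::V1,
            Switch::V2 => Runtime::V2,
            Switch::CrossValidation => Runtime::CrossValidation,
        }
    }
}

/// A timeline with no aux files yet (`Unspecified`) may adopt any policy, and
/// `CrossValidation` may be promoted to `V2`; every other switch is a no-op.
fn apply_switch(current: Runtime, requested: Switch) -> Runtime {
    match (current, requested) {
        (Runtime::Unspecified, _) | (Runtime::CrossValidation, Switch::V2) => {
            requested.to_runtime()
        }
        (current, _) => current,
    }
}

fn main() {
    assert_eq!(apply_switch(Runtime::Unspecified, Switch::V2), Runtime::V2);
    assert_eq!(apply_switch(Runtime::V1, Switch::V2), Runtime::V1); // never changed after first write
    assert_eq!(apply_switch(Runtime::CrossValidation, Switch::V2), Runtime::V2);
}
```

The `Unspecified` state exists precisely so that the first aux file write can pick the configured policy and persist it to the index, as the hunk below does.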
@@ -1465,8 +1468,34 @@ impl<'a> DatadirModification<'a> {
         content: &[u8],
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        let policy = self.tline.get_switch_aux_file_policy();
-        if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
+        let switch_policy = self.tline.get_switch_aux_file_policy();
+
+        let policy = {
+            let current_policy = self.tline.last_aux_file_policy.load();
+            // Allowed switch paths:
+            // * no aux files -> v1/v2/cross-validation
+            // * cross-validation -> v2
+            match (current_policy, switch_policy) {
+                (RuntimeAuxFilePolicy::Unspecified, _)
+                | (RuntimeAuxFilePolicy::CrossValidation, SwitchAuxFilePolicy::V2) => {
+                    let new_policy = switch_policy.to_runtime_policy();
+                    self.tline.last_aux_file_policy.store(new_policy);
+                    info!("switching to aux file policy {:?}", new_policy);
+                    if let Some(ref remote_client) = self.tline.remote_client {
+                        remote_client
+                            .schedule_index_upload_for_aux_file_policy_update(new_policy)?;
+                        remote_client.wait_completion().await?;
+                    }
+                    info!("finished switching to aux file policy {:?}", new_policy);
+                    new_policy
+                }
+                (current_policy, _) => current_policy,
+            }
+        };
+
+        assert!(policy != RuntimeAuxFilePolicy::Unspecified);
+
+        if let RuntimeAuxFilePolicy::V2 | RuntimeAuxFilePolicy::CrossValidation = policy {
             let key = aux_file::encode_aux_file_key(path);
             // retrieve the key from the engine
             let old_val = match self.get(key, ctx).await {
@@ -1495,7 +1524,7 @@ impl<'a> DatadirModification<'a> {
             self.put(key, Value::Image(new_val.into()));
         }
 
-        if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
+        if let RuntimeAuxFilePolicy::V1 | RuntimeAuxFilePolicy::CrossValidation = policy {
             let file_path = path.to_string();
             let content = if content.is_empty() {
                 None
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 010e56a899db4..bfe8ea57688bf 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -20,6 +20,7 @@ use futures::stream::FuturesUnordered;
 use futures::FutureExt;
 use futures::StreamExt;
 use pageserver_api::models;
+use pageserver_api::models::RuntimeAuxFilePolicy;
 use pageserver_api::models::TimelineState;
 use pageserver_api::models::WalRedoManagerStatus;
 use pageserver_api::shard::ShardIdentity;
@@ -528,6 +529,7 @@ impl Tenant {
         index_part: Option<IndexPart>,
         metadata: TimelineMetadata,
         ancestor: Option<Arc<Timeline>>,
+        last_aux_file_policy: RuntimeAuxFilePolicy,
         _ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         let tenant_id = self.tenant_shard_id;
@@ -538,6 +540,10 @@ impl Tenant {
             ancestor.clone(),
             resources,
             CreateTimelineCause::Load,
+            // This could be derived from the ancestor branch + index part. Although the only caller of
+            // `timeline_init_and_sync` today is `load_remote_timeline`, there may be other callers in the future,
+            // and we do not know whether `index_part` or `ancestor` takes precedence. Therefore, we pass this field
+            // explicitly for now, and will remove it once we fully migrate to aux file v2.
+            last_aux_file_policy,
         )?;
         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
         anyhow::ensure!(
@@ -1176,12 +1182,15 @@ impl Tenant {
             None
         };
 
+        let last_aux_file_policy = index_part.last_aux_file_policy();
+
         self.timeline_init_and_sync(
             timeline_id,
             resources,
             Some(index_part),
             remote_metadata,
             ancestor,
+            last_aux_file_policy,
             ctx,
         )
         .await
@@ -1360,6 +1369,7 @@ impl Tenant {
             create_guard,
             initdb_lsn,
             None,
+            RuntimeAuxFilePolicy::Unspecified,
         )
         .await
     }
@@ -2431,6 +2441,7 @@ impl Tenant {
         ancestor: Option<Arc<Timeline>>,
         resources: TimelineResources,
         cause: CreateTimelineCause,
+        last_aux_file_policy: RuntimeAuxFilePolicy,
     ) -> anyhow::Result<Arc<Timeline>> {
         let state = match cause {
             CreateTimelineCause::Load => {
@@ -2459,6 +2470,7 @@ impl Tenant {
             resources,
             pg_version,
             state,
+            last_aux_file_policy,
             self.cancel.child_token(),
         );
 
@@ -3109,6 +3121,7 @@ impl Tenant {
             timeline_create_guard,
             start_lsn + 1,
             Some(Arc::clone(src_timeline)),
+            src_timeline.last_aux_file_policy.load(),
         )
         .await?;
 
@@ -3311,6 +3324,7 @@ impl Tenant {
             timeline_create_guard,
             pgdata_lsn,
             None,
+            RuntimeAuxFilePolicy::Unspecified,
         )
         .await?;
 
@@ -3388,6 +3402,7 @@ impl Tenant {
         create_guard: TimelineCreateGuard<'a>,
         start_lsn: Lsn,
         ancestor: Option<Arc<Timeline>>,
+        last_aux_file_policy: RuntimeAuxFilePolicy,
     ) -> anyhow::Result<UninitializedTimeline> {
         let tenant_shard_id = self.tenant_shard_id;
 
diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs
index a743ce3c16a30..b5a46713d98a3 100644
--- a/pageserver/src/tenant/config.rs
+++ b/pageserver/src/tenant/config.rs
@@ -9,9 +9,9 @@
 //! may lead to a data loss.
 //!
 use anyhow::bail;
-use pageserver_api::models::AuxFilePolicy;
 use pageserver_api::models::CompactionAlgorithm;
 use pageserver_api::models::EvictionPolicy;
+use pageserver_api::models::SwitchAuxFilePolicy;
 use pageserver_api::models::{self, ThrottleConfig};
 use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
 use serde::de::IntoDeserializer;
@@ -373,7 +373,7 @@ pub struct TenantConf {
 
     /// Switch to a new aux file policy. Switching this flag requires that no aux files have been
    /// written to storage before, and the flag cannot be switched back; otherwise, data corruption may occur.
-    pub switch_aux_file_policy: AuxFilePolicy,
+    pub switch_aux_file_policy: SwitchAuxFilePolicy,
 }
 
 /// Same as TenantConf, but this struct preserves the information about
@@ -472,7 +472,7 @@ pub struct TenantConfOpt {
 
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(default)]
-    pub switch_aux_file_policy: Option<AuxFilePolicy>,
+    pub switch_aux_file_policy: Option<SwitchAuxFilePolicy>,
 }
 
 impl TenantConfOpt {
@@ -574,7 +574,7 @@ impl Default for TenantConf {
             lazy_slru_download: false,
             timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
             image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
-            switch_aux_file_policy: AuxFilePolicy::V1,
+            switch_aux_file_policy: SwitchAuxFilePolicy::V1,
         }
     }
 }
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index bbe4e16378a15..62e46ad4247bb 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -189,6 +189,7 @@ use camino::Utf8Path;
 use chrono::{NaiveDateTime, Utc};
 
 pub(crate) use download::download_initdb_tar_zst;
+use pageserver_api::models::RuntimeAuxFilePolicy;
 use pageserver_api::shard::{ShardIndex, TenantShardId};
 use scopeguard::ScopeGuard;
 use tokio_util::sync::CancellationToken;
@@ -596,6 +597,17 @@ impl RemoteTimelineClient {
         Ok(())
     }
 
+    /// Launch an index-file upload operation in the background, with only the aux file policy flag updated.
+    pub(crate) fn schedule_index_upload_for_aux_file_policy_update(
+        self: &Arc<Self>,
+        last_aux_file_policy: RuntimeAuxFilePolicy,
+    ) -> anyhow::Result<()> {
+        let mut guard = self.upload_queue.lock().unwrap();
+        let upload_queue = guard.initialized_mut()?;
+        upload_queue.last_aux_file_policy = last_aux_file_policy;
+        self.schedule_index_upload(upload_queue);
+        Ok(())
+    }
     ///
     /// Launch an index-file upload operation in the background, if necessary.
     ///
@@ -923,7 +935,7 @@
     }
 
     /// Wait for all previously scheduled uploads/deletions to complete
-    pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
+    pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
         let receiver = {
             let mut guard = self.upload_queue.lock().unwrap();
             let upload_queue = guard.initialized_mut()?;
@@ -1824,6 +1836,7 @@
             dangling_files: HashMap::default(),
             shutting_down: false,
             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
+            last_aux_file_policy: initialized.last_aux_file_policy,
         };
 
         let upload_queue = std::mem::replace(
diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs
index 3e05905afafb0..19941971ddeb7 100644
--- a/pageserver/src/tenant/remote_timeline_client/index.rs
+++ b/pageserver/src/tenant/remote_timeline_client/index.rs
@@ -5,6 +5,7 @@ use std::collections::HashMap;
 
 use chrono::NaiveDateTime;
+use pageserver_api::models::RuntimeAuxFilePolicy;
 use serde::{Deserialize, Serialize};
 
 use crate::tenant::metadata::TimelineMetadata;
@@ -84,6 +85,9 @@ pub struct IndexPart {
 
     #[serde(rename = "metadata_bytes")]
     pub metadata: TimelineMetadata,
+
+    // Added in version 5. `None` means `RuntimeAuxFilePolicy::Unspecified`; the `Option` wrapper keeps the format forward compatible.
+    pub last_aux_file_policy: Option<RuntimeAuxFilePolicy>,
 }
 
 impl IndexPart {
@@ -96,10 +100,11 @@ impl IndexPart {
     /// - 3: no longer deserialize `timeline_layers` (serialized format is the same, but timeline_layers
     ///   is always generated from the keys of `layer_metadata`)
     /// - 4: timeline_layers is fully removed.
-    const LATEST_VERSION: usize = 4;
+    /// - 5: last_aux_file_policy is added.
+    const LATEST_VERSION: usize = 5;
 
     // Versions we may see when reading from a bucket.
-    pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4];
+    pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5];
 
     pub const FILE_NAME: &'static str = "index_part.json";
 
@@ -107,6 +112,7 @@ impl IndexPart {
         layers_and_metadata: &HashMap<LayerFileName, LayerFileMetadata>,
         disk_consistent_lsn: Lsn,
         metadata: TimelineMetadata,
+        last_aux_file_policy: RuntimeAuxFilePolicy,
     ) -> Self {
         let layer_metadata = layers_and_metadata
             .iter()
@@ -119,6 +125,7 @@
             disk_consistent_lsn,
             metadata,
             deleted_at: None,
+            last_aux_file_policy: Some(last_aux_file_policy),
         }
     }
 
@@ -147,8 +154,14 @@ impl IndexPart {
             &HashMap::new(),
             example_metadata.disk_consistent_lsn(),
             example_metadata,
+            RuntimeAuxFilePolicy::V2,
         )
     }
+
+    pub fn last_aux_file_policy(&self) -> RuntimeAuxFilePolicy {
+        self.last_aux_file_policy
+            .unwrap_or(RuntimeAuxFilePolicy::Unspecified)
+    }
 }
 
 impl From<&UploadQueueInitialized> for IndexPart {
@@ -156,7 +169,12 @@ impl From<&UploadQueueInitialized> for IndexPart {
         let disk_consistent_lsn = uq.latest_metadata.disk_consistent_lsn();
         let metadata = uq.latest_metadata.clone();
 
-        Self::new(&uq.latest_files, disk_consistent_lsn, metadata)
+        Self::new(
+            &uq.latest_files,
+            disk_consistent_lsn,
+            metadata,
+            uq.last_aux_file_policy,
+        )
     }
 }
 
@@ -221,6 +239,7 @@ mod tests {
            disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
            metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
            deleted_at: None,
+           last_aux_file_policy: None,
        };
 
        let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
@@ -261,6 +280,7 @@ mod tests {
            disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
            metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
            deleted_at: None,
+           last_aux_file_policy: None,
        };
 
        let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
@@ -302,7 +322,8 @@ mod tests {
            disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
            metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
            deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
-                "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
+                "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
+           last_aux_file_policy: None,
        };
 
        let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
@@ -347,6 +368,7 @@ mod tests {
            ])
            .unwrap(),
            deleted_at: None,
+           last_aux_file_policy: None,
        };
 
        let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -387,6 +409,48 @@ mod tests {
            metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
            deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
                "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
+           last_aux_file_policy: None,
+       };
+
+       let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
+       assert_eq!(part, expected);
+   }
+
+   #[test]
+   fn v5_indexpart_is_parsed() {
+       let example = r#"{
+           "version":5,
+           "layer_metadata":{
+               "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
+               "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
+           },
+           "disk_consistent_lsn":"0/16960E8",
+           "metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
+           "deleted_at": "2023-07-31T09:00:00.123",
+           "last_aux_file_policy": "V2"
+       }"#;
+
+       let expected = IndexPart {
+           version: 5,
+           layer_metadata: HashMap::from([
+               ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
+                   file_size: 25600000,
+                   generation: Generation::none(),
+                   shard: ShardIndex::unsharded()
+               }),
+               ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
+                   // serde_json should always parse this, but this might be a double with jq, for
+                   // example.
+                   file_size: 9007199254741001,
+                   generation: Generation::none(),
+                   shard: ShardIndex::unsharded()
+               })
+           ]),
+           disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
+           metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
+           deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
+               "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
+           last_aux_file_policy: Some(RuntimeAuxFilePolicy::V2),
        };
 
        let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 7edb922069c0a..7e89bebc4de5c 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -23,9 +23,9 @@ use pageserver_api::{
     },
     keyspace::{KeySpaceAccum, SparseKeyPartitioning},
     models::{
-        AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo,
+        AtomicRuntimeAuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo,
         DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
-        TimelineState,
+        RuntimeAuxFilePolicy, SwitchAuxFilePolicy, TimelineState,
     },
     reltag::BlockNumber,
     shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -409,6 +409,9 @@ pub struct Timeline {
 
     /// Keep aux directory cache to avoid its reconstruction on each update
     pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
+
+    /// The last aux file policy used on this timeline, i.e., whether aux file v2 storage is enabled.
+    pub(crate) last_aux_file_policy: AtomicRuntimeAuxFilePolicy,
 }
 
 pub struct WalReceiverInfo {
@@ -2000,7 +2003,7 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
 
 // Private functions
 impl Timeline {
-    pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy {
+    pub(crate) fn get_switch_aux_file_policy(&self) -> SwitchAuxFilePolicy {
         let tenant_conf = self.tenant_conf.load();
         tenant_conf
             .tenant_conf
@@ -2138,6 +2141,7 @@ impl Timeline {
         resources: TimelineResources,
         pg_version: u32,
         state: TimelineState,
+        aux_file_policy: RuntimeAuxFilePolicy,
         cancel: CancellationToken,
     ) -> Arc<Timeline> {
         let disk_consistent_lsn = metadata.disk_consistent_lsn();
@@ -2257,6 +2261,8 @@ impl Timeline {
                 dir: None,
                 n_deltas: 0,
             }),
+
+            last_aux_file_policy: AtomicRuntimeAuxFilePolicy::new(aux_file_policy),
         };
         result.repartition_threshold =
             result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -2406,6 +2412,11 @@ impl Timeline {
         let shard = self.get_shard_index();
         let this = self.myself.upgrade().expect("&self method holds the arc");
 
+        if let Some(ref index_part) = index_part {
+            self.last_aux_file_policy
+                .store(index_part.last_aux_file_policy());
+        }
+
         let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
             move || {
                 let _g = span.entered();
diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs
index d8701be170132..471a1e50310b0 100644
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
@@ -4,7 +4,10 @@ use std::{
 };
 
 use anyhow::Context;
-use pageserver_api::{models::TimelineState, shard::TenantShardId};
+use pageserver_api::{
+    models::{RuntimeAuxFilePolicy, TimelineState},
+    shard::TenantShardId,
+};
 use tokio::sync::OwnedMutexGuard;
 use tracing::{error, info, instrument, Instrument};
 use utils::{crashsafe, fs_ext, id::TimelineId};
@@ -278,6 +281,7 @@ impl DeleteTimelineFlow {
             // Important. We don't pass ancestor above because it can be missing.
             // Thus we need to skip the validation here.
             CreateTimelineCause::Delete,
+            RuntimeAuxFilePolicy::Unspecified, // the aux file policy is not needed for deletion, assuming deletion does not read the aux keyspace
         )
         .context("create_timeline_struct")?;
 
diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs
index 7797117e0f0f6..6bce47c285be7 100644
--- a/pageserver/src/tenant/upload_queue.rs
+++ b/pageserver/src/tenant/upload_queue.rs
@@ -7,6 +7,7 @@ use std::collections::{HashMap, VecDeque};
 use std::fmt::Debug;
 
 use chrono::NaiveDateTime;
+use pageserver_api::models::RuntimeAuxFilePolicy;
 use std::sync::Arc;
 use tracing::info;
 use utils::lsn::AtomicLsn;
@@ -56,6 +57,8 @@ pub(crate) struct UploadQueueInitialized {
     /// DANGER: do not return to outside world, e.g., safekeepers.
     pub(crate) latest_metadata: TimelineMetadata,
 
+    pub(crate) last_aux_file_policy: RuntimeAuxFilePolicy,
+
     /// `disk_consistent_lsn` from the last metadata file that was successfully
     /// uploaded. `Lsn(0)` if nothing was uploaded yet.
     /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
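Note the asymmetry here: `IndexPart::last_aux_file_policy` is `Option`-wrapped (see the index.rs hunk above), while the in-memory upload queue holds a plain `RuntimeAuxFilePolicy`. The `Option` is what keeps older `index_part.json` files readable: serde deserializes a missing `Option<T>` field as `None`, and the `last_aux_file_policy()` accessor maps `None` to `Unspecified`. A standalone sketch of that round trip, with illustrative type names and assuming only `serde` and `serde_json` as dependencies:

```rust
use serde::Deserialize;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
enum Policy { V1, V2, CrossValidation, Unspecified }

#[derive(Debug, Deserialize)]
struct Index {
    version: usize,
    last_aux_file_policy: Option<Policy>, // missing in the JSON => None
}

fn main() {
    // A pre-v5 index without the field still deserializes.
    let old: Index = serde_json::from_str(r#"{ "version": 4 }"#).unwrap();
    assert_eq!(old.last_aux_file_policy.unwrap_or(Policy::Unspecified), Policy::Unspecified);

    // A v5 index carries the policy as a plain enum variant string.
    let new: Index = serde_json::from_str(r#"{ "version": 5, "last_aux_file_policy": "V2" }"#).unwrap();
    assert_eq!(new.last_aux_file_policy, Some(Policy::V2));
}
```

The two hunks below show both recovery paths: an empty queue starts at `Unspecified`, while a queue initialized from a downloaded index adopts whatever the index recorded.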
@@ -184,6 +187,7 @@ impl UploadQueue {
             dangling_files: HashMap::new(),
             shutting_down: false,
             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
+            last_aux_file_policy: RuntimeAuxFilePolicy::Unspecified,
         };
 
         *self = UploadQueue::Initialized(state);
@@ -233,6 +237,7 @@ impl UploadQueue {
             dangling_files: HashMap::new(),
             shutting_down: false,
             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
+            last_aux_file_policy: index_part.last_aux_file_policy(),
         };
 
         *self = UploadQueue::Initialized(state);
diff --git a/test_runner/regress/test_aux_files.py b/test_runner/regress/test_aux_files.py
new file mode 100644
index 0000000000000..139d49c9f23d1
--- /dev/null
+++ b/test_runner/regress/test_aux_files.py
@@ -0,0 +1,81 @@
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import (
+    NeonEnv,
+    logical_replication_sync,
+)
+
+
+def test_aux_v2_config_switch(neon_simple_env: NeonEnv, vanilla_pg):
+    env = neon_simple_env
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.neon_cli.create_branch("test_aux_v2_config_switch", "empty")
+    endpoint = env.endpoints.create_start(
+        "test_aux_v2_config_switch", config_lines=["log_statement=all"]
+    )
+
+    with env.pageserver.http_client() as client:
+        tenant_config = client.tenant_config(tenant_id).effective_config
+        tenant_config["switch_aux_file_policy"] = "V2"
+        client.set_tenant_config(tenant_id, tenant_config)
+        # the switch flag is set, but the policy stays "Unspecified" until the first aux file write
+        assert (
+            client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
+                "last_aux_file_policy"
+            ]
+            == "Unspecified"
+        )
+
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    cur.execute("create table t(pk integer primary key, payload integer)")
+    cur.execute(
+        "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));"
+    )
+    cur.execute("create publication pub1 for table t, replication_example")
+
+    # now start the subscriber; aux files will be created at this point. TODO: find better ways of testing aux files (i.e., neon_test_utils)
+    # instead of going through the full logical replication process.
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
+    vanilla_pg.safe_psql(
+        "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);"
+    )
+    connstr = endpoint.connstr().replace("'", "''")
+    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
+    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
+
+    # wait for the logical replication channel to be established
+    logical_replication_sync(vanilla_pg, endpoint)
+    vanilla_pg.stop()
+    endpoint.stop()
+
+    with env.pageserver.http_client() as client:
+        # aux file v2 should now be enabled: the first aux file write switched the policy
+        assert (
+            client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
+                "last_aux_file_policy"
+            ]
+            == "V2"
+        )
+    with env.pageserver.http_client() as client:
+        tenant_config = client.tenant_config(tenant_id).effective_config
+        tenant_config["switch_aux_file_policy"] = "V1"
+        client.set_tenant_config(tenant_id, tenant_config)
+        # the policy cannot be switched back once the timeline is on V2
+        assert (
+            client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
+                "last_aux_file_policy"
+            ]
+            == "V2"
+        )
+    env.pageserver.restart()
+    with env.pageserver.http_client() as client:
+        # the policy survives a pageserver restart: it is persisted in index_part.json
+        assert (
+            client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
+                "last_aux_file_policy"
+            ]
+            == "V2"
+        )
+    # TODO(chi): test with timeline detach?
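A possible readability follow-up, not part of this patch: the `timeline_detail` assertion repeated throughout the test could be factored into a small helper. A hypothetical sketch (the `get_last_aux_file_policy` name is invented here):

```python
# Hypothetical helper using the same fixtures as the test above.
def get_last_aux_file_policy(client, tenant_id, timeline_id) -> str:
    detail = client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
    return detail["last_aux_file_policy"]


# usage: assert get_last_aux_file_policy(client, tenant_id, timeline_id) == "V2"
```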