From 41453acb89f0bc34d176e5e2b1a3a74fc705d390 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Mon, 13 May 2024 15:01:34 -0400 Subject: [PATCH] feat(pageserver): persist aux file policy in timeline metadata Signed-off-by: Alex Chi Z --- control_plane/src/pageserver.rs | 6 +- libs/pageserver_api/src/models.rs | 100 +++++++++++++++++- pageserver/ctl/src/main.rs | 1 + pageserver/src/http/routes.rs | 2 + pageserver/src/pgdatadir_mapping.rs | 49 +++++++-- pageserver/src/tenant.rs | 16 +++ pageserver/src/tenant/config.rs | 8 +- .../src/tenant/remote_timeline_client.rs | 15 ++- .../tenant/remote_timeline_client/index.rs | 82 +++++++++++++- pageserver/src/tenant/timeline.rs | 18 +++- pageserver/src/tenant/timeline/delete.rs | 6 +- pageserver/src/tenant/upload_queue.rs | 5 + test_runner/regress/test_aux_files.py | 81 ++++++++++++++ 13 files changed, 361 insertions(+), 28 deletions(-) create mode 100644 test_runner/regress/test_aux_files.py diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 5a84763697697..a8fa8486e8c52 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -17,7 +17,7 @@ use anyhow::{bail, Context}; use camino::Utf8PathBuf; use futures::SinkExt; use pageserver_api::models::{ - self, AuxFilePolicy, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, + self, LocationConfig, ShardParameters, SwitchAuxFilePolicy, TenantHistorySize, TenantInfo, TimelineInfo, }; use pageserver_api::shard::TenantShardId; @@ -380,7 +380,7 @@ impl PageServerNode { .context("parse `timeline_get_throttle` from json")?, switch_aux_file_policy: settings .remove("switch_aux_file_policy") - .map(|x| x.parse::()) + .map(|x| x.parse::()) .transpose() .context("Failed to parse 'switch_aux_file_policy'")?, }; @@ -503,7 +503,7 @@ impl PageServerNode { .context("parse `timeline_get_throttle` from json")?, switch_aux_file_policy: settings .remove("switch_aux_file_policy") - .map(|x| x.parse::()) + .map(|x| x.parse::()) .transpose() .context("Failed to parse 'switch_aux_file_policy'")?, } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 1df5820fb946d..8e68a7c3ba078 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -10,6 +10,7 @@ use std::{ io::{BufRead, Read}, num::{NonZeroU64, NonZeroUsize}, str::FromStr, + sync::atomic::AtomicUsize, time::{Duration, SystemTime}, }; @@ -305,17 +306,107 @@ pub struct TenantConfig { pub lazy_slru_download: Option, pub timeline_get_throttle: Option, pub image_layer_creation_check_threshold: Option, - pub switch_aux_file_policy: Option, + pub switch_aux_file_policy: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub enum AuxFilePolicy { +pub enum SwitchAuxFilePolicy { V1, V2, CrossValidation, } -impl FromStr for AuxFilePolicy { +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum RuntimeAuxFilePolicy { + V1, + V2, + CrossValidation, + Unspecified, +} + +pub struct AtomicRuntimeAuxFilePolicy(AtomicUsize); + +impl AtomicRuntimeAuxFilePolicy { + pub fn new(policy: RuntimeAuxFilePolicy) -> Self { + Self(AtomicUsize::new(policy.to_usize())) + } + + pub fn load(&self) -> RuntimeAuxFilePolicy { + RuntimeAuxFilePolicy::from_usize(self.0.load(std::sync::atomic::Ordering::SeqCst)) + } + + pub fn store(&self, policy: RuntimeAuxFilePolicy) { + self.0.store( + RuntimeAuxFilePolicy::to_usize(policy), + std::sync::atomic::Ordering::SeqCst, + ); + } +} + +impl SwitchAuxFilePolicy { + pub fn to_usize(self) -> usize { + match self { + Self::V1 => 1, + Self::CrossValidation => 2, + Self::V2 => 3, + } + } + + pub fn from_usize(this: usize) -> Self { + match this { + 1 => Self::V1, + 2 => Self::CrossValidation, + 3 => Self::V2, + _ => unreachable!(), + } + } + + pub fn to_runtime_policy(&self) -> RuntimeAuxFilePolicy { + RuntimeAuxFilePolicy::from_usize(self.to_usize()) + } +} + +impl RuntimeAuxFilePolicy { + pub fn to_usize(self) -> usize { + match self { + Self::V1 => 1, + Self::CrossValidation => 2, + Self::V2 => 3, + Self::Unspecified => 0, + } + } + + pub fn from_usize(this: usize) -> Self { + match this { + 1 => Self::V1, + 2 => Self::CrossValidation, + 3 => Self::V2, + 0 => Self::Unspecified, + _ => unreachable!(), + } + } +} + +impl FromStr for RuntimeAuxFilePolicy { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let s = s.to_lowercase(); + if s == "" || s == "unspecified" { + return Ok(Self::Unspecified); + } else if s == "v1" { + Ok(Self::V1) + } else if s == "v2" { + Ok(Self::V2) + } else if s == "crossvalidation" || s == "cross_validation" { + Ok(Self::CrossValidation) + } else { + anyhow::bail!("cannot parse {} to aux file policy", s) + } + } +} + +impl FromStr for SwitchAuxFilePolicy { type Err = anyhow::Error; fn from_str(s: &str) -> Result { @@ -604,6 +695,9 @@ pub struct TimelineInfo { pub state: TimelineState, pub walreceiver_status: String, + + /// Whether aux file v2 is enabled + pub last_aux_file_policy: RuntimeAuxFilePolicy, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 1fb75584fc921..e92c352dab1d5 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -219,6 +219,7 @@ fn handle_metadata( let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?; println!("Current metadata:\n{meta:?}"); let mut update_meta = false; + // TODO: simplify this part if let Some(disk_consistent_lsn) = disk_consistent_lsn { meta = TimelineMetadata::new( *disk_consistent_lsn, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index a8ca642dc59af..8fcf84f5d1dee 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -433,6 +433,8 @@ async fn build_timeline_info_common( state, walreceiver_status, + + last_aux_file_policy: timeline.last_aux_file_policy.load(), }; Ok(info) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 1092d64d33fed..3bd06eee35dd2 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -24,7 +24,7 @@ use pageserver_api::key::{ AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, }; use pageserver_api::keyspace::SparseKeySpace; -use pageserver_api::models::AuxFilePolicy; +use pageserver_api::models::{RuntimeAuxFilePolicy, SwitchAuxFilePolicy}; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; @@ -35,7 +35,7 @@ use std::ops::ControlFlow; use std::ops::Range; use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, trace, warn}; use utils::bin_ser::DeserializeError; use utils::vec_map::{VecMap, VecMapOrdering}; use utils::{bin_ser::BeSer, lsn::Lsn}; @@ -718,10 +718,13 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, ) -> Result, PageReconstructError> { - match self.get_switch_aux_file_policy() { - AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await, - AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await, - AuxFilePolicy::CrossValidation => { + let current_policy = self.last_aux_file_policy.load(); + match current_policy { + RuntimeAuxFilePolicy::V1 | RuntimeAuxFilePolicy::Unspecified => { + self.list_aux_files_v1(lsn, ctx).await + } + RuntimeAuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await, + RuntimeAuxFilePolicy::CrossValidation => { let v1_result = self.list_aux_files_v1(lsn, ctx).await; let v2_result = self.list_aux_files_v2(lsn, ctx).await; match (v1_result, v2_result) { @@ -1451,7 +1454,7 @@ impl<'a> DatadirModification<'a> { } pub fn init_aux_dir(&mut self) -> anyhow::Result<()> { - if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() { + if let SwitchAuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() { return Ok(()); } let buf = AuxFilesDirectory::ser(&AuxFilesDirectory { @@ -1469,8 +1472,34 @@ impl<'a> DatadirModification<'a> { content: &[u8], ctx: &RequestContext, ) -> anyhow::Result<()> { - let policy = self.tline.get_switch_aux_file_policy(); - if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy { + let switch_policy = self.tline.get_switch_aux_file_policy(); + + let policy = { + let current_policy = self.tline.last_aux_file_policy.load(); + // Allowed switch path: + // * no aux files -> v1/v2/cross-validation + // * cross-validation->v2 + match (current_policy, switch_policy) { + (RuntimeAuxFilePolicy::Unspecified, _) + | (RuntimeAuxFilePolicy::CrossValidation, SwitchAuxFilePolicy::V2) => { + let new_policy = switch_policy.to_runtime_policy(); + self.tline.last_aux_file_policy.store(new_policy); + info!("switching to aux file policy {:?}", new_policy); + if let Some(ref remote_client) = self.tline.remote_client { + remote_client + .schedule_index_upload_for_aux_file_policy_update(new_policy)?; + remote_client.wait_completion().await?; + } + info!("finish switching to aux file policy {:?}", new_policy); + new_policy + } + (current_policy, _) => current_policy, + } + }; + + assert!(policy != RuntimeAuxFilePolicy::Unspecified); + + if let RuntimeAuxFilePolicy::V2 | RuntimeAuxFilePolicy::CrossValidation = policy { let key = aux_file::encode_aux_file_key(path); // retrieve the key from the engine let old_val = match self.get(key, ctx).await { @@ -1521,7 +1550,7 @@ impl<'a> DatadirModification<'a> { self.put(key, Value::Image(new_val.into())); } - if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy { + if let RuntimeAuxFilePolicy::V1 | RuntimeAuxFilePolicy::CrossValidation = policy { let file_path = path.to_string(); let content = if content.is_empty() { None diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 010e56a899db4..bfe8ea57688bf 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -20,6 +20,7 @@ use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use pageserver_api::models; +use pageserver_api::models::RuntimeAuxFilePolicy; use pageserver_api::models::TimelineState; use pageserver_api::models::WalRedoManagerStatus; use pageserver_api::shard::ShardIdentity; @@ -528,6 +529,7 @@ impl Tenant { index_part: Option, metadata: TimelineMetadata, ancestor: Option>, + last_aux_file_policy: RuntimeAuxFilePolicy, _ctx: &RequestContext, ) -> anyhow::Result<()> { let tenant_id = self.tenant_shard_id; @@ -538,6 +540,10 @@ impl Tenant { ancestor.clone(), resources, CreateTimelineCause::Load, + // This could be derived from ancestor branch + index part. Though the only caller of `timeline_init_and_sync` is `load_remote_timeline`, + // there will potentially be other caller of this function in the future, and we don't know whether `index_part` or `ancestor` takes precedence. + // Therefore, we pass this field explicitly for now, and remove it once we fully migrate to aux file v2. + last_aux_file_policy, )?; let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); anyhow::ensure!( @@ -1176,12 +1182,15 @@ impl Tenant { None }; + let last_aux_file_policy = index_part.last_aux_file_policy(); + self.timeline_init_and_sync( timeline_id, resources, Some(index_part), remote_metadata, ancestor, + last_aux_file_policy, ctx, ) .await @@ -1360,6 +1369,7 @@ impl Tenant { create_guard, initdb_lsn, None, + RuntimeAuxFilePolicy::Unspecified, ) .await } @@ -2431,6 +2441,7 @@ impl Tenant { ancestor: Option>, resources: TimelineResources, cause: CreateTimelineCause, + last_aux_file_policy: RuntimeAuxFilePolicy, ) -> anyhow::Result> { let state = match cause { CreateTimelineCause::Load => { @@ -2459,6 +2470,7 @@ impl Tenant { resources, pg_version, state, + last_aux_file_policy, self.cancel.child_token(), ); @@ -3109,6 +3121,7 @@ impl Tenant { timeline_create_guard, start_lsn + 1, Some(Arc::clone(src_timeline)), + src_timeline.last_aux_file_policy.load(), ) .await?; @@ -3311,6 +3324,7 @@ impl Tenant { timeline_create_guard, pgdata_lsn, None, + RuntimeAuxFilePolicy::Unspecified, ) .await?; @@ -3388,6 +3402,7 @@ impl Tenant { create_guard: TimelineCreateGuard<'a>, start_lsn: Lsn, ancestor: Option>, + last_aux_file_policy: RuntimeAuxFilePolicy, ) -> anyhow::Result { let tenant_shard_id = self.tenant_shard_id; @@ -3403,6 +3418,7 @@ impl Tenant { ancestor, resources, CreateTimelineCause::Load, + last_aux_file_policy, ) .context("Failed to create timeline data structure")?; diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index a743ce3c16a30..b5a46713d98a3 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -9,9 +9,9 @@ //! may lead to a data loss. //! use anyhow::bail; -use pageserver_api::models::AuxFilePolicy; use pageserver_api::models::CompactionAlgorithm; use pageserver_api::models::EvictionPolicy; +use pageserver_api::models::SwitchAuxFilePolicy; use pageserver_api::models::{self, ThrottleConfig}; use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize}; use serde::de::IntoDeserializer; @@ -373,7 +373,7 @@ pub struct TenantConf { /// Switch to a new aux file policy. Switching this flag requires the user has not written any aux file into /// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions. - pub switch_aux_file_policy: AuxFilePolicy, + pub switch_aux_file_policy: SwitchAuxFilePolicy, } /// Same as TenantConf, but this struct preserves the information about @@ -472,7 +472,7 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] - pub switch_aux_file_policy: Option, + pub switch_aux_file_policy: Option, } impl TenantConfOpt { @@ -574,7 +574,7 @@ impl Default for TenantConf { lazy_slru_download: false, timeline_get_throttle: crate::tenant::throttle::Config::disabled(), image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD, - switch_aux_file_policy: AuxFilePolicy::V1, + switch_aux_file_policy: SwitchAuxFilePolicy::V1, } } } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 9103760388cec..97a1ee7f574f6 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -189,6 +189,7 @@ use camino::Utf8Path; use chrono::{NaiveDateTime, Utc}; pub(crate) use download::download_initdb_tar_zst; +use pageserver_api::models::RuntimeAuxFilePolicy; use pageserver_api::shard::{ShardIndex, TenantShardId}; use scopeguard::ScopeGuard; use tokio_util::sync::CancellationToken; @@ -609,6 +610,17 @@ impl RemoteTimelineClient { Ok(()) } + /// Launch an index-file upload operation in the background, with only aux_file_policy flag updated. + pub(crate) fn schedule_index_upload_for_aux_file_policy_update( + self: &Arc, + last_aux_file_policy: RuntimeAuxFilePolicy, + ) -> anyhow::Result<()> { + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut()?; + upload_queue.last_aux_file_policy = last_aux_file_policy; + self.schedule_index_upload(upload_queue); + Ok(()) + } /// /// Launch an index-file upload operation in the background, if necessary. /// @@ -942,7 +954,7 @@ impl RemoteTimelineClient { } /// Wait for all previously scheduled uploads/deletions to complete - pub(crate) async fn wait_completion(self: &Arc) -> anyhow::Result<()> { + pub async fn wait_completion(self: &Arc) -> anyhow::Result<()> { let receiver = { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; @@ -1844,6 +1856,7 @@ impl RemoteTimelineClient { dangling_files: HashMap::default(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + last_aux_file_policy: initialized.last_aux_file_policy, }; let upload_queue = std::mem::replace( diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index b114d6aa10478..b8f097ca88b7d 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use chrono::NaiveDateTime; +use pageserver_api::models::RuntimeAuxFilePolicy; use serde::{Deserialize, Serialize}; use utils::id::TimelineId; @@ -88,6 +89,10 @@ pub struct IndexPart { #[serde(default)] pub(crate) lineage: Lineage, + + // Added in version 6. None == RuntimeAuxFilePolicy::Unspecified. Use Option wrapper to keep forward compatibility. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub last_aux_file_policy: Option, } impl IndexPart { @@ -101,10 +106,11 @@ impl IndexPart { /// is always generated from the keys of `layer_metadata`) /// - 4: timeline_layers is fully removed. /// - 5: lineage was added - const LATEST_VERSION: usize = 5; + /// - 6: last_aux_file_policy is added. + const LATEST_VERSION: usize = 6; // Versions we may see when reading from a bucket. - pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5]; + pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6]; pub const FILE_NAME: &'static str = "index_part.json"; @@ -113,6 +119,7 @@ impl IndexPart { disk_consistent_lsn: Lsn, metadata: TimelineMetadata, lineage: Lineage, + last_aux_file_policy: RuntimeAuxFilePolicy, ) -> Self { let layer_metadata = layers_and_metadata .iter() @@ -126,6 +133,7 @@ impl IndexPart { metadata, deleted_at: None, lineage, + last_aux_file_policy: Some(last_aux_file_policy), } } @@ -155,8 +163,14 @@ impl IndexPart { example_metadata.disk_consistent_lsn(), example_metadata, Default::default(), + RuntimeAuxFilePolicy::V2, ) } + + pub fn last_aux_file_policy(&self) -> RuntimeAuxFilePolicy { + self.last_aux_file_policy + .unwrap_or(RuntimeAuxFilePolicy::Unspecified) + } } impl From<&UploadQueueInitialized> for IndexPart { @@ -165,7 +179,13 @@ impl From<&UploadQueueInitialized> for IndexPart { let metadata = uq.latest_metadata.clone(); let lineage = uq.latest_lineage.clone(); - Self::new(&uq.latest_files, disk_consistent_lsn, metadata, lineage) + Self::new( + &uq.latest_files, + disk_consistent_lsn, + metadata, + lineage, + uq.last_aux_file_policy, + ) } } @@ -299,6 +319,7 @@ mod tests { metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(), deleted_at: None, lineage: Lineage::default(), + last_aux_file_policy: None, }; let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); @@ -340,6 +361,7 @@ mod tests { metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(), deleted_at: None, lineage: Lineage::default(), + last_aux_file_policy: None, }; let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); @@ -383,6 +405,7 @@ mod tests { deleted_at: Some(chrono::NaiveDateTime::parse_from_str( "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()), lineage: Lineage::default(), + last_aux_file_policy: None, }; let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); @@ -428,6 +451,7 @@ mod tests { .unwrap(), deleted_at: None, lineage: Lineage::default(), + last_aux_file_policy: None, }; let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap(); @@ -468,6 +492,7 @@ mod tests { metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(), deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")), lineage: Lineage::default(), + last_aux_file_policy: None, }; let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); @@ -511,6 +536,57 @@ mod tests { reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()], original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))), }, + last_aux_file_policy: None, + }; + + let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); + assert_eq!(part, expected); + } + + #[test] + fn v6_indexpart_is_parsed() { + let example = r#"{ + "version":6, + "layer_metadata":{ + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 }, + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 } + }, + "disk_consistent_lsn":"0/16960E8", + "metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0], + "deleted_at": "2023-07-31T09:00:00.123", + "lineage":{ + "original_ancestor":["e2bfd8c633d713d279e6fcd2bcc15b6d","0/15A7618","2024-05-07T18:52:36.322426563"], + "reparenting_history":["e1bfd8c633d713d279e6fcd2bcc15b6d"] + }, + "last_aux_file_policy": "V2" + }"#; + + let expected = IndexPart { + version: 6, + layer_metadata: HashMap::from([ + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { + file_size: 25600000, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { + // serde_json should always parse this but this might be a double with jq for + // example. + file_size: 9007199254741001, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }) + ]), + disk_consistent_lsn: "0/16960E8".parse::().unwrap(), + metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(), + deleted_at: Some(chrono::NaiveDateTime::parse_from_str( + "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()), + lineage: Lineage { + reparenting_history_truncated: false, + reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()], + original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))), + }, + last_aux_file_policy: Some(RuntimeAuxFilePolicy::V2), }; let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 01f354b9e85d9..f80546dfd3cc5 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -23,9 +23,9 @@ use pageserver_api::{ }, keyspace::{KeySpaceAccum, SparseKeyPartitioning}, models::{ - AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo, + AtomicRuntimeAuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo, - TimelineState, + RuntimeAuxFilePolicy, SwitchAuxFilePolicy, TimelineState, }, reltag::BlockNumber, shard::{ShardIdentity, ShardNumber, TenantShardId}, @@ -413,7 +413,11 @@ pub struct Timeline { /// Keep aux directory cache to avoid it's reconstruction on each update pub(crate) aux_files: tokio::sync::Mutex, + /// Size estiamtor for aux file v2 pub(crate) aux_file_size_estimator: AuxFileSizeEstimator, + + /// Indicate whether aux file v2 storage is enabled. + pub(crate) last_aux_file_policy: AtomicRuntimeAuxFilePolicy, } pub struct WalReceiverInfo { @@ -2011,7 +2015,7 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10; // Private functions impl Timeline { - pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy { + pub(crate) fn get_switch_aux_file_policy(&self) -> SwitchAuxFilePolicy { let tenant_conf = self.tenant_conf.load(); tenant_conf .tenant_conf @@ -2149,6 +2153,7 @@ impl Timeline { resources: TimelineResources, pg_version: u32, state: TimelineState, + aux_file_policy: RuntimeAuxFilePolicy, cancel: CancellationToken, ) -> Arc { let disk_consistent_lsn = metadata.disk_consistent_lsn(); @@ -2273,6 +2278,8 @@ impl Timeline { }), aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics), + + last_aux_file_policy: AtomicRuntimeAuxFilePolicy::new(aux_file_policy), }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; @@ -2422,6 +2429,11 @@ impl Timeline { let shard = self.get_shard_index(); let this = self.myself.upgrade().expect("&self method holds the arc"); + if let Some(ref index_part) = index_part { + self.last_aux_file_policy + .store(index_part.last_aux_file_policy()); + } + let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({ move || { let _g = span.entered(); diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index d8701be170132..471a1e50310b0 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -4,7 +4,10 @@ use std::{ }; use anyhow::Context; -use pageserver_api::{models::TimelineState, shard::TenantShardId}; +use pageserver_api::{ + models::{RuntimeAuxFilePolicy, TimelineState}, + shard::TenantShardId, +}; use tokio::sync::OwnedMutexGuard; use tracing::{error, info, instrument, Instrument}; use utils::{crashsafe, fs_ext, id::TimelineId}; @@ -278,6 +281,7 @@ impl DeleteTimelineFlow { // Important. We dont pass ancestor above because it can be missing. // Thus we need to skip the validation here. CreateTimelineCause::Delete, + RuntimeAuxFilePolicy::Unspecified, // Aux file policy is not needed for deletion, assuming deletion does not read aux keyspace ) .context("create_timeline_struct")?; diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index a2f761fa949a3..f4f9c286f1299 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -8,6 +8,7 @@ use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use chrono::NaiveDateTime; +use pageserver_api::models::RuntimeAuxFilePolicy; use std::sync::Arc; use tracing::info; use utils::lsn::AtomicLsn; @@ -60,6 +61,8 @@ pub(crate) struct UploadQueueInitialized { /// Part of the flattened "next" `index_part.json`. pub(crate) latest_lineage: Lineage, + pub(crate) last_aux_file_policy: RuntimeAuxFilePolicy, + /// `disk_consistent_lsn` from the last metadata file that was successfully /// uploaded. `Lsn(0)` if nothing was uploaded yet. /// Unlike `latest_files` or `latest_metadata`, this value is never ahead. @@ -189,6 +192,7 @@ impl UploadQueue { dangling_files: HashMap::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + last_aux_file_policy: RuntimeAuxFilePolicy::Unspecified, }; *self = UploadQueue::Initialized(state); @@ -239,6 +243,7 @@ impl UploadQueue { dangling_files: HashMap::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + last_aux_file_policy: index_part.last_aux_file_policy(), }; *self = UploadQueue::Initialized(state); diff --git a/test_runner/regress/test_aux_files.py b/test_runner/regress/test_aux_files.py new file mode 100644 index 0000000000000..139d49c9f23d1 --- /dev/null +++ b/test_runner/regress/test_aux_files.py @@ -0,0 +1,81 @@ +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnv, + logical_replication_sync, +) + + +def test_aux_v2_config_switch(neon_simple_env: NeonEnv, vanilla_pg): + env = neon_simple_env + + tenant_id = env.initial_tenant + timeline_id = env.neon_cli.create_branch("test_aux_v2_config_switch", "empty") + endpoint = env.endpoints.create_start( + "test_aux_v2_config_switch", config_lines=["log_statement=all"] + ) + + with env.pageserver.http_client() as client: + tenant_config = client.tenant_config(tenant_id).effective_config + tenant_config["switch_aux_file_policy"] = "V2" + client.set_tenant_config(tenant_id, tenant_config) + # aux file v2 is enabled on the write path + assert ( + client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[ + "last_aux_file_policy" + ] + == "Unspecified" + ) + pg_conn = endpoint.connect() + cur = pg_conn.cursor() + + cur.execute("create table t(pk integer primary key, payload integer)") + cur.execute( + "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));" + ) + cur.execute("create publication pub1 for table t, replication_example") + + # now start subscriber, aux files will be created at this point. TODO: find better ways of testing aux files (i.e., neon_test_utils) + # instead of going through the full logical replication process. + vanilla_pg.start() + vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)") + vanilla_pg.safe_psql( + "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);" + ) + connstr = endpoint.connstr().replace("'", "''") + log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}") + vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") + + # Wait logical replication channel to be established + logical_replication_sync(vanilla_pg, endpoint) + vanilla_pg.stop() + endpoint.stop() + + with env.pageserver.http_client() as client: + # aux file v2 flag should be enabled at this point + assert ( + client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[ + "last_aux_file_policy" + ] + == "V2" + ) + with env.pageserver.http_client() as client: + tenant_config = client.tenant_config(tenant_id).effective_config + tenant_config["switch_aux_file_policy"] = "V1" + client.set_tenant_config(tenant_id, tenant_config) + # the flag should still be enabled + assert ( + client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[ + "last_aux_file_policy" + ] + == "V2" + ) + env.pageserver.restart() + with env.pageserver.http_client() as client: + # aux file v2 flag should be persisted + assert ( + client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[ + "last_aux_file_policy" + ] + == "V2" + ) + # TODO(chi): test with timeline detach?