Skip to content

Commit

Permalink
feat(pageserver): persist aux file policy in timeline metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed May 13, 2024
1 parent 396b9cf commit abbdfda
Show file tree
Hide file tree
Showing 13 changed files with 349 additions and 29 deletions.
6 changes: 3 additions & 3 deletions control_plane/src/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use pageserver_api::models::{
self, AuxFilePolicy, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo,
self, LocationConfig, ShardParameters, SwitchAuxFilePolicy, TenantHistorySize, TenantInfo,
TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
Expand Down Expand Up @@ -380,7 +380,7 @@ impl PageServerNode {
.context("parse `timeline_get_throttle` from json")?,
switch_aux_file_policy: settings
.remove("switch_aux_file_policy")
.map(|x| x.parse::<AuxFilePolicy>())
.map(|x| x.parse::<SwitchAuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
};
Expand Down Expand Up @@ -503,7 +503,7 @@ impl PageServerNode {
.context("parse `timeline_get_throttle` from json")?,
switch_aux_file_policy: settings
.remove("switch_aux_file_policy")
.map(|x| x.parse::<AuxFilePolicy>())
.map(|x| x.parse::<SwitchAuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
}
Expand Down
100 changes: 97 additions & 3 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
sync::atomic::AtomicUsize,
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -305,17 +306,107 @@ pub struct TenantConfig {
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub switch_aux_file_policy: Option<AuxFilePolicy>,
pub switch_aux_file_policy: Option<SwitchAuxFilePolicy>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuxFilePolicy {
pub enum SwitchAuxFilePolicy {
V1,
V2,
CrossValidation,
}

impl FromStr for AuxFilePolicy {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RuntimeAuxFilePolicy {
V1,
V2,
CrossValidation,
Unspecified,
}

pub struct AtomicRuntimeAuxFilePolicy(AtomicUsize);

impl AtomicRuntimeAuxFilePolicy {
pub fn new(policy: RuntimeAuxFilePolicy) -> Self {
Self(AtomicUsize::new(policy.to_usize()))
}

pub fn load(&self) -> RuntimeAuxFilePolicy {
RuntimeAuxFilePolicy::from_usize(self.0.load(std::sync::atomic::Ordering::SeqCst))
}

pub fn store(&self, policy: RuntimeAuxFilePolicy) {
self.0.store(
RuntimeAuxFilePolicy::to_usize(policy),
std::sync::atomic::Ordering::SeqCst,
);
}
}

impl SwitchAuxFilePolicy {
pub fn to_usize(self) -> usize {
match self {
Self::V1 => 1,
Self::CrossValidation => 2,
Self::V2 => 3,
}
}

pub fn from_usize(this: usize) -> Self {
match this {
1 => Self::V1,
2 => Self::CrossValidation,
3 => Self::V2,
_ => unreachable!(),
}
}

pub fn to_runtime_policy(&self) -> RuntimeAuxFilePolicy {
RuntimeAuxFilePolicy::from_usize(self.to_usize())
}
}

impl RuntimeAuxFilePolicy {
pub fn to_usize(self) -> usize {
match self {
Self::V1 => 1,
Self::CrossValidation => 2,
Self::V2 => 3,
Self::Unspecified => 0,
}
}

pub fn from_usize(this: usize) -> Self {
match this {
1 => Self::V1,
2 => Self::CrossValidation,
3 => Self::V2,
0 => Self::Unspecified,
_ => unreachable!(),
}
}
}

impl FromStr for RuntimeAuxFilePolicy {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.to_lowercase();
if s == "" || s == "unspecified" {
return Ok(Self::Unspecified);
} else if s == "v1" {
Ok(Self::V1)
} else if s == "v2" {
Ok(Self::V2)
} else if s == "crossvalidation" || s == "cross_validation" {
Ok(Self::CrossValidation)
} else {
anyhow::bail!("cannot parse {} to aux file policy", s)
}
}
}

impl FromStr for SwitchAuxFilePolicy {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Expand Down Expand Up @@ -604,6 +695,9 @@ pub struct TimelineInfo {
pub state: TimelineState,

pub walreceiver_status: String,

/// Whether aux file v2 is enabled
pub last_aux_file_policy: RuntimeAuxFilePolicy,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
1 change: 1 addition & 0 deletions pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ fn handle_metadata(
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
// TODO: simplify this part
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
meta = TimelineMetadata::new(
*disk_consistent_lsn,
Expand Down
2 changes: 2 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ async fn build_timeline_info_common(
state,

walreceiver_status,

last_aux_file_policy: timeline.last_aux_file_policy.load(),
};
Ok(info)
}
Expand Down
49 changes: 39 additions & 10 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use pageserver_api::key::{
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::{RuntimeAuxFilePolicy, SwitchAuxFilePolicy};
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
Expand All @@ -35,7 +35,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};
Expand Down Expand Up @@ -714,10 +714,13 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get_switch_aux_file_policy() {
AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await,
AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
AuxFilePolicy::CrossValidation => {
let current_policy = self.last_aux_file_policy.load();
match current_policy {
RuntimeAuxFilePolicy::V1 | RuntimeAuxFilePolicy::Unspecified => {
self.list_aux_files_v1(lsn, ctx).await
}
RuntimeAuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
RuntimeAuxFilePolicy::CrossValidation => {
let v1_result = self.list_aux_files_v1(lsn, ctx).await;
let v2_result = self.list_aux_files_v2(lsn, ctx).await;
match (v1_result, v2_result) {
Expand Down Expand Up @@ -1447,7 +1450,7 @@ impl<'a> DatadirModification<'a> {
}

pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
if let SwitchAuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
return Ok(());
}
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
Expand All @@ -1465,8 +1468,34 @@ impl<'a> DatadirModification<'a> {
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let policy = self.tline.get_switch_aux_file_policy();
if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
let switch_policy = self.tline.get_switch_aux_file_policy();

let policy = {
let current_policy = self.tline.last_aux_file_policy.load();
// Allowed switch path:
// * no aux files -> v1/v2/cross-validation
// * cross-validation->v2
match (current_policy, switch_policy) {
(RuntimeAuxFilePolicy::Unspecified, _)
| (RuntimeAuxFilePolicy::CrossValidation, SwitchAuxFilePolicy::V2) => {
let new_policy = switch_policy.to_runtime_policy();
self.tline.last_aux_file_policy.store(new_policy);
info!("switching to aux file policy {:?}", new_policy);
if let Some(ref remote_client) = self.tline.remote_client {
remote_client
.schedule_index_upload_for_aux_file_policy_update(new_policy)?;
remote_client.wait_completion().await?;
}
info!("finish switching to aux file policy {:?}", new_policy);
new_policy
}
(current_policy, _) => current_policy,
}
};

assert!(policy != RuntimeAuxFilePolicy::Unspecified);

if let RuntimeAuxFilePolicy::V2 | RuntimeAuxFilePolicy::CrossValidation = policy {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
Expand Down Expand Up @@ -1495,7 +1524,7 @@ impl<'a> DatadirModification<'a> {
self.put(key, Value::Image(new_val.into()));
}

if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
if let RuntimeAuxFilePolicy::V1 | RuntimeAuxFilePolicy::CrossValidation = policy {
let file_path = path.to_string();
let content = if content.is_empty() {
None
Expand Down
16 changes: 16 additions & 0 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::RuntimeAuxFilePolicy;
use pageserver_api::models::TimelineState;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::ShardIdentity;
Expand Down Expand Up @@ -528,6 +529,7 @@ impl Tenant {
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: RuntimeAuxFilePolicy,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
Expand All @@ -538,6 +540,10 @@ impl Tenant {
ancestor.clone(),
resources,
CreateTimelineCause::Load,
// This could be derived from ancestor branch + index part. Though the only caller of `timeline_init_and_sync` is `load_remote_timeline`,
// there will potentially be other caller of this function in the future, and we don't know whether `index_part` or `ancestor` takes precedence.
// Therefore, we pass this field explicitly for now, and remove it once we fully migrate to aux file v2.
last_aux_file_policy,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
Expand Down Expand Up @@ -1176,12 +1182,15 @@ impl Tenant {
None
};

let last_aux_file_policy = index_part.last_aux_file_policy();

self.timeline_init_and_sync(
timeline_id,
resources,
Some(index_part),
remote_metadata,
ancestor,
last_aux_file_policy,
ctx,
)
.await
Expand Down Expand Up @@ -1360,6 +1369,7 @@ impl Tenant {
create_guard,
initdb_lsn,
None,
RuntimeAuxFilePolicy::Unspecified,
)
.await
}
Expand Down Expand Up @@ -2431,6 +2441,7 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
cause: CreateTimelineCause,
last_aux_file_policy: RuntimeAuxFilePolicy,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
CreateTimelineCause::Load => {
Expand Down Expand Up @@ -2459,6 +2470,7 @@ impl Tenant {
resources,
pg_version,
state,
last_aux_file_policy,
self.cancel.child_token(),
);

Expand Down Expand Up @@ -3109,6 +3121,7 @@ impl Tenant {
timeline_create_guard,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
src_timeline.last_aux_file_policy.load(),
)
.await?;

Expand Down Expand Up @@ -3311,6 +3324,7 @@ impl Tenant {
timeline_create_guard,
pgdata_lsn,
None,
RuntimeAuxFilePolicy::Unspecified,
)
.await?;

Expand Down Expand Up @@ -3388,6 +3402,7 @@ impl Tenant {
create_guard: TimelineCreateGuard<'a>,
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: RuntimeAuxFilePolicy,
) -> anyhow::Result<UninitializedTimeline> {
let tenant_shard_id = self.tenant_shard_id;

Expand All @@ -3403,6 +3418,7 @@ impl Tenant {
ancestor,
resources,
CreateTimelineCause::Load,
last_aux_file_policy,
)
.context("Failed to create timeline data structure")?;

Expand Down
Loading

0 comments on commit abbdfda

Please sign in to comment.