Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pageserver): persist aux file policy in index part #7668

Merged
merged 21 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
sync::atomic::AtomicUsize,
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -308,13 +309,85 @@ pub struct TenantConfig {
pub switch_aux_file_policy: Option<AuxFilePolicy>,
}

/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
/// tenant config. When the first aux file written, the policy will be persisted in the
/// `index_part.json` file and has a limited migration path.
///
/// Currently, we only allow the following migration path:
///
/// Unset -> V1
/// -> V2
/// -> CrossValidation -> V2
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuxFilePolicy {
/// V1 aux file policy: store everything in AUX_FILE_KEY
V1,
/// V2 aux file policy: store in the AUX_FILE keyspace
V2,
/// Cross validation runs both formats on the write path and does validation
/// on the read path.
CrossValidation,
}

impl AuxFilePolicy {
pub fn is_valid_migration_path(from: Option<Self>, to: Self) -> bool {
matches!(
(from, to),
(None, _) | (Some(AuxFilePolicy::CrossValidation), AuxFilePolicy::V2)
)
}
}

/// The aux file policy memory flag. Users can store `Option<AuxFilePolicy>` into this atomic flag. 0 == unspecified.
pub struct AtomicAuxFilePolicy(AtomicUsize);

impl AtomicAuxFilePolicy {
pub fn new(policy: Option<AuxFilePolicy>) -> Self {
Self(AtomicUsize::new(
policy.map(AuxFilePolicy::to_usize).unwrap_or_default(),
))
}

pub fn load(&self) -> Option<AuxFilePolicy> {
// Using SeqCst memory order because this flag will be read from both write/read threads
// and we want to guarantee the strongest consistency.
skyzh marked this conversation as resolved.
Show resolved Hide resolved
match self.0.load(std::sync::atomic::Ordering::SeqCst) {
0 => None,
other => Some(AuxFilePolicy::from_usize(other)),
}
}

pub fn store(&self, policy: Option<AuxFilePolicy>) {
self.0.store(
policy.map(AuxFilePolicy::to_usize).unwrap_or_default(),
std::sync::atomic::Ordering::SeqCst,
);
}
}

impl AuxFilePolicy {
pub fn to_usize(self) -> usize {
match self {
Self::V1 => 1,
Self::CrossValidation => 2,
Self::V2 => 3,
}
}

pub fn try_from_usize(this: usize) -> Option<Self> {
match this {
1 => Some(Self::V1),
2 => Some(Self::CrossValidation),
3 => Some(Self::V2),
_ => None,
}
}

pub fn from_usize(this: usize) -> Self {
Self::try_from_usize(this).unwrap()
}
}

impl FromStr for AuxFilePolicy {
type Err = anyhow::Error;

Expand Down Expand Up @@ -604,6 +677,9 @@ pub struct TimelineInfo {
pub state: TimelineState,

pub walreceiver_status: String,

/// The last aux file policy being used on this timeline
pub last_aux_file_policy: Option<AuxFilePolicy>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -1456,4 +1532,59 @@ mod tests {
assert_eq!(actual, expected, "example on {line}");
}
}

#[test]
fn test_aux_file_migration_path() {
koivunej marked this conversation as resolved.
Show resolved Hide resolved
assert!(AuxFilePolicy::is_valid_migration_path(
None,
AuxFilePolicy::V1
));
assert!(AuxFilePolicy::is_valid_migration_path(
None,
AuxFilePolicy::V2
));
assert!(AuxFilePolicy::is_valid_migration_path(
None,
AuxFilePolicy::CrossValidation
));
// Self-migration is not a valid migration path, and the caller should handle it by itself.
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V1),
AuxFilePolicy::V1
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V2),
AuxFilePolicy::V2
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::CrossValidation),
AuxFilePolicy::CrossValidation
));
// Migrations not allowed
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::CrossValidation),
AuxFilePolicy::V1
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V1),
AuxFilePolicy::V2
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V2),
AuxFilePolicy::V1
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V2),
AuxFilePolicy::CrossValidation
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V1),
AuxFilePolicy::CrossValidation
));
// Migrations allowed
assert!(AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::CrossValidation),
AuxFilePolicy::V2
));
}
}
1 change: 1 addition & 0 deletions pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ fn handle_metadata(
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
skyzh marked this conversation as resolved.
Show resolved Hide resolved
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
// TODO: simplify this part
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
meta = TimelineMetadata::new(
*disk_consistent_lsn,
Expand Down
2 changes: 2 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ async fn build_timeline_info_common(
state,

walreceiver_status,

last_aux_file_policy: timeline.last_aux_file_policy.load(),
};
Ok(info)
}
Expand Down
33 changes: 27 additions & 6 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};
Expand Down Expand Up @@ -718,10 +718,11 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get_switch_aux_file_policy() {
AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await,
AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
AuxFilePolicy::CrossValidation => {
let current_policy = self.last_aux_file_policy.load();
match current_policy {
Some(AuxFilePolicy::V1) | None => self.list_aux_files_v1(lsn, ctx).await,
Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
Some(AuxFilePolicy::CrossValidation) => {
let v1_result = self.list_aux_files_v1(lsn, ctx).await;
let v2_result = self.list_aux_files_v2(lsn, ctx).await;
match (v1_result, v2_result) {
Expand Down Expand Up @@ -1469,7 +1470,27 @@ impl<'a> DatadirModification<'a> {
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let policy = self.tline.get_switch_aux_file_policy();
let switch_policy = self.tline.get_switch_aux_file_policy();

let policy = {
let current_policy = self.tline.last_aux_file_policy.load();
// Allowed switch path:
// * no aux files -> v1/v2/cross-validation
// * cross-validation->v2
if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
self.tline.last_aux_file_policy.store(Some(switch_policy));
info!("switching to aux file policy {:?}", switch_policy);
self.tline
.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(switch_policy))?;
switch_policy
} else {
// This branch handles non-valid migration path, and the case that switch_policy == current_policy.
// And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
current_policy.unwrap_or(AuxFilePolicy::V1)
}
};

if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
Expand Down
47 changes: 47 additions & 0 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::TimelineState;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::ShardIdentity;
Expand Down Expand Up @@ -528,6 +529,7 @@ impl Tenant {
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
Expand All @@ -538,6 +540,10 @@ impl Tenant {
ancestor.clone(),
resources,
CreateTimelineCause::Load,
// This could be derived from ancestor branch + index part. Though the only caller of `timeline_init_and_sync` is `load_remote_timeline`,
// there will potentially be other caller of this function in the future, and we don't know whether `index_part` or `ancestor` takes precedence.
// Therefore, we pass this field explicitly for now, and remove it once we fully migrate to aux file v2.
last_aux_file_policy,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
Expand All @@ -552,6 +558,10 @@ impl Tenant {

if let Some(index_part) = index_part.as_ref() {
timeline.remote_client.init_upload_queue(index_part)?;

timeline
.last_aux_file_policy
.store(index_part.last_aux_file_policy());
} else {
// No data on the remote storage, but we have local metadata file. We can end up
// here with timeline_create being interrupted before finishing index part upload.
Expand Down Expand Up @@ -1172,12 +1182,15 @@ impl Tenant {
None
};

let last_aux_file_policy = index_part.last_aux_file_policy();

self.timeline_init_and_sync(
timeline_id,
resources,
Some(index_part),
remote_metadata,
ancestor,
last_aux_file_policy,
ctx,
)
.await
Expand Down Expand Up @@ -1357,6 +1370,7 @@ impl Tenant {
create_guard,
initdb_lsn,
None,
None,
)
.await
}
Expand Down Expand Up @@ -2415,6 +2429,7 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
cause: CreateTimelineCause,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
CreateTimelineCause::Load => {
Expand Down Expand Up @@ -2443,6 +2458,7 @@ impl Tenant {
resources,
pg_version,
state,
last_aux_file_policy,
self.cancel.child_token(),
);

Expand Down Expand Up @@ -3093,6 +3109,7 @@ impl Tenant {
timeline_create_guard,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
src_timeline.last_aux_file_policy.load(),
)
.await?;

Expand Down Expand Up @@ -3286,6 +3303,7 @@ impl Tenant {
timeline_create_guard,
pgdata_lsn,
None,
None,
)
.await?;

Expand Down Expand Up @@ -3357,6 +3375,7 @@ impl Tenant {
create_guard: TimelineCreateGuard<'a>,
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<UninitializedTimeline> {
let tenant_shard_id = self.tenant_shard_id;

Expand All @@ -3372,6 +3391,7 @@ impl Tenant {
ancestor,
resources,
CreateTimelineCause::Load,
last_aux_file_policy,
)
.context("Failed to create timeline data structure")?;

Expand Down Expand Up @@ -5595,4 +5615,31 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_aux_flag_branch() -> anyhow::Result<()> {
skyzh marked this conversation as resolved.
Show resolved Hide resolved
let mut harness = TenantHarness::create("aux_flag_branch")?;
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V2;
let (tenant, ctx) = harness.load().await;
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
assert_eq!(tline.last_aux_file_policy.load(), None);
{
let mut modification = tline.begin_modification(Lsn(0x10));
modification
.put_file("pg_logical/mappings/test", b"test", &ctx)
.await?;
modification.commit(&ctx).await?;
}
assert_eq!(tline.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
let create_guard = tenant
.create_timeline_create_guard(NEW_TIMELINE_ID)
.unwrap();
let tline2 = tenant
.branch_timeline_impl(&tline, NEW_TIMELINE_ID, None, create_guard, &ctx)
.await?;
skyzh marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(tline2.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
Ok(())
skyzh marked this conversation as resolved.
Show resolved Hide resolved
}
}
2 changes: 2 additions & 0 deletions pageserver/src/tenant/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ pub struct TenantConf {

/// Switch to a new aux file policy. Switching this flag requires the user has not written any aux file into
/// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions.
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
/// file is written.
pub switch_aux_file_policy: AuxFilePolicy,
}

Expand Down
Loading
Loading