Skip to content

Commit

Permalink
pageserver: store aux files as deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
jcsp committed Feb 13, 2024
1 parent a8eb404 commit 8fb23fc
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 22 deletions.
97 changes: 76 additions & 21 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl Timeline {
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_aux_files: false,
lsn,
}
}
Expand Down Expand Up @@ -868,6 +869,11 @@ pub struct DatadirModification<'a> {
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,

// Whether we already wrote any aux file changes in this modification. If true,
// [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
// if AUX_FILES_KEY is already set.
pending_aux_files: bool,
}

impl<'a> DatadirModification<'a> {
Expand Down Expand Up @@ -1350,28 +1356,67 @@ impl<'a> DatadirModification<'a> {
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
Ok(buf) => AuxFilesDirectory::des(&buf)?,
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
AuxFilesDirectory {
files: HashMap::new(),
}
}
let key = path.to_string();
let value = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
};
let path = path.to_string();
if content.is_empty() {
dir.files.remove(&path);

if self.pending_aux_files {
// We already updated aux files in `self`: emit a delta and update our latest value

self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
key: key.clone(),
value: value.clone(),
}),
);
} else {
dir.files.insert(path, Bytes::copy_from_slice(content));
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(_) => {
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
key: key.clone(),
value: value.clone(),
}),
);
}
Err(
e @ (PageReconstructError::AncestorStopping(_)
| PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
// we are assuming that all _other_ possible errors represents a missing key. If some
// other error occurs, we may incorrectly reset the map of aux files.
Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.

let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(key, value);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
}
};
}
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);

self.pending_aux_files = true;

Ok(())
}

Expand Down Expand Up @@ -1573,8 +1618,18 @@ struct RelDirectory {
}

#[derive(Debug, Serialize, Deserialize, Default)]
struct AuxFilesDirectory {
files: HashMap<String, Bytes>,
pub(crate) struct AuxFilesDirectory {
pub(crate) files: HashMap<String, Bytes>,
}

impl AuxFilesDirectory {
pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
if let Some(value) = value {
self.files.insert(key, value);
} else {
self.files.remove(&key);
}
}
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
2 changes: 2 additions & 0 deletions pageserver/src/walrecord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub enum NeonWalRecord {
moff: MultiXactOffset,
members: Vec<MultiXactMember>,
},
/// Update the map of AUX files, either writing or dropping a key
AuxFile { key: String, value: Option<Bytes> },
}

impl NeonWalRecord {
Expand Down
12 changes: 11 additions & 1 deletion pageserver/src/walredo/apply_neon.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use bytes::{BufMut, BytesMut};
use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants;
Expand All @@ -12,6 +13,7 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
use tracing::*;
use utils::bin_ser::BeSer;

/// Can this request be served by neon redo functions
/// or we need to pass it to wal-redo postgres process?
Expand Down Expand Up @@ -230,6 +232,14 @@ pub(crate) fn apply_in_neon(
LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
}
}
NeonWalRecord::AuxFile { key, value } => {
let mut dir = AuxFilesDirectory::des(page)?;
dir.upsert(key.clone(), value.clone());

page.clear();
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
}
}
Ok(())
}

0 comments on commit 8fb23fc

Please sign in to comment.