Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: only store SLRUs & aux files on shard zero #9786

Merged
merged 10 commits into from
Dec 3, 2024
5 changes: 5 additions & 0 deletions libs/pageserver_api/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,11 @@ impl Key {
&& self.field6 == 1
}

/// Report whether this key carries the aux-file prefix, i.e. whether its
/// `field1` is equal to `AUX_KEY_PREFIX`.
#[inline(always)]
pub fn is_aux_file_key(&self) -> bool {
    let prefix = self.field1;
    prefix == AUX_KEY_PREFIX
}

/// Guaranteed to return `Ok()` if [`Self::is_rel_block_key`] returns `true` for `key`.
#[inline(always)]
pub fn to_rel_block(self) -> anyhow::Result<(RelTag, BlockNumber)> {
Expand Down
34 changes: 26 additions & 8 deletions libs/pageserver_api/src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,37 @@ impl ShardIdentity {
}
}

/// Decide whether `key` is "global", i.e. kept on every shard rather than on just one.
///
/// Returns `false` for SLRU block keys, SLRU segment size keys, aux file keys and
/// relation block keys; returns `true` for relation size keys and for every key
/// kind that is not explicitly listed here.
fn is_key_global(&self, key: &Key) -> bool {
    // Special keys that are only stored on shard 0.
    let shard_zero_only =
        key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key();
    if shard_zero_only {
        return false;
    }

    // Ordinary relation blocks are distributed across shards.
    if key.is_rel_block_key() {
        return false;
    }

    // Every shard maintains rel size keys. Only shard 0 is responsible for keeping
    // them strictly accurate; the other shards just reflect the highest block
    // they have ingested.
    if key.is_rel_size_key() {
        return true;
    }

    // Anything else is assumed to be needed everywhere, because ingest code might
    // rely on it -- this covers functionality where the ingest code has not (yet)
    // been made fully shard aware.
    true
}

/// Return true if the key should be discarded if found in this shard's
/// data store, e.g. during compaction after a split.
///
/// Shards _may_ drop keys which return false here, but are not obliged to.
pub fn is_key_disposable(&self, key: &Key) -> bool {
jcsp marked this conversation as resolved.
Show resolved Hide resolved
if key_is_shard0(key) {
// Q: Why can't we dispose of shard0 content if we're not shard 0?
// A1: because the WAL ingestion logic currently ingests some shard 0
// content on all shards, even though it's only read on shard 0. If we
// dropped it, then subsequent WAL ingest to these keys would encounter
// an error.
// A2: because key_is_shard0 also covers relation size keys, which are written
// on all shards even though they're only maintained accurately on shard 0.
if self.count < ShardCount(2) {
// Fast path: unsharded tenant doesn't dispose of anything
return false;
}

if self.is_key_global(key) {
false
} else {
!self.is_key_local(key)
Expand Down
22 changes: 13 additions & 9 deletions libs/wal_decoder/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl MetadataRecord {
Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)?
}
pg_constants::RM_LOGICALMSG_ID => {
Self::decode_logical_message_record(&mut buf, decoded)?
Self::decode_logical_message_record(&mut buf, shard, decoded)?
jcsp marked this conversation as resolved.
Show resolved Hide resolved
}
pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?,
pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?,
Expand Down Expand Up @@ -866,6 +866,7 @@ impl MetadataRecord {

fn decode_logical_message_record(
buf: &mut Bytes,
shard: &ShardIdentity,
decoded: &DecodedWALRecord,
) -> anyhow::Result<Option<MetadataRecord>> {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
Expand All @@ -881,14 +882,17 @@ impl MetadataRecord {
}

if let Some(path) = prefix.strip_prefix("neon-file:") {
let buf_size = xlrec.prefix_size + xlrec.message_size;
let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
return Ok(Some(MetadataRecord::LogicalMessage(
LogicalMessageRecord::Put(PutLogicalMessage {
path: path.to_string(),
buf,
}),
)));
// Only shard 0 stores AUX files
if shard.is_shard_zero() {
let buf_size = xlrec.prefix_size + xlrec.message_size;
let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
return Ok(Some(MetadataRecord::LogicalMessage(
LogicalMessageRecord::Put(PutLogicalMessage {
path: path.to_string(),
buf,
}),
)));
}
}
}

Expand Down
56 changes: 36 additions & 20 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
.await?;
Expand All @@ -548,6 +549,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
self.get(key, lsn, ctx).await
}
Expand All @@ -560,6 +562,7 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let key = slru_segment_size_to_key(kind, segno);
let mut buf = version.get(self, key, ctx).await?;
Ok(buf.get_u32_le())
Expand All @@ -573,6 +576,7 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
// fetch directory listing
let key = slru_dir_to_key(kind);
let buf = version.get(self, key, ctx).await?;
Expand Down Expand Up @@ -1043,26 +1047,28 @@ impl Timeline {
}

// Iterate SLRUs next
for kind in [
SlruKind::Clog,
SlruKind::MultiXactMembers,
SlruKind::MultiXactOffsets,
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(segsize_key, lsn, ctx).await?;
let segsize = buf.get_u32_le();

result.add_range(
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
);
result.add_key(segsize_key);
if self.tenant_shard_id.is_shard_zero() {
for kind in [
SlruKind::Clog,
SlruKind::MultiXactMembers,
SlruKind::MultiXactOffsets,
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(segsize_key, lsn, ctx).await?;
let segsize = buf.get_u32_le();

result.add_range(
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
);
result.add_key(segsize_key);
}
}
}

Expand Down Expand Up @@ -1464,6 +1470,10 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
if !self.tline.tenant_shard_id.is_shard_zero() {
jcsp marked this conversation as resolved.
Show resolved Hide resolved
return Ok(());
}

self.put(
slru_block_to_key(kind, segno, blknum),
Value::WalRecord(rec),
Expand Down Expand Up @@ -1497,6 +1507,9 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
Expand Down Expand Up @@ -1538,6 +1551,9 @@ impl<'a> DatadirModification<'a> {
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
Expand Down
4 changes: 4 additions & 0 deletions pageserver/src/walingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,10 @@ impl WalIngest {
img: Bytes,
ctx: &RequestContext,
) -> Result<()> {
if !self.shard.is_shard_zero() {
return Ok(());
}

self.handle_slru_extend(modification, kind, segno, blknum, ctx)
.await?;
modification.put_slru_page_image(kind, segno, blknum, img)?;
Expand Down
Loading