Skip to content

Commit

Permalink
pageserver: only store SLRUs & aux files on shard zero (#9786)
Browse files Browse the repository at this point in the history
## Problem

Since #9423 the non-zero shards
no longer need SLRU content in order to do GC. This data is now
redundant on shards >0.

One release cycle after merging that PR, we may merge this one, which
also stops writing those pages to shards > 0, reaping the efficiency
benefit.

Closes: #7512
Closes: #9641

## Summary of changes

- Avoid storing SLRUs on non-zero shards
- Bonus: avoid storing aux files on non-zero shards
  • Loading branch information
jcsp authored Dec 3, 2024
1 parent 71d0042 commit dcb6295
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 86 deletions.
5 changes: 5 additions & 0 deletions libs/pageserver_api/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,11 @@ impl Key {
&& self.field6 == 1
}

#[inline(always)]
pub fn is_aux_file_key(&self) -> bool {
self.field1 == AUX_KEY_PREFIX
}

/// Guaranteed to return `Ok()` if [`Self::is_rel_block_key`] returns `true` for `key`.
#[inline(always)]
pub fn to_rel_block(self) -> anyhow::Result<(RelTag, BlockNumber)> {
Expand Down
34 changes: 26 additions & 8 deletions libs/pageserver_api/src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,37 @@ impl ShardIdentity {
}
}

/// Return true if the key should be stored on all shards, not just one.
fn is_key_global(&self, key: &Key) -> bool {
if key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key() {
// Special keys that are only stored on shard 0
false
} else if key.is_rel_block_key() {
// Ordinary relation blocks are distributed across shards
false
} else if key.is_rel_size_key() {
// All shards maintain rel size keys (although only shard 0 is responsible for
// keeping it strictly accurate, other shards just reflect the highest block they've ingested)
true
} else {
// For everything else, we assume it must be kept everywhere, because ingest code
// might assume this -- this covers functionality where the ingest code has
// not (yet) been made fully shard aware.
true
}
}

/// Return true if the key should be discarded if found in this shard's
/// data store, e.g. during compaction after a split.
///
/// Shards _may_ drop keys which return false here, but are not obliged to.
pub fn is_key_disposable(&self, key: &Key) -> bool {
if key_is_shard0(key) {
// Q: Why can't we dispose of shard0 content if we're not shard 0?
// A1: because the WAL ingestion logic currently ingests some shard 0
// content on all shards, even though it's only read on shard 0. If we
// dropped it, then subsequent WAL ingest to these keys would encounter
// an error.
// A2: because key_is_shard0 also covers relation size keys, which are written
// on all shards even though they're only maintained accurately on shard 0.
if self.count < ShardCount(2) {
// Fast path: unsharded tenant doesn't dispose of anything
return false;
}

if self.is_key_global(key) {
false
} else {
!self.is_key_local(key)
Expand Down
54 changes: 31 additions & 23 deletions libs/wal_decoder/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,30 +112,38 @@ impl MetadataRecord {
};

// Next, filter the metadata record by shard.

// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
if let Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
) = metadata_record
{
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
clear_vm_bits.old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
clear_vm_bits.new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none() {
metadata_record = None
match metadata_record {
Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
) => {
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
clear_vm_bits.old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
clear_vm_bits.new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
{
metadata_record = None
}
}
Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
// Filter LogicalMessage records (AUX files) to only be stored on shard zero
if !shard.is_shard_zero() {
metadata_record = None;
}
}
_ => {}
}

Ok(metadata_record)
Expand Down
18 changes: 12 additions & 6 deletions pageserver/src/import_datadir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,18 +575,24 @@ async fn import_file(
} else if file_path.starts_with("pg_xact") {
let slru = SlruKind::Clog;

import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported clog slru");
if modification.tline.tenant_shard_id.is_shard_zero() {
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported clog slru");
}
} else if file_path.starts_with("pg_multixact/offsets") {
let slru = SlruKind::MultiXactOffsets;

import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported multixact offsets slru");
if modification.tline.tenant_shard_id.is_shard_zero() {
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported multixact offsets slru");
}
} else if file_path.starts_with("pg_multixact/members") {
let slru = SlruKind::MultiXactMembers;

import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported multixact members slru");
if modification.tline.tenant_shard_id.is_shard_zero() {
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported multixact members slru");
}
} else if file_path.starts_with("pg_twophase") {
let bytes = read_all_bytes(reader).await?;

Expand Down
57 changes: 37 additions & 20 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
.await?;
Expand All @@ -552,6 +553,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
self.get(key, lsn, ctx).await
}
Expand All @@ -564,6 +566,7 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let key = slru_segment_size_to_key(kind, segno);
let mut buf = version.get(self, key, ctx).await?;
Ok(buf.get_u32_le())
Expand All @@ -577,6 +580,7 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
// fetch directory listing
let key = slru_dir_to_key(kind);
let buf = version.get(self, key, ctx).await?;
Expand Down Expand Up @@ -1047,26 +1051,28 @@ impl Timeline {
}

// Iterate SLRUs next
for kind in [
SlruKind::Clog,
SlruKind::MultiXactMembers,
SlruKind::MultiXactOffsets,
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(segsize_key, lsn, ctx).await?;
let segsize = buf.get_u32_le();

result.add_range(
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
);
result.add_key(segsize_key);
if self.tenant_shard_id.is_shard_zero() {
for kind in [
SlruKind::Clog,
SlruKind::MultiXactMembers,
SlruKind::MultiXactOffsets,
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(segsize_key, lsn, ctx).await?;
let segsize = buf.get_u32_le();

result.add_range(
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
);
result.add_key(segsize_key);
}
}
}

Expand Down Expand Up @@ -1468,6 +1474,10 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}

self.put(
slru_block_to_key(kind, segno, blknum),
Value::WalRecord(rec),
Expand Down Expand Up @@ -1501,6 +1511,8 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
assert!(self.tline.tenant_shard_id.is_shard_zero());

let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
Expand Down Expand Up @@ -1542,6 +1554,7 @@ impl<'a> DatadirModification<'a> {
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
Expand Down Expand Up @@ -1853,6 +1866,8 @@ impl<'a> DatadirModification<'a> {
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
assert!(self.tline.tenant_shard_id.is_shard_zero());

// Add it to the directory entry
let dir_key = slru_dir_to_key(kind);
let buf = self.get(dir_key, ctx).await?;
Expand Down Expand Up @@ -1885,6 +1900,8 @@ impl<'a> DatadirModification<'a> {
segno: u32,
nblocks: BlockNumber,
) -> anyhow::Result<()> {
assert!(self.tline.tenant_shard_id.is_shard_zero());

// Put size
let size_key = slru_segment_size_to_key(kind, segno);
let buf = nblocks.to_le_bytes();
Expand Down
49 changes: 20 additions & 29 deletions pageserver/src/tenant/timeline/import_pgdata/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,22 +129,23 @@ impl Flow {
}

// Import SLRUs

// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
if self.timeline.tenant_shard_id.is_shard_zero() {
// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
.await?;
// pg_multixact/members (01:01 keyspace)
self.import_slru(
SlruKind::MultiXactMembers,
&self.storage.pgdata().join("pg_multixact/members"),
)
.await?;
// pg_multixact/members (01:01 keyspace)
self.import_slru(
SlruKind::MultiXactMembers,
&self.storage.pgdata().join("pg_multixact/members"),
)
.await?;
// pg_multixact/offsets (01:02 keyspace)
self.import_slru(
SlruKind::MultiXactOffsets,
&self.storage.pgdata().join("pg_multixact/offsets"),
)
.await?;
// pg_multixact/offsets (01:02 keyspace)
self.import_slru(
SlruKind::MultiXactOffsets,
&self.storage.pgdata().join("pg_multixact/offsets"),
)
.await?;
}

// Import pg_twophase.
// TODO: as empty
Expand Down Expand Up @@ -302,6 +303,8 @@ impl Flow {
}

async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
assert!(self.timeline.tenant_shard_id.is_shard_zero());

let segments = self.storage.listfilesindir(path).await?;
let segments: Vec<(String, u32, usize)> = segments
.into_iter()
Expand Down Expand Up @@ -337,7 +340,6 @@ impl Flow {
debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
self.tasks
.push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
*self.timeline.get_shard_identity(),
start_key..end_key,
&p,
self.storage.clone(),
Expand Down Expand Up @@ -631,21 +633,14 @@ impl ImportTask for ImportRelBlocksTask {
}

struct ImportSlruBlocksTask {
shard_identity: ShardIdentity,
key_range: Range<Key>,
path: RemotePath,
storage: RemoteStorageWrapper,
}

impl ImportSlruBlocksTask {
fn new(
shard_identity: ShardIdentity,
key_range: Range<Key>,
path: &RemotePath,
storage: RemoteStorageWrapper,
) -> Self {
fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
ImportSlruBlocksTask {
shard_identity,
key_range,
path: path.clone(),
storage,
Expand Down Expand Up @@ -673,17 +668,13 @@ impl ImportTask for ImportSlruBlocksTask {
let mut file_offset = 0;
while blknum < end_blk {
let key = slru_block_to_key(kind, segno, blknum);
assert!(
!self.shard_identity.is_key_disposable(&key),
"SLRU keys need to go into every shard"
);
let buf = &buf[file_offset..(file_offset + 8192)];
file_offset += 8192;
layer_writer
.put_image(key, Bytes::copy_from_slice(buf), ctx)
.await?;
blknum += 1;
nimages += 1;
blknum += 1;
}
Ok(nimages)
}
Expand Down
4 changes: 4 additions & 0 deletions pageserver/src/walingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,10 @@ impl WalIngest {
img: Bytes,
ctx: &RequestContext,
) -> Result<()> {
if !self.shard.is_shard_zero() {
return Ok(());
}

self.handle_slru_extend(modification, kind, segno, blknum, ctx)
.await?;
modification.put_slru_page_image(kind, segno, blknum, img)?;
Expand Down

0 comments on commit dcb6295

Please sign in to comment.