From 4e22e2615c528e5751d22c4ebe1d353797523c6a Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 20 Nov 2024 17:08:09 +0000 Subject: [PATCH] wal_decoder: filter aux files from shards >0 --- libs/pageserver_api/src/key.rs | 5 +++++ libs/pageserver_api/src/shard.rs | 4 ++-- libs/wal_decoder/src/decoder.rs | 22 +++++++++++++--------- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 523d1433818b..37dff6fe4647 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -770,6 +770,11 @@ impl Key { && self.field6 == 1 } + #[inline(always)] + pub fn is_aux_file_key(&self) -> bool { + self.field1 == AUX_KEY_PREFIX + } + /// Guaranteed to return `Ok()` if [`Self::is_rel_block_key`] returns `true` for `key`. #[inline(always)] pub fn to_rel_block(self) -> anyhow::Result<(RelTag, BlockNumber)> { diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index edeecdc6c110..a5c94a82c162 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -172,8 +172,8 @@ impl ShardIdentity { /// Return true if the key should be stored on all shards, not just one. fn is_key_global(&self, key: &Key) -> bool { - if key.is_slru_block_key() || key.is_slru_segment_size_key() { - // SLRU blocks are only stored on shard 0 + if key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key() { + // Special keys that are only stored on shard 0 false } else if key.is_rel_block_key() { // Ordinary relation blocks are distributed across shards diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index 36c4b19266aa..cae593e732c1 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -100,7 +100,7 @@ impl MetadataRecord { Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)? } pg_constants::RM_LOGICALMSG_ID => { - Self::decode_logical_message_record(&mut buf, decoded)? + Self::decode_logical_message_record(&mut buf, shard, decoded)? } pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?, pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?, @@ -866,6 +866,7 @@ impl MetadataRecord { fn decode_logical_message_record( buf: &mut Bytes, + shard: &ShardIdentity, decoded: &DecodedWALRecord, ) -> anyhow::Result> { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; @@ -881,14 +882,17 @@ impl MetadataRecord { } if let Some(path) = prefix.strip_prefix("neon-file:") { - let buf_size = xlrec.prefix_size + xlrec.message_size; - let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]); - return Ok(Some(MetadataRecord::LogicalMessage( - LogicalMessageRecord::Put(PutLogicalMessage { - path: path.to_string(), - buf, - }), - ))); + // Only shard 0 stores AUX files + if shard.is_shard_zero() { + let buf_size = xlrec.prefix_size + xlrec.message_size; + let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]); + return Ok(Some(MetadataRecord::LogicalMessage( + LogicalMessageRecord::Put(PutLogicalMessage { + path: path.to_string(), + buf, + }), + ))); + } } }