From a3b29b1794e8db96427efac82beded2de4ae87a5 Mon Sep 17 00:00:00 2001
From: Vlad Lazar <vlad@neon.tech>
Date: Fri, 13 Dec 2024 16:55:06 +0100
Subject: [PATCH 1/9] wal_decoder: derive Clone for MetadataRecord

---
 libs/postgres_ffi/src/walrecord.rs | 14 ++++----
 libs/wal_decoder/src/models.rs     | 52 +++++++++++++++---------------
 2 files changed, 33 insertions(+), 33 deletions(-)

diff --git a/libs/postgres_ffi/src/walrecord.rs b/libs/postgres_ffi/src/walrecord.rs
index b32106632aa4..fce37e2fdd3a 100644
--- a/libs/postgres_ffi/src/walrecord.rs
+++ b/libs/postgres_ffi/src/walrecord.rs
@@ -16,7 +16,7 @@ use utils::bin_ser::DeserializeError;
 use utils::lsn::Lsn;
 
 #[repr(C)]
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct XlMultiXactCreate {
     pub mid: MultiXactId,
     /* new MultiXact's ID */
@@ -46,7 +46,7 @@ impl XlMultiXactCreate {
 }
 
 #[repr(C)]
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct XlMultiXactTruncate {
     pub oldest_multi_db: Oid,
     /* to-be-truncated range of multixact offsets */
@@ -72,7 +72,7 @@ impl XlMultiXactTruncate {
 }
 
 #[repr(C)]
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct XlRelmapUpdate {
     pub dbid: Oid,   /* database ID, or 0 for shared map */
     pub tsid: Oid,   /* database's tablespace, or pg_global */
@@ -90,7 +90,7 @@ impl XlRelmapUpdate {
 }
 
 #[repr(C)]
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct XlReploriginDrop {
     pub node_id: RepOriginId,
 }
@@ -104,7 +104,7 @@ impl XlReploriginDrop {
 }
 
 #[repr(C)]
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct XlReploriginSet {
     pub remote_lsn: Lsn,
     pub node_id: RepOriginId,
@@ -911,7 +911,7 @@ impl XlSmgrCreate {
 }
 
 #[repr(C)]
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct XlSmgrTruncate {
     pub blkno: BlockNumber,
     pub rnode: RelFileNode,
@@ -984,7 +984,7 @@ impl XlDropDatabase {
 /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
 /// struct for commits and aborts.
 ///
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct XlXactParsedRecord {
     pub xid: TransactionId,
     pub info: u8,
diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs
index af22de5d9596..1c281a2ea8b2 100644
--- a/libs/wal_decoder/src/models.rs
+++ b/libs/wal_decoder/src/models.rs
@@ -99,7 +99,7 @@ impl InterpretedWalRecord {
 
 /// The interpreted part of the Postgres WAL record which requires metadata
 /// writes to the underlying storage engine.
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum MetadataRecord {
     Heapam(HeapamRecord),
     Neonrmgr(NeonrmgrRecord),
@@ -115,12 +115,12 @@ pub enum MetadataRecord {
     Replorigin(ReploriginRecord),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum HeapamRecord {
     ClearVmBits(ClearVmBits),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct ClearVmBits {
     pub new_heap_blkno: Option<u32>,
     pub old_heap_blkno: Option<u32>,
@@ -128,29 +128,29 @@ pub struct ClearVmBits {
     pub flags: u8,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum NeonrmgrRecord {
     ClearVmBits(ClearVmBits),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum SmgrRecord {
     Create(SmgrCreate),
     Truncate(XlSmgrTruncate),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct SmgrCreate {
     pub rel: RelTag,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum DbaseRecord {
     Create(DbaseCreate),
     Drop(DbaseDrop),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct DbaseCreate {
     pub db_id: Oid,
     pub tablespace_id: Oid,
@@ -158,32 +158,32 @@ pub struct DbaseCreate {
     pub src_tablespace_id: Oid,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct DbaseDrop {
     pub db_id: Oid,
     pub tablespace_ids: Vec<Oid>,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum ClogRecord {
     ZeroPage(ClogZeroPage),
     Truncate(ClogTruncate),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct ClogZeroPage {
     pub segno: u32,
     pub rpageno: u32,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct ClogTruncate {
     pub pageno: u32,
     pub oldest_xid: TransactionId,
     pub oldest_xid_db: Oid,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum XactRecord {
     Commit(XactCommon),
     Abort(XactCommon),
@@ -192,7 +192,7 @@ pub enum XactRecord {
     Prepare(XactPrepare),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct XactCommon {
     pub parsed: XlXactParsedRecord,
     pub origin_id: u16,
@@ -201,73 +201,73 @@ pub struct XactCommon {
     pub lsn: Lsn,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct XactPrepare {
     pub xl_xid: TransactionId,
     pub data: Bytes,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum MultiXactRecord {
     ZeroPage(MultiXactZeroPage),
     Create(XlMultiXactCreate),
     Truncate(XlMultiXactTruncate),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct MultiXactZeroPage {
     pub slru_kind: SlruKind,
     pub segno: u32,
     pub rpageno: u32,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum RelmapRecord {
     Update(RelmapUpdate),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct RelmapUpdate {
     pub update: XlRelmapUpdate,
     pub buf: Bytes,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum XlogRecord {
     Raw(RawXlogRecord),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct RawXlogRecord {
     pub info: u8,
     pub lsn: Lsn,
     pub buf: Bytes,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum LogicalMessageRecord {
     Put(PutLogicalMessage),
     #[cfg(feature = "testing")]
     Failpoint,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct PutLogicalMessage {
     pub path: String,
     pub buf: Bytes,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum StandbyRecord {
     RunningXacts(StandbyRunningXacts),
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct StandbyRunningXacts {
     pub oldest_running_xid: TransactionId,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub enum ReploriginRecord {
     Set(XlReploriginSet),
     Drop(XlReploriginDrop),

From cab7a1412fc5273d91ff557949dc6389594f6d2b Mon Sep 17 00:00:00 2001
From: Vlad Lazar <vlad@neon.tech>
Date: Fri, 13 Dec 2024 18:20:05 +0100
Subject: [PATCH 2/9] wal_decoder: prepare interpretation for fan-out

Currently, we call `InterpretedWalRecord::from_bytes_filtered`
from each shard. To serve multiple shards at the same time,
the API needs to allow for enquiring about multiple shards.

This commit tweaks it a pretty brute force way. Naively, we could
just generate the shard for a key, but pre and post split shards
may be subscribed at the same time, so doing it efficiently is more
complex.
---
 libs/pageserver_api/src/shard.rs              |   6 +-
 libs/wal_decoder/src/decoder.rs               | 113 +++++-----
 libs/wal_decoder/src/models.rs                |   2 +-
 libs/wal_decoder/src/serialized_batch.rs      | 200 ++++++++++--------
 pageserver/src/import_datadir.rs              |  23 +-
 .../walreceiver/walreceiver_connection.rs     |  12 +-
 pageserver/src/walingest.rs                   |   8 +-
 safekeeper/src/send_interpreted_wal.rs        |  12 +-
 8 files changed, 218 insertions(+), 158 deletions(-)

diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 4cc0a739e871..21c7e2b36f49 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -40,7 +40,7 @@ pub use ::utils::shard::*;
 
 /// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
 /// and to check whether that [`ShardNumber`] is the same as the current shard.
-#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
+#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
 pub struct ShardIdentity {
     pub number: ShardNumber,
     pub count: ShardCount,
@@ -49,7 +49,7 @@ pub struct ShardIdentity {
 }
 
 /// Stripe size in number of pages
-#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
+#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
 pub struct ShardStripeSize(pub u32);
 
 impl Default for ShardStripeSize {
@@ -59,7 +59,7 @@ impl Default for ShardStripeSize {
 }
 
 /// Layout version: for future upgrades where we might change how the key->shard mapping works
-#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
+#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
 pub struct ShardLayout(u8);
 
 const LAYOUT_V1: ShardLayout = ShardLayout(1);
diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs
index aa50c629113b..2993f9cce5ee 100644
--- a/libs/wal_decoder/src/decoder.rs
+++ b/libs/wal_decoder/src/decoder.rs
@@ -14,15 +14,15 @@ use utils::lsn::Lsn;
 
 impl InterpretedWalRecord {
     /// Decode and interpreted raw bytes which represent one Postgres WAL record.
-    /// Data blocks which do not match the provided shard identity are filtered out.
+    /// Data blocks which do not match any of the provided shard identities are filtered out.
     /// Shard 0 is a special case since it tracks all relation sizes. We only give it
     /// the keys that are being written as that is enough for updating relation sizes.
     pub fn from_bytes_filtered(
         buf: Bytes,
-        shard: &ShardIdentity,
+        shards: &Vec<ShardIdentity>,
         next_record_lsn: Lsn,
         pg_version: u32,
-    ) -> anyhow::Result<InterpretedWalRecord> {
+    ) -> anyhow::Result<Vec<(ShardIdentity, InterpretedWalRecord)>> {
         let mut decoded = DecodedWALRecord::default();
         decode_wal_record(buf, &mut decoded, pg_version)?;
         let xid = decoded.xl_xid;
@@ -33,36 +33,43 @@ impl InterpretedWalRecord {
             FlushUncommittedRecords::No
         };
 
-        let metadata_record =
-            MetadataRecord::from_decoded_filtered(&decoded, shard, next_record_lsn, pg_version)?;
-        let batch = SerializedValueBatch::from_decoded_filtered(
+        let metadata_per_shard =
+            MetadataRecord::from_decoded_filtered(&decoded, shards, next_record_lsn, pg_version)?;
+        let mut batches_per_shard = SerializedValueBatch::from_decoded_filtered(
             decoded,
-            shard,
+            shards,
             next_record_lsn,
             pg_version,
         )?;
 
-        Ok(InterpretedWalRecord {
-            metadata_record,
-            batch,
-            next_record_lsn,
-            flush_uncommitted,
-            xid,
-        })
+        let mut record_per_shard = Vec::with_capacity(shards.len());
+        for (shard_id, metadata_record) in metadata_per_shard {
+            let record = InterpretedWalRecord {
+                metadata_record,
+                batch: batches_per_shard.remove(&shard_id).unwrap(),
+                next_record_lsn,
+                flush_uncommitted,
+                xid,
+            };
+
+            record_per_shard.push((shard_id, record));
+        }
+
+        Ok(record_per_shard)
     }
 }
 
 impl MetadataRecord {
-    /// Builds a metadata record for this WAL record, if any.
+    /// Builds metadata records for this WAL record for the specified shards if any, if any.
     ///
-    /// Only metadata records relevant for the given shard are emitted. Currently, most metadata
+    /// Only metadata records relevant for the given shards is emitted. Currently, most metadata
     /// records are broadcast to all shards for simplicity, but this should be improved.
     fn from_decoded_filtered(
         decoded: &DecodedWALRecord,
-        shard: &ShardIdentity,
+        shards: &Vec<ShardIdentity>,
         next_record_lsn: Lsn,
         pg_version: u32,
-    ) -> anyhow::Result<Option<MetadataRecord>> {
+    ) -> anyhow::Result<Vec<(ShardIdentity, Option<MetadataRecord>)>> {
         // Note: this doesn't actually copy the bytes since
         // the [`Bytes`] type implements it via a level of indirection.
         let mut buf = decoded.record.clone();
@@ -112,41 +119,51 @@ impl MetadataRecord {
         };
 
         // Next, filter the metadata record by shard.
-        match metadata_record {
-            Some(
-                MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
-                | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
-            ) => {
-                // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
-                // of the main relation. These are sharded and managed just like regular relation pages.
-                // See: https://github.com/neondatabase/neon/issues/9855
-                let is_local_vm_page = |heap_blk| {
-                    let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
-                    shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
-                };
-                // Send the old and new VM page updates to their respective shards.
-                clear_vm_bits.old_heap_blkno = clear_vm_bits
-                    .old_heap_blkno
-                    .filter(|&blkno| is_local_vm_page(blkno));
-                clear_vm_bits.new_heap_blkno = clear_vm_bits
-                    .new_heap_blkno
-                    .filter(|&blkno| is_local_vm_page(blkno));
-                // If neither VM page belongs to this shard, discard the record.
-                if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
-                {
-                    metadata_record = None
+        let mut metadata_records_per_shard = Vec::with_capacity(shards.len());
+        for shard in shards {
+            let mut metadata_for_shard = metadata_record.clone();
+
+            match metadata_for_shard {
+                Some(
+                    MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
+                    | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
+                ) => {
+                    // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
+                    // of the main relation. These are sharded and managed just like regular relation pages.
+                    // See: https://github.com/neondatabase/neon/issues/9855
+                    let is_local_vm_page = |heap_blk| {
+                        let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
+                        shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
+                    };
+                    // Send the old and new VM page updates to their respective shards.
+                    clear_vm_bits.old_heap_blkno = clear_vm_bits
+                        .old_heap_blkno
+                        .filter(|&blkno| is_local_vm_page(blkno));
+                    clear_vm_bits.new_heap_blkno = clear_vm_bits
+                        .new_heap_blkno
+                        .filter(|&blkno| is_local_vm_page(blkno));
+                    // If neither VM page belongs to this shard, discard the record.
+                    if clear_vm_bits.old_heap_blkno.is_none()
+                        && clear_vm_bits.new_heap_blkno.is_none()
+                    {
+                        metadata_record = None
+                    }
                 }
-            }
-            Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
-                // Filter LogicalMessage records (AUX files) to only be stored on shard zero
-                if !shard.is_shard_zero() {
-                    metadata_record = None;
+                Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
+                    // Filter LogicalMessage records (AUX files) to only be stored on shard zero
+                    if !shard.is_shard_zero() {
+                        metadata_record = None;
+                    }
                 }
+                _ => {}
             }
-            _ => {}
+
+            metadata_records_per_shard.push((*shard, metadata_for_shard));
         }
 
-        Ok(metadata_record)
+        assert_eq!(metadata_records_per_shard.len(), shards.len());
+
+        Ok(metadata_records_per_shard)
     }
 
     fn decode_heapam_record(
diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs
index 1c281a2ea8b2..a30fdc1ee445 100644
--- a/libs/wal_decoder/src/models.rs
+++ b/libs/wal_decoder/src/models.rs
@@ -48,7 +48,7 @@ pub mod proto {
     tonic::include_proto!("interpreted_wal");
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Copy, Clone, Serialize, Deserialize)]
 pub enum FlushUncommittedRecords {
     Yes,
     No,
diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs
index 41294da7a04d..df2e79e0429a 100644
--- a/libs/wal_decoder/src/serialized_batch.rs
+++ b/libs/wal_decoder/src/serialized_batch.rs
@@ -5,7 +5,7 @@
 //! Such batches are created from decoded PG wal records and ingested
 //! by the pageserver by writing directly to the ephemeral file.
 
-use std::collections::BTreeSet;
+use std::collections::{BTreeSet, HashMap};
 
 use bytes::{Bytes, BytesMut};
 use pageserver_api::key::rel_block_to_key;
@@ -136,21 +136,28 @@ impl SerializedValueBatch {
     /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
     pub(crate) fn from_decoded_filtered(
         decoded: DecodedWALRecord,
-        shard: &ShardIdentity,
+        shards: &Vec<ShardIdentity>,
         next_record_lsn: Lsn,
         pg_version: u32,
-    ) -> anyhow::Result<SerializedValueBatch> {
-        // First determine how big the buffer needs to be and allocate it up-front.
+    ) -> anyhow::Result<HashMap<ShardIdentity, SerializedValueBatch>> {
+        // First determine how big the buffers need to be and allocate it up-front.
         // This duplicates some of the work below, but it's empirically much faster.
-        let estimated_buffer_size = Self::estimate_buffer_size(&decoded, shard, pg_version);
-        let mut buf = Vec::<u8>::with_capacity(estimated_buffer_size);
+        let mut shard_batches = HashMap::with_capacity(shards.len());
+        for shard in shards {
+            let buf =
+                Vec::<u8>::with_capacity(Self::estimate_buffer_size(&decoded, shard, pg_version));
+            shard_batches.insert(
+                *shard,
+                SerializedValueBatch {
+                    raw: buf,
+                    metadata: Default::default(),
+                    max_lsn: Lsn(0),
+                    len: 0,
+                },
+            );
+        }
 
-        let mut metadata: Vec<ValueMeta> = Vec::with_capacity(decoded.blocks.len());
-        let mut max_lsn: Lsn = Lsn(0);
-        let mut len: usize = 0;
         for blk in decoded.blocks.iter() {
-            let relative_off = buf.len() as u64;
-
             let rel = RelTag {
                 spcnode: blk.rnode_spcnode,
                 dbnode: blk.rnode_dbnode,
@@ -168,99 +175,108 @@ impl SerializedValueBatch {
                 );
             }
 
-            let key_is_local = shard.is_key_local(&key);
+            for shard in shards {
+                let mut batch = shard_batches.get_mut(shard).expect("inserted in prologue");
+                let SerializedValueBatch {
+                    raw,
+                    metadata,
+                    max_lsn,
+                    len,
+                } = &mut batch;
+
+                let key_is_local = shard.is_key_local(&key);
+
+                tracing::debug!(
+                    lsn=%next_record_lsn,
+                    key=%key,
+                    "ingest: shard decision {}",
+                    if !key_is_local { "drop" } else { "keep" },
+                );
 
-            tracing::debug!(
-                lsn=%next_record_lsn,
-                key=%key,
-                "ingest: shard decision {}",
-                if !key_is_local { "drop" } else { "keep" },
-            );
+                if !key_is_local {
+                    if shard.is_shard_zero() {
+                        // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
+                        // its blkno in case it implicitly extends a relation.
+                        metadata.push(ValueMeta::Observed(ObservedValueMeta {
+                            key: key.to_compact(),
+                            lsn: next_record_lsn,
+                        }))
+                    }
 
-            if !key_is_local {
-                if shard.is_shard_zero() {
-                    // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
-                    // its blkno in case it implicitly extends a relation.
-                    metadata.push(ValueMeta::Observed(ObservedValueMeta {
-                        key: key.to_compact(),
-                        lsn: next_record_lsn,
-                    }))
+                    continue;
                 }
 
-                continue;
-            }
-
-            // Instead of storing full-page-image WAL record,
-            // it is better to store extracted image: we can skip wal-redo
-            // in this case. Also some FPI records may contain multiple (up to 32) pages,
-            // so them have to be copied multiple times.
-            //
-            let val = if Self::block_is_image(&decoded, blk, pg_version) {
-                // Extract page image from FPI record
-                let img_len = blk.bimg_len as usize;
-                let img_offs = blk.bimg_offset as usize;
-                let mut image = BytesMut::with_capacity(BLCKSZ as usize);
-                // TODO(vlad): skip the copy
-                image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
-
-                if blk.hole_length != 0 {
-                    let tail = image.split_off(blk.hole_offset as usize);
-                    image.resize(image.len() + blk.hole_length as usize, 0u8);
-                    image.unsplit(tail);
-                }
+                // Instead of storing full-page-image WAL record,
+                // it is better to store extracted image: we can skip wal-redo
+                // in this case. Also some FPI records may contain multiple (up to 32) pages,
+                // so them have to be copied multiple times.
                 //
-                // Match the logic of XLogReadBufferForRedoExtended:
-                // The page may be uninitialized. If so, we can't set the LSN because
-                // that would corrupt the page.
-                //
-                if !page_is_new(&image) {
-                    page_set_lsn(&mut image, next_record_lsn)
-                }
-                assert_eq!(image.len(), BLCKSZ as usize);
-
-                Value::Image(image.freeze())
-            } else {
-                Value::WalRecord(NeonWalRecord::Postgres {
-                    will_init: blk.will_init || blk.apply_image,
-                    rec: decoded.record.clone(),
-                })
-            };
-
-            val.ser_into(&mut buf)
-                .expect("Writing into in-memory buffer is infallible");
-
-            let val_ser_size = buf.len() - relative_off as usize;
-
-            metadata.push(ValueMeta::Serialized(SerializedValueMeta {
-                key: key.to_compact(),
-                lsn: next_record_lsn,
-                batch_offset: relative_off,
-                len: val_ser_size,
-                will_init: val.will_init(),
-            }));
-            max_lsn = std::cmp::max(max_lsn, next_record_lsn);
-            len += 1;
+                let val = if Self::block_is_image(&decoded, blk, pg_version) {
+                    // Extract page image from FPI record
+                    let img_len = blk.bimg_len as usize;
+                    let img_offs = blk.bimg_offset as usize;
+                    let mut image = BytesMut::with_capacity(BLCKSZ as usize);
+                    // TODO(vlad): skip the copy
+                    image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
+
+                    if blk.hole_length != 0 {
+                        let tail = image.split_off(blk.hole_offset as usize);
+                        image.resize(image.len() + blk.hole_length as usize, 0u8);
+                        image.unsplit(tail);
+                    }
+                    //
+                    // Match the logic of XLogReadBufferForRedoExtended:
+                    // The page may be uninitialized. If so, we can't set the LSN because
+                    // that would corrupt the page.
+                    //
+                    if !page_is_new(&image) {
+                        page_set_lsn(&mut image, next_record_lsn)
+                    }
+                    assert_eq!(image.len(), BLCKSZ as usize);
+
+                    Value::Image(image.freeze())
+                } else {
+                    Value::WalRecord(NeonWalRecord::Postgres {
+                        will_init: blk.will_init || blk.apply_image,
+                        rec: decoded.record.clone(),
+                    })
+                };
+
+                let relative_off = raw.len() as u64;
+
+                val.ser_into(raw)
+                    .expect("Writing into in-memory buffer is infallible");
+
+                let val_ser_size = raw.len() - relative_off as usize;
+
+                metadata.push(ValueMeta::Serialized(SerializedValueMeta {
+                    key: key.to_compact(),
+                    lsn: next_record_lsn,
+                    batch_offset: relative_off,
+                    len: val_ser_size,
+                    will_init: val.will_init(),
+                }));
+                *max_lsn = std::cmp::max(*max_lsn, next_record_lsn);
+                *len += 1;
+            }
         }
 
         if cfg!(any(debug_assertions, test)) {
-            let batch = Self {
-                raw: buf,
-                metadata,
-                max_lsn,
-                len,
-            };
+            use std::collections::HashSet;
 
-            batch.validate_lsn_order();
+            // Validate that the batches are correct
+            for batch in shard_batches.values() {
+                batch.validate_lsn_order();
+            }
 
-            return Ok(batch);
+            // Validate we produced batches for each requested shard
+            assert_eq!(
+                shard_batches.keys().collect::<HashSet<_>>(),
+                shards.iter().collect::<HashSet<_>>()
+            );
         }
 
-        Ok(Self {
-            raw: buf,
-            metadata,
-            max_lsn,
-            len,
-        })
+        Ok(shard_batches)
     }
 
     /// Look into the decoded PG WAL record and determine
diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index c061714010a2..effe12f9448f 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -278,6 +278,8 @@ async fn import_wal(
 
     let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
 
+    let shard = vec![*tline.get_shard_identity()];
+
     while last_lsn <= endpoint {
         // FIXME: assume postgresql tli 1 for now
         let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
@@ -312,12 +314,16 @@ async fn import_wal(
         let mut modification = tline.begin_modification(last_lsn);
         while last_lsn <= endpoint {
             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-                let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
-                    tline.get_shard_identity(),
+                    &shard,
                     lsn,
                     tline.pg_version,
-                )?;
+                )?
+                .pop()
+                .unwrap();
+
+                assert_eq!(got_shard, *tline.get_shard_identity());
 
                 walingest
                     .ingest_record(interpreted, &mut modification, ctx)
@@ -411,6 +417,7 @@ pub async fn import_wal_from_tar(
     let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
     let mut last_lsn = start_lsn;
     let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
+    let shard = vec![*tline.get_shard_identity()];
 
     // Ingest wal until end_lsn
     info!("importing wal until {}", end_lsn);
@@ -457,12 +464,16 @@ pub async fn import_wal_from_tar(
         let mut modification = tline.begin_modification(last_lsn);
         while last_lsn <= end_lsn {
             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-                let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
-                    tline.get_shard_identity(),
+                    &shard,
                     lsn,
                     tline.pg_version,
-                )?;
+                )?
+                .pop()
+                .unwrap();
+
+                assert_eq!(got_shard, *tline.get_shard_identity());
 
                 walingest
                     .ingest_record(interpreted, &mut modification, ctx)
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index 3f10eeda60a9..d9aac9b630f6 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -264,6 +264,8 @@ pub(super) async fn handle_walreceiver_connection(
 
     let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
 
+    let shard = vec![*timeline.get_shard_identity()];
+
     let interpreted_proto_config = match protocol {
         PostgresClientProtocol::Vanilla => None,
         PostgresClientProtocol::Interpreted {
@@ -494,12 +496,16 @@ pub(super) async fn handle_walreceiver_connection(
                         }
 
                         // Deserialize and interpret WAL record
-                        let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                        let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                             recdata,
-                            modification.tline.get_shard_identity(),
+                            &shard,
                             next_record_lsn,
                             modification.tline.pg_version,
-                        )?;
+                        )?
+                        .pop()
+                        .unwrap();
+
+                        assert_eq!(got_shard, *modification.tline.get_shard_identity());
 
                         if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
                             && uncommitted_records > 0
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index e5b23fed5155..3a8dcdfacb61 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -2161,14 +2161,18 @@ mod tests {
         for chunk in bytes[xlogoff..].chunks(50) {
             decoder.feed_bytes(chunk);
             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
-                let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
-                    modification.tline.get_shard_identity(),
+                    &vec![*modification.tline.get_shard_identity()],
                     lsn,
                     modification.tline.pg_version,
                 )
+                .unwrap()
+                .pop()
                 .unwrap();
 
+                assert_eq!(got_shard, *modification.tline.get_shard_identity());
+
                 walingest
                     .ingest_record(interpreted, &mut modification, &ctx)
                     .instrument(span.clone())
diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs
index 25890304221e..2f7107ac9757 100644
--- a/safekeeper/src/send_interpreted_wal.rs
+++ b/safekeeper/src/send_interpreted_wal.rs
@@ -57,6 +57,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
         keepalive_ticker.reset();
 
         let (tx, mut rx) = tokio::sync::mpsc::channel::<Batch>(2);
+        let shard = vec![self.shard];
 
         loop {
             tokio::select! {
@@ -80,14 +81,19 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
                         assert!(next_record_lsn.is_aligned());
                         max_next_record_lsn = Some(next_record_lsn);
 
+
                         // Deserialize and interpret WAL record
-                        let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                        let (shard_id, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                             recdata,
-                            &self.shard,
+                            &shard,
                             next_record_lsn,
                             self.pg_version,
                         )
-                        .with_context(|| "Failed to interpret WAL")?;
+                        .with_context(|| "Failed to interpret WAL")?
+                        .pop()
+                        .unwrap();
+
+                        assert_eq!(shard_id, self.shard);
 
                         if !interpreted.is_empty() {
                             records.push(interpreted);

From f1fb1058fb2b4d4304c893981bad04332c23dfbd Mon Sep 17 00:00:00 2001
From: Vlad Lazar <vlad@neon.tech>
Date: Fri, 20 Dec 2024 11:02:08 +0100
Subject: [PATCH 3/9] wal_decoder: use &[] instead of &Vec<>

---
 libs/wal_decoder/src/decoder.rs          | 4 ++--
 libs/wal_decoder/src/serialized_batch.rs | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs
index 2993f9cce5ee..3e545e0048bf 100644
--- a/libs/wal_decoder/src/decoder.rs
+++ b/libs/wal_decoder/src/decoder.rs
@@ -19,7 +19,7 @@ impl InterpretedWalRecord {
     /// the keys that are being written as that is enough for updating relation sizes.
     pub fn from_bytes_filtered(
         buf: Bytes,
-        shards: &Vec<ShardIdentity>,
+        shards: &[ShardIdentity],
         next_record_lsn: Lsn,
         pg_version: u32,
     ) -> anyhow::Result<Vec<(ShardIdentity, InterpretedWalRecord)>> {
@@ -66,7 +66,7 @@ impl MetadataRecord {
     /// records are broadcast to all shards for simplicity, but this should be improved.
     fn from_decoded_filtered(
         decoded: &DecodedWALRecord,
-        shards: &Vec<ShardIdentity>,
+        shards: &[ShardIdentity],
         next_record_lsn: Lsn,
         pg_version: u32,
     ) -> anyhow::Result<Vec<(ShardIdentity, Option<MetadataRecord>)>> {
diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs
index df2e79e0429a..0962ea4fa326 100644
--- a/libs/wal_decoder/src/serialized_batch.rs
+++ b/libs/wal_decoder/src/serialized_batch.rs
@@ -136,7 +136,7 @@ impl SerializedValueBatch {
     /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
     pub(crate) fn from_decoded_filtered(
         decoded: DecodedWALRecord,
-        shards: &Vec<ShardIdentity>,
+        shards: &[ShardIdentity],
         next_record_lsn: Lsn,
         pg_version: u32,
     ) -> anyhow::Result<HashMap<ShardIdentity, SerializedValueBatch>> {

From 9c14f908cbfb058180465bce1cc741a026c9de71 Mon Sep 17 00:00:00 2001
From: Vlad Lazar <vlad@neon.tech>
Date: Fri, 20 Dec 2024 11:35:17 +0100
Subject: [PATCH 4/9] wal_decoder: allocate hash map upfront

---
 libs/wal_decoder/src/decoder.rs               | 112 +++++++++++-------
 libs/wal_decoder/src/serialized_batch.rs      |  41 ++-----
 pageserver/src/import_datadir.rs              |  12 +-
 .../walreceiver/walreceiver_connection.rs     |   6 +-
 pageserver/src/walingest.rs                   |   6 +-
 safekeeper/src/send_interpreted_wal.rs        |   6 +-
 6 files changed, 91 insertions(+), 92 deletions(-)

diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs
index 3e545e0048bf..a79844b0fb7e 100644
--- a/libs/wal_decoder/src/decoder.rs
+++ b/libs/wal_decoder/src/decoder.rs
@@ -1,6 +1,8 @@
 //! This module contains logic for decoding and interpreting
 //! raw bytes which represent a raw Postgres WAL record.
 
+use std::collections::HashMap;
+
 use crate::models::*;
 use crate::serialized_batch::SerializedValueBatch;
 use bytes::{Buf, Bytes};
@@ -22,7 +24,7 @@ impl InterpretedWalRecord {
         shards: &[ShardIdentity],
         next_record_lsn: Lsn,
         pg_version: u32,
-    ) -> anyhow::Result<Vec<(ShardIdentity, InterpretedWalRecord)>> {
+    ) -> anyhow::Result<HashMap<ShardIdentity, InterpretedWalRecord>> {
         let mut decoded = DecodedWALRecord::default();
         decode_wal_record(buf, &mut decoded, pg_version)?;
         let xid = decoded.xl_xid;
@@ -33,29 +35,39 @@ impl InterpretedWalRecord {
             FlushUncommittedRecords::No
         };
 
-        let metadata_per_shard =
-            MetadataRecord::from_decoded_filtered(&decoded, shards, next_record_lsn, pg_version)?;
-        let mut batches_per_shard = SerializedValueBatch::from_decoded_filtered(
+        let mut shard_records: HashMap<ShardIdentity, InterpretedWalRecord> =
+            HashMap::from_iter(shards.iter().map(|shard| {
+                (
+                    *shard,
+                    InterpretedWalRecord {
+                        metadata_record: None,
+                        batch: SerializedValueBatch {
+                            raw: Vec::new(),
+                            metadata: Default::default(),
+                            max_lsn: Lsn(0),
+                            len: 0,
+                        },
+                        next_record_lsn,
+                        flush_uncommitted,
+                        xid,
+                    },
+                )
+            }));
+
+        MetadataRecord::from_decoded_filtered(
+            &decoded,
+            &mut shard_records,
+            next_record_lsn,
+            pg_version,
+        )?;
+        SerializedValueBatch::from_decoded_filtered(
             decoded,
-            shards,
+            &mut shard_records,
             next_record_lsn,
             pg_version,
         )?;
 
-        let mut record_per_shard = Vec::with_capacity(shards.len());
-        for (shard_id, metadata_record) in metadata_per_shard {
-            let record = InterpretedWalRecord {
-                metadata_record,
-                batch: batches_per_shard.remove(&shard_id).unwrap(),
-                next_record_lsn,
-                flush_uncommitted,
-                xid,
-            };
-
-            record_per_shard.push((shard_id, record));
-        }
-
-        Ok(record_per_shard)
+        Ok(shard_records)
     }
 }
 
@@ -66,17 +78,17 @@ impl MetadataRecord {
     /// records are broadcast to all shards for simplicity, but this should be improved.
     fn from_decoded_filtered(
         decoded: &DecodedWALRecord,
-        shards: &[ShardIdentity],
+        shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
         next_record_lsn: Lsn,
         pg_version: u32,
-    ) -> anyhow::Result<Vec<(ShardIdentity, Option<MetadataRecord>)>> {
+    ) -> anyhow::Result<()> {
         // Note: this doesn't actually copy the bytes since
         // the [`Bytes`] type implements it via a level of indirection.
         let mut buf = decoded.record.clone();
         buf.advance(decoded.main_data_offset);
 
         // First, generate metadata records from the decoded WAL record.
-        let mut metadata_record = match decoded.xl_rmid {
+        let metadata_record = match decoded.xl_rmid {
             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
                 Self::decode_heapam_record(&mut buf, decoded, pg_version)?
             }
@@ -119,14 +131,11 @@ impl MetadataRecord {
         };
 
         // Next, filter the metadata record by shard.
-        let mut metadata_records_per_shard = Vec::with_capacity(shards.len());
-        for shard in shards {
-            let mut metadata_for_shard = metadata_record.clone();
-
-            match metadata_for_shard {
+        for (shard, record) in shard_records.iter_mut() {
+            let metadata_record_for_shard = match metadata_record {
                 Some(
-                    MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
-                    | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
+                    MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref clear_vm_bits))
+                    | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref clear_vm_bits)),
                 ) => {
                     // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
                     // of the main relation. These are sharded and managed just like regular relation pages.
@@ -136,34 +145,51 @@ impl MetadataRecord {
                         shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
                     };
                     // Send the old and new VM page updates to their respective shards.
-                    clear_vm_bits.old_heap_blkno = clear_vm_bits
+                    let updated_old_heap_blkno = clear_vm_bits
                         .old_heap_blkno
                         .filter(|&blkno| is_local_vm_page(blkno));
-                    clear_vm_bits.new_heap_blkno = clear_vm_bits
+                    let updated_new_heap_blkno = clear_vm_bits
                         .new_heap_blkno
                         .filter(|&blkno| is_local_vm_page(blkno));
                     // If neither VM page belongs to this shard, discard the record.
-                    if clear_vm_bits.old_heap_blkno.is_none()
-                        && clear_vm_bits.new_heap_blkno.is_none()
-                    {
-                        metadata_record = None
+                    if updated_old_heap_blkno.is_none() && updated_new_heap_blkno.is_none() {
+                        None
+                    } else {
+                        let mut for_shard = metadata_record.clone();
+                        match for_shard {
+                            Some(
+                                MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
+                                    ref mut clear_vm_bits,
+                                ))
+                                | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
+                                    ref mut clear_vm_bits,
+                                )),
+                            ) => {
+                                clear_vm_bits.old_heap_blkno = updated_old_heap_blkno;
+                                clear_vm_bits.new_heap_blkno = updated_new_heap_blkno;
+                                for_shard
+                            }
+                            _ => {
+                                unreachable!("for_shard is a clone of what we checked above")
+                            }
+                        }
                     }
                 }
                 Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
                     // Filter LogicalMessage records (AUX files) to only be stored on shard zero
-                    if !shard.is_shard_zero() {
-                        metadata_record = None;
+                    if shard.is_shard_zero() {
+                        metadata_record.clone()
+                    } else {
+                        None
                     }
                 }
-                _ => {}
-            }
+                _ => metadata_record.clone(),
+            };
 
-            metadata_records_per_shard.push((*shard, metadata_for_shard));
+            record.metadata_record = metadata_record_for_shard;
         }
 
-        assert_eq!(metadata_records_per_shard.len(), shards.len());
-
-        Ok(metadata_records_per_shard)
+        Ok(())
     }
 
     fn decode_heapam_record(
diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs
index 0962ea4fa326..dfa700ce983b 100644
--- a/libs/wal_decoder/src/serialized_batch.rs
+++ b/libs/wal_decoder/src/serialized_batch.rs
@@ -22,6 +22,8 @@ use utils::lsn::Lsn;
 
 use pageserver_api::key::Key;
 
+use crate::models::InterpretedWalRecord;
+
 static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 
 /// Accompanying metadata for the batch
@@ -136,25 +138,15 @@ impl SerializedValueBatch {
     /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
     pub(crate) fn from_decoded_filtered(
         decoded: DecodedWALRecord,
-        shards: &[ShardIdentity],
+        shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
         next_record_lsn: Lsn,
         pg_version: u32,
-    ) -> anyhow::Result<HashMap<ShardIdentity, SerializedValueBatch>> {
+    ) -> anyhow::Result<()> {
         // First determine how big the buffers need to be and allocate it up-front.
         // This duplicates some of the work below, but it's empirically much faster.
-        let mut shard_batches = HashMap::with_capacity(shards.len());
-        for shard in shards {
-            let buf =
-                Vec::<u8>::with_capacity(Self::estimate_buffer_size(&decoded, shard, pg_version));
-            shard_batches.insert(
-                *shard,
-                SerializedValueBatch {
-                    raw: buf,
-                    metadata: Default::default(),
-                    max_lsn: Lsn(0),
-                    len: 0,
-                },
-            );
+        for (shard, record) in shard_records.iter_mut() {
+            let estimate = Self::estimate_buffer_size(&decoded, shard, pg_version);
+            record.batch.raw = Vec::with_capacity(estimate);
         }
 
         for blk in decoded.blocks.iter() {
@@ -175,14 +167,13 @@ impl SerializedValueBatch {
                 );
             }
 
-            for shard in shards {
-                let mut batch = shard_batches.get_mut(shard).expect("inserted in prologue");
+            for (shard, record) in shard_records.iter_mut() {
                 let SerializedValueBatch {
                     raw,
                     metadata,
                     max_lsn,
                     len,
-                } = &mut batch;
+                } = &mut record.batch;
 
                 let key_is_local = shard.is_key_local(&key);
 
@@ -262,21 +253,13 @@ impl SerializedValueBatch {
         }
 
         if cfg!(any(debug_assertions, test)) {
-            use std::collections::HashSet;
-
             // Validate that the batches are correct
-            for batch in shard_batches.values() {
-                batch.validate_lsn_order();
+            for record in shard_records.values() {
+                record.batch.validate_lsn_order();
             }
-
-            // Validate we produced batches for each requested shard
-            assert_eq!(
-                shard_batches.keys().collect::<HashSet<_>>(),
-                shards.iter().collect::<HashSet<_>>()
-            );
         }
 
-        Ok(shard_batches)
+        Ok(())
     }
 
     /// Look into the decoded PG WAL record and determine
diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index effe12f9448f..a73fa5cec873 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -314,17 +314,15 @@ async fn import_wal(
         let mut modification = tline.begin_modification(last_lsn);
         while last_lsn <= endpoint {
             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-                let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
+                let interpreted = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
                     &shard,
                     lsn,
                     tline.pg_version,
                 )?
-                .pop()
+                .remove(tline.get_shard_identity())
                 .unwrap();
 
-                assert_eq!(got_shard, *tline.get_shard_identity());
-
                 walingest
                     .ingest_record(interpreted, &mut modification, ctx)
                     .await?;
@@ -464,17 +462,15 @@ pub async fn import_wal_from_tar(
         let mut modification = tline.begin_modification(last_lsn);
         while last_lsn <= end_lsn {
             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-                let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
+                let interpreted = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
                     &shard,
                     lsn,
                     tline.pg_version,
                 )?
-                .pop()
+                .remove(tline.get_shard_identity())
                 .unwrap();
 
-                assert_eq!(got_shard, *tline.get_shard_identity());
-
                 walingest
                     .ingest_record(interpreted, &mut modification, ctx)
                     .await?;
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index d9aac9b630f6..1b83b5706a8d 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -496,17 +496,15 @@ pub(super) async fn handle_walreceiver_connection(
                         }
 
                         // Deserialize and interpret WAL record
-                        let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
+                        let interpreted = InterpretedWalRecord::from_bytes_filtered(
                             recdata,
                             &shard,
                             next_record_lsn,
                             modification.tline.pg_version,
                         )?
-                        .pop()
+                        .remove(timeline.get_shard_identity())
                         .unwrap();
 
-                        assert_eq!(got_shard, *modification.tline.get_shard_identity());
-
                         if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
                             && uncommitted_records > 0
                         {
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 3a8dcdfacb61..e794cab765d3 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -2161,18 +2161,16 @@ mod tests {
         for chunk in bytes[xlogoff..].chunks(50) {
             decoder.feed_bytes(chunk);
             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
-                let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
+                let interpreted = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
                     &vec![*modification.tline.get_shard_identity()],
                     lsn,
                     modification.tline.pg_version,
                 )
                 .unwrap()
-                .pop()
+                .remove(modification.tline.get_shard_identity())
                 .unwrap();
 
-                assert_eq!(got_shard, *modification.tline.get_shard_identity());
-
                 walingest
                     .ingest_record(interpreted, &mut modification, &ctx)
                     .instrument(span.clone())
diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs
index 2f7107ac9757..225253e5040b 100644
--- a/safekeeper/src/send_interpreted_wal.rs
+++ b/safekeeper/src/send_interpreted_wal.rs
@@ -83,18 +83,16 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
 
 
                         // Deserialize and interpret WAL record
-                        let (shard_id, interpreted) = InterpretedWalRecord::from_bytes_filtered(
+                        let interpreted = InterpretedWalRecord::from_bytes_filtered(
                             recdata,
                             &shard,
                             next_record_lsn,
                             self.pg_version,
                         )
                         .with_context(|| "Failed to interpret WAL")?
-                        .pop()
+                        .remove(&self.shard)
                         .unwrap();
 
-                        assert_eq!(shard_id, self.shard);
-
                         if !interpreted.is_empty() {
                             records.push(interpreted);
                         }

From fd82ee7efac373d8aae93067dc21f7147d8aa359 Mon Sep 17 00:00:00 2001
From: Vlad Lazar <vlad@neon.tech>
Date: Fri, 20 Dec 2024 12:00:16 +0100
Subject: [PATCH 5/9] wal_decoder: reserve hash map capacity upfront

---
 libs/wal_decoder/src/decoder.rs | 33 +++++++++++++++++----------------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs
index a79844b0fb7e..35792b0759f8 100644
--- a/libs/wal_decoder/src/decoder.rs
+++ b/libs/wal_decoder/src/decoder.rs
@@ -36,23 +36,24 @@ impl InterpretedWalRecord {
         };
 
         let mut shard_records: HashMap<ShardIdentity, InterpretedWalRecord> =
-            HashMap::from_iter(shards.iter().map(|shard| {
-                (
-                    *shard,
-                    InterpretedWalRecord {
-                        metadata_record: None,
-                        batch: SerializedValueBatch {
-                            raw: Vec::new(),
-                            metadata: Default::default(),
-                            max_lsn: Lsn(0),
-                            len: 0,
-                        },
-                        next_record_lsn,
-                        flush_uncommitted,
-                        xid,
+            HashMap::with_capacity(shards.len());
+        for shard in shards {
+            shard_records.insert(
+                *shard,
+                InterpretedWalRecord {
+                    metadata_record: None,
+                    batch: SerializedValueBatch {
+                        raw: Vec::new(),
+                        metadata: Default::default(),
+                        max_lsn: Lsn(0),
+                        len: 0,
                     },
-                )
-            }));
+                    next_record_lsn,
+                    flush_uncommitted,
+                    xid,
+                },
+            );
+        }
 
         MetadataRecord::from_decoded_filtered(
             &decoded,

From 8db75e232ad828865ab5a882108e570a4b646780 Mon Sep 17 00:00:00 2001
From: Vlad Lazar <vlad@neon.tech>
Date: Fri, 20 Dec 2024 12:01:15 +0100
Subject: [PATCH 6/9] pagserver_api: skip stripe size in shard id hashing

It cannot be changed dynamically. For the stripe size to change,
the shard count needs to change too.
---
 libs/pageserver_api/src/shard.rs | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 21c7e2b36f49..43fe333df15a 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -31,6 +31,8 @@
 //! - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
 //!   and their slugs are 0004, 0104, 0204, and 0304.
 
+use std::hash::{Hash, Hasher};
+
 use crate::{key::Key, models::ShardParameters};
 use postgres_ffi::relfile_utils::INIT_FORKNUM;
 use serde::{Deserialize, Serialize};
@@ -40,7 +42,7 @@ pub use ::utils::shard::*;
 
 /// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
 /// and to check whether that [`ShardNumber`] is the same as the current shard.
-#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
+#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
 pub struct ShardIdentity {
     pub number: ShardNumber,
     pub count: ShardCount,
@@ -48,8 +50,18 @@ pub struct ShardIdentity {
     layout: ShardLayout,
 }
 
+/// Hash implementation
+///
+/// The stripe size cannot change dinamically, so it can be ignored for efficiency reasons.
+impl Hash for ShardIdentity {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.number.0.hash(state);
+        self.count.0.hash(state);
+    }
+}
+
 /// Stripe size in number of pages
-#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
+#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
 pub struct ShardStripeSize(pub u32);
 
 impl Default for ShardStripeSize {

From 86350865bc1d1b4b137179672ca9ed4c4968027e Mon Sep 17 00:00:00 2001
From: Vlad Lazar <vlad@neon.tech>
Date: Fri, 20 Dec 2024 13:42:30 +0100
Subject: [PATCH 7/9] wal_decoder: add benchmarks against real WAL

---
 Cargo.lock                                    |  10 +
 libs/wal_decoder/Cargo.toml                   |  16 ++
 libs/wal_decoder/benches/README.md            |  32 +++
 .../benches/bench_interpret_wal.rs            | 250 ++++++++++++++++++
 4 files changed, 308 insertions(+)
 create mode 100644 libs/wal_decoder/benches/README.md
 create mode 100644 libs/wal_decoder/benches/bench_interpret_wal.rs

diff --git a/Cargo.lock b/Cargo.lock
index d9ac167042ad..e7892e25e39a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7370,12 +7370,22 @@ dependencies = [
  "anyhow",
  "async-compression",
  "bytes",
+ "camino",
+ "camino-tempfile",
+ "criterion",
+ "futures",
  "pageserver_api",
  "postgres_ffi",
+ "pprof",
  "prost",
+ "remote_storage",
  "serde",
+ "serde_json",
+ "smallvec",
  "thiserror",
+ "tikv-jemallocator",
  "tokio",
+ "tokio-util",
  "tonic",
  "tonic-build",
  "tracing",
diff --git a/libs/wal_decoder/Cargo.toml b/libs/wal_decoder/Cargo.toml
index 8fac4e38cae6..c6829cac570b 100644
--- a/libs/wal_decoder/Cargo.toml
+++ b/libs/wal_decoder/Cargo.toml
@@ -20,7 +20,23 @@ tokio = { workspace = true, features = ["io-util"] }
 tonic.workspace = true
 tracing.workspace = true
 utils.workspace = true
+smallvec.workspace = true
 workspace_hack = { version = "0.1", path = "../../workspace_hack" }
 
 [build-dependencies]
 tonic-build.workspace = true
+
+[dev-dependencies]
+criterion.workspace = true
+camino.workspace = true
+camino-tempfile.workspace = true
+remote_storage.workspace = true
+tokio-util.workspace = true
+serde_json.workspace = true
+futures.workspace = true
+tikv-jemallocator.workspace = true
+pprof.workspace = true
+
+[[bench]]
+name = "bench_interpret_wal"
+harness = false
diff --git a/libs/wal_decoder/benches/README.md b/libs/wal_decoder/benches/README.md
new file mode 100644
index 000000000000..2d2aac44debb
--- /dev/null
+++ b/libs/wal_decoder/benches/README.md
@@ -0,0 +1,32 @@
+## WAL Decoding and Interpretation Benchmarks
+
+Note that these benchmarks pull WAL from a public bucket in S3
+as a preparation step. Hence, you need a way to auth with AWS.
+You can achieve this by copying the `~/.aws/config` file from
+the AWS SSO notion page and exportin `AWS_PROFILE=dev` when invoking
+the benchmarks.
+
+To run benchmarks:
+
+```sh
+# All benchmarks.
+AWS_PROFILE=dev cargo bench --package wal_decoder
+
+# Specific file.
+AWS_PROFILE=dev cargo bench --package wal_decoder --bench bench_interpret_wal
+
+# Specific benchmark.
+AWS_PROFILE=dev cargo bench --package wal_decoder --bench bench_interpret_wal unsharded
+
+# List available benchmarks.
+cargo bench --package wal_decoder --benches -- --list
+
+# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
+# Output in target/criterion/*/profile/flamegraph.svg.
+AWS_PROFILE=dev cargo bench --package wal_decoder --bench bench_interpret_wal unsharded -- --profile-time 10
+```
+
+Additional charts and statistics are available in `target/criterion/report/index.html`.
+
+Benchmarks are automatically compared against the previous run. To compare against other runs, see
+`--baseline` and `--save-baseline`.
diff --git a/libs/wal_decoder/benches/bench_interpret_wal.rs b/libs/wal_decoder/benches/bench_interpret_wal.rs
new file mode 100644
index 000000000000..846904cf873e
--- /dev/null
+++ b/libs/wal_decoder/benches/bench_interpret_wal.rs
@@ -0,0 +1,250 @@
+use anyhow::Context;
+use criterion::{criterion_group, criterion_main, Criterion};
+use futures::{stream::FuturesUnordered, StreamExt};
+use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
+use postgres_ffi::{waldecoder::WalStreamDecoder, MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
+use pprof::criterion::{Output, PProfProfiler};
+use serde::Deserialize;
+use std::{env, num::NonZeroUsize, sync::Arc};
+
+use camino::{Utf8Path, Utf8PathBuf};
+use camino_tempfile::Utf8TempDir;
+use remote_storage::{
+    DownloadOpts, GenericRemoteStorage, ListingMode, RemoteStorageConfig, RemoteStorageKind,
+    S3Config,
+};
+use tokio_util::sync::CancellationToken;
+use utils::{
+    lsn::Lsn,
+    shard::{ShardCount, ShardNumber},
+};
+use wal_decoder::models::InterpretedWalRecord;
+
+const S3_BUCKET: &str = "neon-github-public-dev";
+const S3_REGION: &str = "eu-central-1";
+const BUCKET_PREFIX: &str = "wal-snapshots/bulk-insert/";
+const METADATA_FILENAME: &str = "metadata.json";
+
+/// Use jemalloc, and configure it to sample allocations for profiles every 1 MB.
+/// This mirrors the configuration in bin/safekeeper.rs.
+#[global_allocator]
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
+#[allow(non_upper_case_globals)]
+#[export_name = "malloc_conf"]
+pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
+
+async fn create_s3_client() -> anyhow::Result<Arc<GenericRemoteStorage>> {
+    let remote_storage_config = RemoteStorageConfig {
+        storage: RemoteStorageKind::AwsS3(S3Config {
+            bucket_name: S3_BUCKET.to_string(),
+            bucket_region: S3_REGION.to_string(),
+            prefix_in_bucket: Some(BUCKET_PREFIX.to_string()),
+            endpoint: None,
+            concurrency_limit: NonZeroUsize::new(100).unwrap(),
+            max_keys_per_list_response: None,
+            upload_storage_class: None,
+        }),
+        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
+        small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
+    };
+    Ok(Arc::new(
+        GenericRemoteStorage::from_config(&remote_storage_config)
+            .await
+            .context("remote storage init")?,
+    ))
+}
+
+async fn download_bench_data(
+    client: Arc<GenericRemoteStorage>,
+    cancel: &CancellationToken,
+) -> anyhow::Result<Utf8TempDir> {
+    let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into()?;
+    let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent)?;
+
+    eprintln!("Downloading benchmark data to {:?}", temp_dir);
+
+    let listing = client
+        .list(None, ListingMode::NoDelimiter, None, cancel)
+        .await?;
+
+    let mut downloads = listing
+        .keys
+        .into_iter()
+        .map(|obj| {
+            let client = client.clone();
+            let temp_dir_path = temp_dir.path().to_owned();
+
+            async move {
+                let remote_path = obj.key;
+                let download = client
+                    .download(&remote_path, &DownloadOpts::default(), cancel)
+                    .await?;
+                let mut body = tokio_util::io::StreamReader::new(download.download_stream);
+
+                let file_name = remote_path.object_name().unwrap();
+                let file_path = temp_dir_path.join(file_name);
+                let file = tokio::fs::OpenOptions::new()
+                    .create(true)
+                    .truncate(true)
+                    .write(true)
+                    .open(&file_path)
+                    .await?;
+
+                let mut writer = tokio::io::BufWriter::new(file);
+                tokio::io::copy_buf(&mut body, &mut writer).await?;
+
+                Ok::<(), anyhow::Error>(())
+            }
+        })
+        .collect::<FuturesUnordered<_>>();
+
+    while let Some(download) = downloads.next().await {
+        download?;
+    }
+
+    Ok(temp_dir)
+}
+
+struct BenchmarkData {
+    wal: Vec<u8>,
+    meta: BenchmarkMetadata,
+}
+
+#[derive(Deserialize)]
+struct BenchmarkMetadata {
+    pg_version: u32,
+    start_lsn: Lsn,
+}
+
+async fn load_bench_data(path: &Utf8Path, input_size: usize) -> anyhow::Result<BenchmarkData> {
+    eprintln!("Loading benchmark data from {:?}", path);
+
+    let mut entries = tokio::fs::read_dir(path).await?;
+    let mut ordered_segment_paths = Vec::new();
+    let mut metadata = None;
+
+    while let Some(entry) = entries.next_entry().await? {
+        if entry.file_name() == METADATA_FILENAME {
+            let bytes = tokio::fs::read(entry.path()).await?;
+            metadata = Some(
+                serde_json::from_slice::<BenchmarkMetadata>(&bytes)
+                    .context("failed to deserialize metadata.json")?,
+            );
+        } else {
+            ordered_segment_paths.push(entry.path());
+        }
+    }
+
+    ordered_segment_paths.sort();
+
+    let mut buffer = Vec::new();
+    for path in ordered_segment_paths {
+        if buffer.len() >= input_size {
+            break;
+        }
+
+        use async_compression::tokio::bufread::ZstdDecoder;
+        let file = tokio::fs::File::open(path).await?;
+        let reader = tokio::io::BufReader::new(file);
+        let decoder = ZstdDecoder::new(reader);
+        let mut reader = tokio::io::BufReader::new(decoder);
+        tokio::io::copy_buf(&mut reader, &mut buffer).await?;
+    }
+
+    buffer.truncate(input_size);
+
+    Ok(BenchmarkData {
+        wal: buffer,
+        meta: metadata.unwrap(),
+    })
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    const INPUT_SIZE: usize = 128 * 1024 * 1024;
+
+    let setup_runtime = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    let (_temp_dir, bench_data) = setup_runtime.block_on(async move {
+        let cancel = CancellationToken::new();
+        let client = create_s3_client().await.unwrap();
+        let temp_dir = download_bench_data(client, &cancel).await.unwrap();
+        let bench_data = load_bench_data(temp_dir.path(), INPUT_SIZE).await.unwrap();
+
+        (temp_dir, bench_data)
+    });
+
+    eprintln!(
+        "Benchmarking against {} MiB of WAL",
+        INPUT_SIZE / 1024 / 1024
+    );
+
+    let mut group = c.benchmark_group("decode-interpret-wal");
+    group.throughput(criterion::Throughput::Bytes(bench_data.wal.len() as u64));
+    group.sample_size(10);
+
+    group.bench_function("unsharded", |b| {
+        b.iter(|| decode_interpret_main(&bench_data, &[ShardIdentity::unsharded()]))
+    });
+
+    let eight_shards = (0..8)
+        .map(|i| ShardIdentity::new(ShardNumber(i), ShardCount(8), ShardStripeSize(8)).unwrap())
+        .collect::<Vec<_>>();
+
+    group.bench_function("8/8-shards", |b| {
+        b.iter(|| decode_interpret_main(&bench_data, &eight_shards))
+    });
+
+    let four_shards = eight_shards
+        .into_iter()
+        .filter(|s| s.number.0 % 2 == 0)
+        .collect::<Vec<_>>();
+    group.bench_function("4/8-shards", |b| {
+        b.iter(|| decode_interpret_main(&bench_data, &four_shards))
+    });
+
+    let two_shards = four_shards
+        .into_iter()
+        .filter(|s| s.number.0 % 4 == 0)
+        .collect::<Vec<_>>();
+    group.bench_function("2/8-shards", |b| {
+        b.iter(|| decode_interpret_main(&bench_data, &two_shards))
+    });
+}
+
+fn decode_interpret_main(bench: &BenchmarkData, shards: &[ShardIdentity]) {
+    let r = decode_interpret(bench, shards);
+    if let Err(e) = r {
+        panic!("{e:?}");
+    }
+}
+
+fn decode_interpret(bench: &BenchmarkData, shard: &[ShardIdentity]) -> anyhow::Result<()> {
+    let mut decoder = WalStreamDecoder::new(bench.meta.start_lsn, bench.meta.pg_version);
+    let xlogoff: usize = bench.meta.start_lsn.segment_offset(WAL_SEGMENT_SIZE);
+
+    for chunk in bench.wal[xlogoff..].chunks(MAX_SEND_SIZE) {
+        decoder.feed_bytes(chunk);
+        while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
+            assert!(lsn.is_aligned());
+            let _ = InterpretedWalRecord::from_bytes_filtered(
+                recdata,
+                shard,
+                lsn,
+                bench.meta.pg_version,
+            )
+            .unwrap();
+        }
+    }
+
+    Ok(())
+}
+criterion_group!(
+    name=benches;
+    config=Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
+    targets=criterion_benchmark
+);
+criterion_main!(benches);

From 5fbec70fcc108cb5a09f083644bb3a9d8cae2090 Mon Sep 17 00:00:00 2001
From: Vlad Lazar <vlad@neon.tech>
Date: Fri, 20 Dec 2024 13:46:16 +0100
Subject: [PATCH 8/9] Benchmark results

decode-interpret-wal/unsharded
                        time:   [439.01 ms 439.23 ms 439.48 ms]
                        thrpt:  [291.25 MiB/s 291.42 MiB/s 291.56 MiB/s]
decode-interpret-wal/8/8-shards
                        time:   [934.73 ms 935.45 ms 936.41 ms]
                        thrpt:  [136.69 MiB/s 136.83 MiB/s 136.94 MiB/s]
decode-interpret-wal/4/8-shards
                        time:   [646.55 ms 646.72 ms 646.89 ms]
                        thrpt:  [197.87 MiB/s 197.92 MiB/s 197.97 MiB/s]
decode-interpret-wal/2/8-shards
                        time:   [487.38 ms 487.56 ms 487.76 ms]
                        thrpt:  [262.43 MiB/s 262.53 MiB/s 262.63 MiB/s]

From fe243877a08393f68ed63629230e01760c113d59 Mon Sep 17 00:00:00 2001
From: Vlad Lazar <vlad@neon.tech>
Date: Fri, 20 Dec 2024 16:12:40 +0100
Subject: [PATCH 9/9] ci: make clippy happy

---
 pageserver/src/walingest.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index e794cab765d3..720cdda28e3b 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -2163,7 +2163,7 @@ mod tests {
             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
-                    &vec![*modification.tline.get_shard_identity()],
+                    &[*modification.tline.get_shard_identity()],
                     lsn,
                     modification.tline.pg_version,
                 )