diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 4cc0a739e871..21c7e2b36f49 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -40,7 +40,7 @@ pub use ::utils::shard::*;
 
 /// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
 /// and to check whether that [`ShardNumber`] is the same as the current shard.
-#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
+#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
 pub struct ShardIdentity {
     pub number: ShardNumber,
     pub count: ShardCount,
@@ -49,7 +49,7 @@ pub struct ShardIdentity {
 }
 
 /// Stripe size in number of pages
-#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
+#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
 pub struct ShardStripeSize(pub u32);
 
 impl Default for ShardStripeSize {
@@ -59,7 +59,7 @@ impl Default for ShardStripeSize {
 }
 
 /// Layout version: for future upgrades where we might change how the key->shard mapping works
-#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
+#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
 pub struct ShardLayout(u8);
 
 const LAYOUT_V1: ShardLayout = ShardLayout(1);
diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs
index aa50c629113b..2993f9cce5ee 100644
--- a/libs/wal_decoder/src/decoder.rs
+++ b/libs/wal_decoder/src/decoder.rs
@@ -14,15 +14,15 @@ use utils::lsn::Lsn;
 
 impl InterpretedWalRecord {
     /// Decode and interpreted raw bytes which represent one Postgres WAL record.
-    /// Data blocks which do not match the provided shard identity are filtered out.
+    /// Data blocks which do not match any of the provided shard identities are filtered out.
     /// Shard 0 is a special case since it tracks all relation sizes. We only give it
     /// the keys that are being written as that is enough for updating relation sizes.
     pub fn from_bytes_filtered(
         buf: Bytes,
-        shard: &ShardIdentity,
+        shards: &Vec<ShardIdentity>,
         next_record_lsn: Lsn,
         pg_version: u32,
-    ) -> anyhow::Result<InterpretedWalRecord> {
+    ) -> anyhow::Result<Vec<(ShardIdentity, InterpretedWalRecord)>> {
         let mut decoded = DecodedWALRecord::default();
         decode_wal_record(buf, &mut decoded, pg_version)?;
         let xid = decoded.xl_xid;
@@ -33,36 +33,43 @@ impl InterpretedWalRecord {
             FlushUncommittedRecords::No
         };
 
-        let metadata_record =
-            MetadataRecord::from_decoded_filtered(&decoded, shard, next_record_lsn, pg_version)?;
-        let batch = SerializedValueBatch::from_decoded_filtered(
+        let metadata_per_shard =
+            MetadataRecord::from_decoded_filtered(&decoded, shards, next_record_lsn, pg_version)?;
+        let mut batches_per_shard = SerializedValueBatch::from_decoded_filtered(
             decoded,
-            shard,
+            shards,
             next_record_lsn,
             pg_version,
         )?;
 
-        Ok(InterpretedWalRecord {
-            metadata_record,
-            batch,
-            next_record_lsn,
-            flush_uncommitted,
-            xid,
-        })
+        let mut record_per_shard = Vec::with_capacity(shards.len());
+        for (shard_id, metadata_record) in metadata_per_shard {
+            let record = InterpretedWalRecord {
+                metadata_record,
+                batch: batches_per_shard.remove(&shard_id).unwrap(),
+                next_record_lsn,
+                flush_uncommitted,
+                xid,
+            };
+
+            record_per_shard.push((shard_id, record));
+        }
+
+        Ok(record_per_shard)
    }
 }
 
 impl MetadataRecord {
-    /// Builds a metadata record for this WAL record, if any.
+    /// Builds metadata records for this WAL record for the specified shards, if any.
     ///
-    /// Only metadata records relevant for the given shard are emitted. Currently, most metadata
+    /// Only metadata records relevant for the given shards are emitted. Currently, most metadata
     /// records are broadcast to all shards for simplicity, but this should be improved.
     fn from_decoded_filtered(
         decoded: &DecodedWALRecord,
-        shard: &ShardIdentity,
+        shards: &Vec<ShardIdentity>,
         next_record_lsn: Lsn,
         pg_version: u32,
-    ) -> anyhow::Result<Option<MetadataRecord>> {
+    ) -> anyhow::Result<Vec<(ShardIdentity, Option<MetadataRecord>)>> {
         // Note: this doesn't actually copy the bytes since
         // the [`Bytes`] type implements it via a level of indirection.
         let mut buf = decoded.record.clone();
@@ -112,41 +119,51 @@ impl MetadataRecord {
         };
 
         // Next, filter the metadata record by shard.
-        match metadata_record {
-            Some(
-                MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
-                | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
-            ) => {
-                // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
-                // of the main relation. These are sharded and managed just like regular relation pages.
-                // See: https://github.com/neondatabase/neon/issues/9855
-                let is_local_vm_page = |heap_blk| {
-                    let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
-                    shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
-                };
-                // Send the old and new VM page updates to their respective shards.
-                clear_vm_bits.old_heap_blkno = clear_vm_bits
-                    .old_heap_blkno
-                    .filter(|&blkno| is_local_vm_page(blkno));
-                clear_vm_bits.new_heap_blkno = clear_vm_bits
-                    .new_heap_blkno
-                    .filter(|&blkno| is_local_vm_page(blkno));
-                // If neither VM page belongs to this shard, discard the record.
-                if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
-                {
-                    metadata_record = None
+        let mut metadata_records_per_shard = Vec::with_capacity(shards.len());
+        for shard in shards {
+            let mut metadata_for_shard = metadata_record.clone();
+
+            match metadata_for_shard {
+                Some(
+                    MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
+                    | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
+                ) => {
+                    // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
+                    // of the main relation. These are sharded and managed just like regular relation pages.
+                    // See: https://github.com/neondatabase/neon/issues/9855
+                    let is_local_vm_page = |heap_blk| {
+                        let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
+                        shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
+                    };
+                    // Send the old and new VM page updates to their respective shards.
+                    clear_vm_bits.old_heap_blkno = clear_vm_bits
+                        .old_heap_blkno
+                        .filter(|&blkno| is_local_vm_page(blkno));
+                    clear_vm_bits.new_heap_blkno = clear_vm_bits
+                        .new_heap_blkno
+                        .filter(|&blkno| is_local_vm_page(blkno));
+                    // If neither VM page belongs to this shard, discard the record.
+                    if clear_vm_bits.old_heap_blkno.is_none()
+                        && clear_vm_bits.new_heap_blkno.is_none()
+                    {
+                        metadata_for_shard = None
+                    }
                 }
-            }
-            Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
-                // Filter LogicalMessage records (AUX files) to only be stored on shard zero
-                if !shard.is_shard_zero() {
-                    metadata_record = None;
+                Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
+                    // Filter LogicalMessage records (AUX files) to only be stored on shard zero
+                    if !shard.is_shard_zero() {
+                        metadata_for_shard = None;
+                    }
                 }
+                _ => {}
             }
-            _ => {}
+
+            metadata_records_per_shard.push((*shard, metadata_for_shard));
         }
 
-        Ok(metadata_record)
+        assert_eq!(metadata_records_per_shard.len(), shards.len());
+
+        Ok(metadata_records_per_shard)
     }
 
     fn decode_heapam_record(
diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs
index 1c281a2ea8b2..a30fdc1ee445 100644
--- a/libs/wal_decoder/src/models.rs
+++ b/libs/wal_decoder/src/models.rs
@@ -48,7 +48,7 @@ pub mod proto {
     tonic::include_proto!("interpreted_wal");
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Copy, Clone, Serialize, Deserialize)]
 pub enum FlushUncommittedRecords {
     Yes,
     No,
diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs
index 41294da7a04d..df2e79e0429a 100644
--- a/libs/wal_decoder/src/serialized_batch.rs
+++ b/libs/wal_decoder/src/serialized_batch.rs
@@ -5,7 +5,7 @@
 //! Such batches are created from decoded PG wal records and ingested
 //! by the pageserver by writing directly to the ephemeral file.
 
-use std::collections::BTreeSet;
+use std::collections::{BTreeSet, HashMap};
 
 use bytes::{Bytes, BytesMut};
 use pageserver_api::key::rel_block_to_key;
@@ -136,21 +136,28 @@ impl SerializedValueBatch {
     /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
     pub(crate) fn from_decoded_filtered(
         decoded: DecodedWALRecord,
-        shard: &ShardIdentity,
+        shards: &Vec<ShardIdentity>,
         next_record_lsn: Lsn,
         pg_version: u32,
-    ) -> anyhow::Result<Self> {
-        // First determine how big the buffer needs to be and allocate it up-front.
+    ) -> anyhow::Result<HashMap<ShardIdentity, Self>> {
+        // First determine how big the buffers need to be and allocate them up-front.
         // This duplicates some of the work below, but it's empirically much faster.
-        let estimated_buffer_size = Self::estimate_buffer_size(&decoded, shard, pg_version);
-        let mut buf = Vec::<u8>::with_capacity(estimated_buffer_size);
+        let mut shard_batches = HashMap::with_capacity(shards.len());
+        for shard in shards {
+            let buf =
+                Vec::<u8>::with_capacity(Self::estimate_buffer_size(&decoded, shard, pg_version));
+            shard_batches.insert(
+                *shard,
+                SerializedValueBatch {
+                    raw: buf,
+                    metadata: Default::default(),
+                    max_lsn: Lsn(0),
+                    len: 0,
+                },
+            );
+        }
 
-        let mut metadata: Vec<ValueMeta> = Vec::with_capacity(decoded.blocks.len());
-        let mut max_lsn: Lsn = Lsn(0);
-        let mut len: usize = 0;
         for blk in decoded.blocks.iter() {
-            let relative_off = buf.len() as u64;
-
             let rel = RelTag {
                 spcnode: blk.rnode_spcnode,
                 dbnode: blk.rnode_dbnode,
@@ -168,99 +175,108 @@ impl SerializedValueBatch {
                 );
             }
 
-            let key_is_local = shard.is_key_local(&key);
-
-            tracing::debug!(
-                lsn=%next_record_lsn,
-                key=%key,
-                "ingest: shard decision {}",
-                if !key_is_local { "drop" } else { "keep" },
-            );
-
-            if !key_is_local {
-                if shard.is_shard_zero() {
-                    // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
-                    // its blkno in case it implicitly extends a relation.
-                    metadata.push(ValueMeta::Observed(ObservedValueMeta {
-                        key: key.to_compact(),
-                        lsn: next_record_lsn,
-                    }))
-                }
-
-                continue;
-            }
-
-            // Instead of storing full-page-image WAL record,
-            // it is better to store extracted image: we can skip wal-redo
-            // in this case. Also some FPI records may contain multiple (up to 32) pages,
-            // so them have to be copied multiple times.
-            //
-            let val = if Self::block_is_image(&decoded, blk, pg_version) {
-                // Extract page image from FPI record
-                let img_len = blk.bimg_len as usize;
-                let img_offs = blk.bimg_offset as usize;
-                let mut image = BytesMut::with_capacity(BLCKSZ as usize);
-                // TODO(vlad): skip the copy
-                image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
-
-                if blk.hole_length != 0 {
-                    let tail = image.split_off(blk.hole_offset as usize);
-                    image.resize(image.len() + blk.hole_length as usize, 0u8);
-                    image.unsplit(tail);
-                }
-                //
-                // Match the logic of XLogReadBufferForRedoExtended:
-                // The page may be uninitialized. If so, we can't set the LSN because
-                // that would corrupt the page.
-                //
-                if !page_is_new(&image) {
-                    page_set_lsn(&mut image, next_record_lsn)
-                }
-                assert_eq!(image.len(), BLCKSZ as usize);
-
-                Value::Image(image.freeze())
-            } else {
-                Value::WalRecord(NeonWalRecord::Postgres {
-                    will_init: blk.will_init || blk.apply_image,
-                    rec: decoded.record.clone(),
-                })
-            };
-
-            val.ser_into(&mut buf)
-                .expect("Writing into in-memory buffer is infallible");
-
-            let val_ser_size = buf.len() - relative_off as usize;
-
-            metadata.push(ValueMeta::Serialized(SerializedValueMeta {
-                key: key.to_compact(),
-                lsn: next_record_lsn,
-                batch_offset: relative_off,
-                len: val_ser_size,
-                will_init: val.will_init(),
-            }));
-            max_lsn = std::cmp::max(max_lsn, next_record_lsn);
-            len += 1;
+            for shard in shards {
+                let mut batch = shard_batches.get_mut(shard).expect("inserted in prologue");
+                let SerializedValueBatch {
+                    raw,
+                    metadata,
+                    max_lsn,
+                    len,
+                } = &mut batch;
+
+                let key_is_local = shard.is_key_local(&key);
+
+                tracing::debug!(
+                    lsn=%next_record_lsn,
+                    key=%key,
+                    "ingest: shard decision {}",
+                    if !key_is_local { "drop" } else { "keep" },
+                );
+
+                if !key_is_local {
+                    if shard.is_shard_zero() {
+                        // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
+                        // its blkno in case it implicitly extends a relation.
+                        metadata.push(ValueMeta::Observed(ObservedValueMeta {
+                            key: key.to_compact(),
+                            lsn: next_record_lsn,
+                        }))
+                    }
+
+                    continue;
+                }
+
+                // Instead of storing full-page-image WAL record,
+                // it is better to store extracted image: we can skip wal-redo
+                // in this case. Also some FPI records may contain multiple (up to 32) pages,
+                // so they may have to be copied multiple times.
+                //
+                let val = if Self::block_is_image(&decoded, blk, pg_version) {
+                    // Extract page image from FPI record
+                    let img_len = blk.bimg_len as usize;
+                    let img_offs = blk.bimg_offset as usize;
+                    let mut image = BytesMut::with_capacity(BLCKSZ as usize);
+                    // TODO(vlad): skip the copy
+                    image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
+
+                    if blk.hole_length != 0 {
+                        let tail = image.split_off(blk.hole_offset as usize);
+                        image.resize(image.len() + blk.hole_length as usize, 0u8);
+                        image.unsplit(tail);
+                    }
+                    //
+                    // Match the logic of XLogReadBufferForRedoExtended:
+                    // The page may be uninitialized. If so, we can't set the LSN because
+                    // that would corrupt the page.
+                    //
+                    if !page_is_new(&image) {
+                        page_set_lsn(&mut image, next_record_lsn)
+                    }
+                    assert_eq!(image.len(), BLCKSZ as usize);
+
+                    Value::Image(image.freeze())
+                } else {
+                    Value::WalRecord(NeonWalRecord::Postgres {
+                        will_init: blk.will_init || blk.apply_image,
+                        rec: decoded.record.clone(),
+                    })
+                };
+
+                let relative_off = raw.len() as u64;
+
+                val.ser_into(raw)
+                    .expect("Writing into in-memory buffer is infallible");
+
+                let val_ser_size = raw.len() - relative_off as usize;
+
+                metadata.push(ValueMeta::Serialized(SerializedValueMeta {
+                    key: key.to_compact(),
+                    lsn: next_record_lsn,
+                    batch_offset: relative_off,
+                    len: val_ser_size,
+                    will_init: val.will_init(),
+                }));
+                *max_lsn = std::cmp::max(*max_lsn, next_record_lsn);
+                *len += 1;
+            }
         }
 
         if cfg!(any(debug_assertions, test)) {
-            let batch = Self {
-                raw: buf,
-                metadata,
-                max_lsn,
-                len,
-            };
+            use std::collections::HashSet;
 
-            batch.validate_lsn_order();
+            // Validate that the batches are correct
+            for batch in shard_batches.values() {
+                batch.validate_lsn_order();
+            }
 
-            return Ok(batch);
+            // Validate we produced batches for each requested shard
+            assert_eq!(
+                shard_batches.keys().collect::<HashSet<_>>(),
+                shards.iter().collect::<HashSet<_>>()
+            );
         }
 
-        Ok(Self {
-            raw: buf,
-            metadata,
-            max_lsn,
-            len,
-        })
+        Ok(shard_batches)
     }
 
     /// Look into the decoded PG WAL record and determine
diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index c061714010a2..effe12f9448f 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -278,6 +278,8 @@ async fn import_wal(
 
     let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
 
+    let shard = vec![*tline.get_shard_identity()];
+
     while last_lsn <= endpoint {
         // FIXME: assume postgresql tli 1 for now
         let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
@@ -312,12 +314,16 @@ async fn import_wal(
         let mut modification = tline.begin_modification(last_lsn);
         while last_lsn <= endpoint {
             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-                let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
-                    tline.get_shard_identity(),
+                    &shard,
                     lsn,
                     tline.pg_version,
-                )?;
+                )?
+                .pop()
+                .unwrap();
+
+                assert_eq!(got_shard, *tline.get_shard_identity());
 
                 walingest
                     .ingest_record(interpreted, &mut modification, ctx)
@@ -411,6 +417,7 @@ pub async fn import_wal_from_tar(
     let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
     let mut last_lsn = start_lsn;
     let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
+    let shard = vec![*tline.get_shard_identity()];
 
     // Ingest wal until end_lsn
     info!("importing wal until {}", end_lsn);
@@ -457,12 +464,16 @@ pub async fn import_wal_from_tar(
         let mut modification = tline.begin_modification(last_lsn);
         while last_lsn <= end_lsn {
             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-                let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
-                    tline.get_shard_identity(),
+                    &shard,
                     lsn,
                     tline.pg_version,
-                )?;
+                )?
+                .pop()
+                .unwrap();
+
+                assert_eq!(got_shard, *tline.get_shard_identity());
 
                 walingest
                     .ingest_record(interpreted, &mut modification, ctx)
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index 3f10eeda60a9..d9aac9b630f6 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -264,6 +264,8 @@ pub(super) async fn handle_walreceiver_connection(
 
     let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
 
+    let shard = vec![*timeline.get_shard_identity()];
+
     let interpreted_proto_config = match protocol {
         PostgresClientProtocol::Vanilla => None,
         PostgresClientProtocol::Interpreted {
@@ -494,12 +496,16 @@ pub(super) async fn handle_walreceiver_connection(
             }
 
             // Deserialize and interpret WAL record
-            let interpreted = InterpretedWalRecord::from_bytes_filtered(
+            let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                 recdata,
-                modification.tline.get_shard_identity(),
+                &shard,
                 next_record_lsn,
                 modification.tline.pg_version,
-            )?;
+            )?
+            .pop()
+            .unwrap();
+
+            assert_eq!(got_shard, *modification.tline.get_shard_identity());
 
             if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
                 && uncommitted_records > 0
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index e5b23fed5155..3a8dcdfacb61 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -2161,14 +2161,18 @@ mod tests {
         for chunk in bytes[xlogoff..].chunks(50) {
             decoder.feed_bytes(chunk);
             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
-                let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                let (got_shard, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                     recdata,
-                    modification.tline.get_shard_identity(),
+                    &vec![*modification.tline.get_shard_identity()],
                     lsn,
                     modification.tline.pg_version,
                 )
+                .unwrap()
+                .pop()
                 .unwrap();
 
+                assert_eq!(got_shard, *modification.tline.get_shard_identity());
+
                 walingest
                     .ingest_record(interpreted, &mut modification, &ctx)
                     .instrument(span.clone())
diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs
index 25890304221e..2f7107ac9757 100644
--- a/safekeeper/src/send_interpreted_wal.rs
+++ b/safekeeper/src/send_interpreted_wal.rs
@@ -57,6 +57,7 @@ impl InterpretedWalSender<'_, IO> {
         keepalive_ticker.reset();
 
         let (tx, mut rx) = tokio::sync::mpsc::channel::(2);
+        let shard = vec![self.shard];
 
         loop {
             tokio::select! {
@@ -80,14 +81,19 @@ impl InterpretedWalSender<'_, IO> {
                     assert!(next_record_lsn.is_aligned());
                     max_next_record_lsn = Some(next_record_lsn);
 
-                    let interpreted = InterpretedWalRecord::from_bytes_filtered(
+                    // Deserialize and interpret WAL record
+                    let (shard_id, interpreted) = InterpretedWalRecord::from_bytes_filtered(
                         recdata,
-                        &self.shard,
+                        &shard,
                         next_record_lsn,
                         self.pg_version,
                     )
-                    .with_context(|| "Failed to interpret WAL")?;
+                    .with_context(|| "Failed to interpret WAL")?
+                    .pop()
+                    .unwrap();
+
+                    assert_eq!(shard_id, self.shard);
 
                     if !interpreted.is_empty() {
                         records.push(interpreted);
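
A minimal sketch of how a multi-shard caller might consume the new API. The helper name `interpret_for_all_shards`, its map-based return, and the module paths are illustrative assumptions; the `from_bytes_filtered` signature, its `Vec<(ShardIdentity, InterpretedWalRecord)>` result, and the new `Hash` derive on `ShardIdentity` come from the patch above.

```rust
use std::collections::HashMap;

use bytes::Bytes;
use pageserver_api::shard::ShardIdentity;
use utils::lsn::Lsn;
use wal_decoder::models::InterpretedWalRecord;

/// Hypothetical helper (not part of this patch): decode a raw WAL record once
/// and fan the per-shard results out into a map keyed by shard identity.
fn interpret_for_all_shards(
    buf: Bytes,
    shards: &Vec<ShardIdentity>,
    next_record_lsn: Lsn,
    pg_version: u32,
) -> anyhow::Result<HashMap<ShardIdentity, InterpretedWalRecord>> {
    // A single decode pass yields one (ShardIdentity, InterpretedWalRecord)
    // pair per requested shard; blocks a shard does not own are filtered out,
    // and shard 0 additionally observes key-only metadata so it can keep
    // tracking relation sizes.
    let per_shard =
        InterpretedWalRecord::from_bytes_filtered(buf, shards, next_record_lsn, pg_version)?;

    // ShardIdentity now derives Hash + Eq (see the shard.rs hunk), so it can
    // key a HashMap directly.
    Ok(per_shard.into_iter().collect())
}
```

Single-shard call sites (the datadir import, the walreceiver, and the safekeeper sender) keep their old behaviour by following the pattern visible in the hunks above: wrap the one identity in a `vec![...]`, `pop()` the single result, and assert that the returned identity matches the expected one.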