diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index 684718d22030..1895f25bfcc1 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -19,7 +19,7 @@ impl InterpretedWalRecord { pub fn from_bytes_filtered( buf: Bytes, shard: &ShardIdentity, - record_end_lsn: Lsn, + next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { let mut decoded = DecodedWALRecord::default(); @@ -32,18 +32,18 @@ impl InterpretedWalRecord { FlushUncommittedRecords::No }; - let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?; + let metadata_record = MetadataRecord::from_decoded(&decoded, next_record_lsn, pg_version)?; let batch = SerializedValueBatch::from_decoded_filtered( decoded, shard, - record_end_lsn, + next_record_lsn, pg_version, )?; Ok(InterpretedWalRecord { metadata_record, batch, - end_lsn: record_end_lsn, + next_record_lsn, flush_uncommitted, xid, }) @@ -53,7 +53,7 @@ impl InterpretedWalRecord { impl MetadataRecord { fn from_decoded( decoded: &DecodedWALRecord, - record_end_lsn: Lsn, + next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result> { // Note: this doesn't actually copy the bytes since @@ -74,7 +74,9 @@ impl MetadataRecord { Ok(None) } pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version), - pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn), + pg_constants::RM_XACT_ID => { + Self::decode_xact_record(&mut buf, decoded, next_record_lsn) + } pg_constants::RM_MULTIXACT_ID => { Self::decode_multixact_record(&mut buf, decoded, pg_version) } @@ -86,7 +88,9 @@ impl MetadataRecord { // // Alternatively, one can make the checkpoint part of the subscription protocol // to the pageserver. This should work fine, but can be done at a later point. - pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn), + pg_constants::RM_XLOG_ID => { + Self::decode_xlog_record(&mut buf, decoded, next_record_lsn) + } pg_constants::RM_LOGICALMSG_ID => { Self::decode_logical_message_record(&mut buf, decoded) } diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 88371fe51ec5..c69f8c869ac3 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -52,8 +52,10 @@ pub struct InterpretedWalRecord { /// A pre-serialized batch along with the required metadata for ingestion /// by the pageserver pub batch: SerializedValueBatch, - /// Byte offset within WAL for the end of the original PG WAL record - pub end_lsn: Lsn, + /// Byte offset within WAL for the start of the next PG WAL record. + /// Usually this is the end LSN of the current record, but in case of + /// XLOG SWITCH records it will be within the next segment. + pub next_record_lsn: Lsn, /// Whether to flush all uncommitted modifications to the storage engine /// before ingesting this record. This is currently only used for legacy PG /// database creations which read pages from a template database. Such WAL diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index 632603cc8b39..9c0708ebbe7d 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -137,7 +137,7 @@ impl SerializedValueBatch { pub(crate) fn from_decoded_filtered( decoded: DecodedWALRecord, shard: &ShardIdentity, - record_end_lsn: Lsn, + next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { // First determine how big the buffer needs to be and allocate it up-front. @@ -161,13 +161,17 @@ impl SerializedValueBatch { let key = rel_block_to_key(rel, blk.blkno); if !key.is_valid_key_on_write_path() { - anyhow::bail!("Unsupported key decoded at LSN {}: {}", record_end_lsn, key); + anyhow::bail!( + "Unsupported key decoded at LSN {}: {}", + next_record_lsn, + key + ); } let key_is_local = shard.is_key_local(&key); tracing::debug!( - lsn=%record_end_lsn, + lsn=%next_record_lsn, key=%key, "ingest: shard decision {}", if !key_is_local { "drop" } else { "keep" }, @@ -179,7 +183,7 @@ impl SerializedValueBatch { // its blkno in case it implicitly extends a relation. metadata.push(ValueMeta::Observed(ObservedValueMeta { key: key.to_compact(), - lsn: record_end_lsn, + lsn: next_record_lsn, })) } @@ -210,7 +214,7 @@ impl SerializedValueBatch { // that would corrupt the page. // if !page_is_new(&image) { - page_set_lsn(&mut image, record_end_lsn) + page_set_lsn(&mut image, next_record_lsn) } assert_eq!(image.len(), BLCKSZ as usize); @@ -229,12 +233,12 @@ impl SerializedValueBatch { metadata.push(ValueMeta::Serialized(SerializedValueMeta { key: key.to_compact(), - lsn: record_end_lsn, + lsn: next_record_lsn, batch_offset: relative_off, len: val_ser_size, will_init: val.will_init(), })); - max_lsn = std::cmp::max(max_lsn, record_end_lsn); + max_lsn = std::cmp::max(max_lsn, next_record_lsn); len += 1; } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 34bf959058f4..6ac6920d4732 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -331,11 +331,11 @@ pub(super) async fn handle_walreceiver_connection( Ok(()) } - while let Some((record_end_lsn, recdata)) = waldecoder.poll_decode()? { + while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? { // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are // at risk of hitting a deadlock. - if !record_end_lsn.is_aligned() { + if !next_record_lsn.is_aligned() { return Err(WalReceiverError::Other(anyhow!("LSN not aligned"))); } @@ -343,7 +343,7 @@ pub(super) async fn handle_walreceiver_connection( let interpreted = InterpretedWalRecord::from_bytes_filtered( recdata, modification.tline.get_shard_identity(), - record_end_lsn, + next_record_lsn, modification.tline.pg_version, )?; @@ -367,10 +367,10 @@ pub(super) async fn handle_walreceiver_connection( .ingest_record(interpreted, &mut modification, &ctx) .await .with_context(|| { - format!("could not ingest record at {record_end_lsn}") + format!("could not ingest record at {next_record_lsn}") })?; if !ingested { - tracing::debug!("ingest: filtered out record @ LSN {record_end_lsn}"); + tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}"); WAL_INGEST.records_filtered.inc(); filtered_records += 1; } @@ -380,7 +380,7 @@ pub(super) async fn handle_walreceiver_connection( // to timeout the tests. fail_point!("walreceiver-after-ingest"); - last_rec_lsn = record_end_lsn; + last_rec_lsn = next_record_lsn; // Commit every ingest_batch_size records. Even if we filtered out // all records, we still need to call commit to advance the LSN. diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 84e553f33012..38d69760f276 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -154,7 +154,7 @@ impl WalIngest { WAL_INGEST.records_received.inc(); let prev_len = modification.len(); - modification.set_lsn(interpreted.end_lsn)?; + modification.set_lsn(interpreted.next_record_lsn)?; if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) { // Records of this type should always be preceded by a commit(), as they